From 74b89d5f2cd968e58be9a28f1dbce7a1ebda581e Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Wed, 28 Aug 2019 18:09:06 +0100
Subject: Split out bookpipeline to cmd/

---
 bookpipeline/aws.go                   |  56 ++---
 bookpipeline/cmd/bookpipeline/main.go | 388 +++++++++++++++++++++++++++++++++
 bookpipeline/graph.go                 |  35 +--
 bookpipeline/main.go                  | 397 ----------------------------------
 4 files changed, 438 insertions(+), 438 deletions(-)
 create mode 100644 bookpipeline/cmd/bookpipeline/main.go
 delete mode 100644 bookpipeline/main.go

(limited to 'bookpipeline')

diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 761031d..7409434 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -1,4 +1,4 @@
-package main
+package bookpipeline
 
 import (
 	"errors"
@@ -17,10 +17,14 @@ import (
 const PreprocPattern = `_bin[0-9].[0-9].png`
 const HeartbeatTime = 60
 
-type awsConn struct {
+type Qmsg struct {
+        Handle, Body string
+}
+
+type AwsConn struct {
 	// these need to be set before running Init()
-	region string
-	logger *log.Logger
+	Region string
+	Logger *log.Logger
 
 	// these are used internally
 	sess                          *session.Session
@@ -32,17 +36,17 @@ type awsConn struct {
 	wipstorageid                  string
 }
 
-func (a *awsConn) Init() error {
-	if a.region == "" {
-		return errors.New("No region set")
+func (a *AwsConn) Init() error {
+	if a.Region == "" {
+		return errors.New("No Region set")
 	}
-	if a.logger == nil {
+	if a.Logger == nil {
 		return errors.New("No logger set")
 	}
 
 	var err error
 	a.sess, err = session.NewSession(&aws.Config{
-		Region: aws.String(a.region),
+		Region: aws.String(a.Region),
 	})
 	if err != nil {
 		return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err))
@@ -52,7 +56,7 @@ func (a *awsConn) Init() error {
 	a.downloader = s3manager.NewDownloader(a.sess)
 	a.uploader = s3manager.NewUploader(a.sess)
 
-	a.logger.Println("Getting preprocess queue URL")
+	a.Logger.Println("Getting preprocess queue URL")
 	result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
 		QueueName: aws.String("rescribepreprocess"),
 	})
@@ -61,7 +65,7 @@ func (a *awsConn) Init() error {
 	}
 	a.prequrl = *result.QueueUrl
 
-	a.logger.Println("Getting OCR queue URL")
+	a.Logger.Println("Getting OCR queue URL")
 	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
 		QueueName: aws.String("rescribeocr"),
 	})
@@ -70,7 +74,7 @@ func (a *awsConn) Init() error {
 	}
 	a.ocrqurl = *result.QueueUrl
 
-	a.logger.Println("Getting analyse queue URL")
+	a.Logger.Println("Getting analyse queue URL")
 	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
 		QueueName: aws.String("rescribeanalyse"),
 	})
@@ -84,7 +88,7 @@ func (a *awsConn) Init() error {
 	return nil
 }
 
-func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
 	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
 		MaxNumberOfMessages: aws.Int64(1),
 		VisibilityTimeout:   aws.Int64(HeartbeatTime * 2),
@@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
 
 	if len(msgResult.Messages) > 0 {
 		msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
-		a.logger.Println("Message received:", msg.Body)
+		a.Logger.Println("Message received:", msg.Body)
 		return msg, nil
 	} else {
 		return Qmsg{}, nil
 	}
 }
 
-func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
+func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
 	for _ = range t.C {
 		duration := int64(HeartbeatTime * 2)
 		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
@@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
 	return nil
 }
 
-func (a *awsConn) PreQueueId() string {
+func (a *AwsConn) PreQueueId() string {
 	return a.prequrl
 }
 
-func (a *awsConn) OCRQueueId() string {
+func (a *AwsConn) OCRQueueId() string {
 	return a.ocrqurl
 }
 
-func (a *awsConn) AnalyseQueueId() string {
+func (a *AwsConn) AnalyseQueueId() string {
 	return a.analysequrl
 }
 
-func (a *awsConn) WIPStorageId() string {
+func (a *AwsConn) WIPStorageId() string {
 	return a.wipstorageid
 }
 
-func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
+func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) {
 	var names []string
 	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
 		Bucket: aws.String(bucket),
@@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
 	return names, err
 }
 
-func (a *awsConn) AddToQueue(url string, msg string) error {
+func (a *AwsConn) AddToQueue(url string, msg string) error {
 	_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
 		MessageBody: &msg,
 		QueueUrl:    &url,
@@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error {
 	return err
 }
 
-func (a *awsConn) DelFromQueue(url string, handle string) error {
+func (a *AwsConn) DelFromQueue(url string, handle string) error {
 	_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
 		QueueUrl:      &url,
 		ReceiptHandle: &handle,
@@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {
 	return err
 }
 
-func (a *awsConn) Download(bucket string, key string, path string) error {
+func (a *AwsConn) Download(bucket string, key string, path string) error {
 	f, err := os.Create(path)
 	if err != nil {
 		return err
@@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
 	return err
 }
 
-func (a *awsConn) Upload(bucket string, key string, path string) error {
+func (a *AwsConn) Upload(bucket string, key string, path string) error {
 	file, err := os.Open(path)
 	if err != nil {
 		log.Fatalln("Failed to open file", path, err)
@@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {
 	return err
 }
 
-func (a *awsConn) Logger() *log.Logger {
-	return a.logger
+func (a *AwsConn) GetLogger() *log.Logger {
+	return a.Logger
 }
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
new file mode 100644
index 0000000..9fa7159
--- /dev/null
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -0,0 +1,388 @@
+package main
+
+// TODO: have logs go somewhere useful, like email
+// TODO: check if images are prebinarised and if so skip multiple binarisation
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"regexp"
+	"strings"
+	"time"
+
+	"rescribe.xyz/go.git/bookpipeline"
+	"rescribe.xyz/go.git/lib/hocr"
+	"rescribe.xyz/go.git/preproc"
+)
+
+const usage = `Usage: bookpipeline [-v] [-t training]
+
+Watches the preprocess, ocr and analyse queues for book names. When
+one is found this general process is followed:
+
+- The book name is hidden from the queue, and a 'heartbeat' is
+  started which keeps it hidden (this will time out after 2 minutes
+  if the program is terminated)
+- The necessary files from bookname/ are downloaded
+- The files are processed
+- The resulting files are uploaded to bookname/
+- The heartbeat is stopped
+- The book name is removed from the queue it was taken from, and
+  added to the next queue for future processing
+
+`
+
+const PauseBetweenChecks = 3 * time.Minute
+const HeartbeatTime = 60
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+	return len(p), nil
+}
+
+type Clouder interface {
+	Init() error
+	ListObjects(bucket string, prefix string) ([]string, error)
+	Download(bucket string, key string, fn string) error
+	Upload(bucket string, key string, path string) error
+	CheckQueue(url string) (bookpipeline.Qmsg, error)
+	AddToQueue(url string, msg string) error
+	DelFromQueue(url string, handle string) error
+	QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
+}
+
+type Pipeliner interface {
+	Clouder
+	PreQueueId() string
+	OCRQueueId() string
+	AnalyseQueueId() string
+	WIPStorageId() string
+	GetLogger() *log.Logger
+}
+
+func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {
+	for key := range dl {
+		fn := filepath.Join(dir, filepath.Base(key))
+		err := conn.Download(conn.WIPStorageId(), key, fn)
+		if err != nil {
+			close(process)
+			errc <- err
+			return
+		}
+		process <- fn
+	}
+	close(process)
+}
+
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) {
+	for path := range c {
+		name := filepath.Base(path)
+		key := filepath.Join(bookname, name)
+		err := conn.Upload(conn.WIPStorageId(), key, path)
+		if err != nil {
+			errc <- err
+			return
+		}
+	}
+
+	done <- true
+}
+
+func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
+	for path := range pre {
+		logger.Println("Preprocessing", path)
+		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
+		if err != nil {
+			close(up)
+			errc <- err
+			return
+		}
+		for _, p := range done {
+			up <- p
+		}
+	}
+	close(up)
+}
+
+func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
+	return func (toocr chan string, up chan string, errc chan error, logger *log.Logger) {
+		for path := range toocr {
+			logger.Println("OCRing", path)
+			name := strings.Replace(path, ".png", "", 1)
+			cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
+			err := cmd.Run()
+			if err != nil {
+				close(up)
+				errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err))
+				return
+			}
+			up <- name + ".hocr"
+		}
+		close(up)
+	}
+}
+
+func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
+	confs := make(map[string][]*bookpipeline.Conf)
+	bestconfs := make(map[string]*bookpipeline.Conf)
+	savedir := ""
+
+	for path := range toanalyse {
+		if savedir == "" {
+			savedir = filepath.Dir(path)
+		}
+		logger.Println("Calculating confidence for", path)
+		avg, err := hocr.GetAvgConf(path)
+		if err != nil {
+			close(up)
+			errc <- errors.New(fmt.Sprintf("Error retreiving confidence for %s: %s", path, err))
+			return
+		}
+		base := filepath.Base(path)
+		codestart := strings.Index(base, "_bin")
+		name := base[0:codestart]
+		var c bookpipeline.Conf
+		c.Path = path
+		c.Code = base[codestart:]
+		c.Conf = avg
+		confs[name] = append(confs[name], &c)
+
+	}
+
+	fn := filepath.Join(savedir, "conf")
+	logger.Println("Saving confidences in file", fn)
+	f, err := os.Create(fn)
+	if err != nil {
+		close(up)
+		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+		return
+	}
+	defer f.Close()
+
+	logger.Println("Finding best confidence for each page, and saving all confidences")
+	for base, conf := range confs {
+		var best float64
+		for _, c := range conf {
+			if c.Conf > best {
+				best = c.Conf
+				bestconfs[base] = c
+			}
+			_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
+			if err != nil {
+				close(up)
+				errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err))
+				return
+			}
+		}
+	}
+	up <- fn
+
+	logger.Println("Creating best file listing the best file for each page")
+	fn = filepath.Join(savedir, "best")
+	f, err = os.Create(fn)
+	if err != nil {
+		close(up)
+		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+		return
+	}
+	defer f.Close()
+	for _, conf := range bestconfs {
+		_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
+	}
+	up <- fn
+
+	logger.Println("Creating graph")
+	fn = filepath.Join(savedir, "graph.png")
+	f, err = os.Create(fn)
+	if err != nil {
+		close(up)
+		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+		return
+	}
+	defer f.Close()
+	err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
+	if err != nil {
+		close(up)
+		errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err))
+		return
+	}
+	up <- fn
+
+	close(up)
+}
+
+func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
+	bookname := msg.Body
+
+	t := time.NewTicker(HeartbeatTime * time.Second)
+	go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
+
+	d := filepath.Join(os.TempDir(), bookname)
+	err := os.MkdirAll(d, 0755)
+	if err != nil {
+		t.Stop()
+		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
+	}
+
+	dl := make(chan string)
+	processc := make(chan string)
+	upc := make(chan string)
+	done := make(chan bool)
+	errc := make(chan error)
+
+	// these functions will do their jobs when their channels have data
+	go download(dl, processc, conn, d, errc)
+	go process(processc, upc, errc, conn.GetLogger())
+	go up(upc, done, conn, bookname, errc)
+
+	conn.GetLogger().Println("Getting list of objects to download")
+	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+	if err != nil {
+		t.Stop()
+		_ = os.RemoveAll(d)
+		return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
+	}
+	var todl []string
+	for _, n := range objs {
+		if !match.MatchString(n) {
+			conn.GetLogger().Println("Skipping item that doesn't match target", n)
+			continue
+		}
+		todl = append(todl, n)
+	}
+	for _, a := range todl {
+		dl <- a
+	}
+	close(dl)
+
+	// wait for either the done or errc channel to be sent to
+	select {
+	case err = <-errc:
+		t.Stop()
+		_ = os.RemoveAll(d)
+		return err
+	case <-done:
+	}
+
+	if toQueue != "" {
+		conn.GetLogger().Println("Sending", bookname, "to queue")
+		err = conn.AddToQueue(toQueue, bookname)
+		if err != nil {
+			t.Stop()
+			_ = os.RemoveAll(d)
+			return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
+		}
+	}
+
+	t.Stop()
+
+	conn.GetLogger().Println("Deleting original message from queue")
+	err = conn.DelFromQueue(fromQueue, msg.Handle)
+	if err != nil {
+		_ = os.RemoveAll(d)
+		return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
+	}
+
+	err = os.RemoveAll(d)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
+	}
+
+	return nil
+}
+
+func main() {
+	verbose := flag.Bool("v", false, "verbose")
+	training := flag.String("t", "rescribealphav5", "tesseract training file to use")
+	flag.Usage = func() {
+		fmt.Fprintf(flag.CommandLine.Output(), usage)
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+
+	var verboselog *log.Logger
+	if *verbose {
+		verboselog = log.New(os.Stdout, "", log.LstdFlags)
+	} else {
+		var n NullWriter
+		verboselog = log.New(n, "", log.LstdFlags)
+	}
+
+	origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming
+	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+	ocredPattern := regexp.MustCompile(`.hocr$`)
+
+	var conn Pipeliner
+	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+
+	verboselog.Println("Setting up AWS session")
+	err := conn.Init()
+	if err != nil {
+		log.Fatalln("Error setting up cloud connection:", err)
+	}
+	verboselog.Println("Finished setting up AWS session")
+
+	var checkPreQueue <-chan time.Time
+	var checkOCRQueue <-chan time.Time
+	var checkAnalyseQueue <-chan time.Time
+	checkPreQueue = time.After(0)
+	checkOCRQueue = time.After(0)
+	checkAnalyseQueue = time.After(0)
+
+	for {
+		select {
+		case <-checkPreQueue:
+			msg, err := conn.CheckQueue(conn.PreQueueId())
+			checkPreQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking preprocess queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on preprocess queue, sleeping")
+				continue
+			}
+			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
+			if err != nil {
+				log.Println("Error during preprocess", err)
+			}
+		case <-checkOCRQueue:
+			msg, err := conn.CheckQueue(conn.OCRQueueId())
+			checkOCRQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking OCR queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on OCR queue, sleeping")
+				continue
+			}
+			err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
+			if err != nil {
+				log.Println("Error during OCR process", err)
+			}
+		case <-checkAnalyseQueue:
+			msg, err := conn.CheckQueue(conn.AnalyseQueueId())
+			checkAnalyseQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking analyse queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on analyse queue, sleeping")
+				continue
+			}
+			err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "")
+			if err != nil {
+				log.Println("Error during analysis", err)
+			}
+		}
+	}
+}
diff --git a/bookpipeline/graph.go b/bookpipeline/graph.go
index 27ffd39..a4fdee0 100644
--- a/bookpipeline/graph.go
+++ b/bookpipeline/graph.go
@@ -1,4 +1,4 @@
-package main
+package bookpipeline
 
 import (
 	"fmt"
@@ -14,26 +14,31 @@ import (
 const maxticks = 20
 const cutoff = 70
 
+type Conf struct {
+        Path, Code string
+        Conf float64
+}
+
 type GraphConf struct {
-	pgnum, conf float64
+	Pgnum, Conf float64
 }
 
-func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
+func Graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
 	// Organise confs to sort them by page
 	var graphconf []GraphConf
 	for _, conf := range confs {
-		name := filepath.Base(conf.path)
+		name := filepath.Base(conf.Path)
 		numend := strings.Index(name, "_")
 		pgnum, err := strconv.ParseFloat(name[0:numend], 64)
 		if err != nil {
 			continue
 		}
 		var c GraphConf
-		c.pgnum = pgnum
-		c.conf = conf.conf
+		c.Pgnum = pgnum
+		c.Conf = conf.Conf
 		graphconf = append(graphconf, c)
 	}
-	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].pgnum < graphconf[j].pgnum })
+	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Pgnum < graphconf[j].Pgnum })
 
 	// Create main xvalues and yvalues, annotations and ticks
 	var xvalues, yvalues []float64
@@ -43,13 +48,13 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
 	tickevery := len(graphconf) / maxticks
 	for _, c := range graphconf {
 		i = i + 1
-		xvalues = append(xvalues, c.pgnum)
-		yvalues = append(yvalues, c.conf)
-		if c.conf < cutoff {
-			annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.pgnum), XValue: c.pgnum, YValue: c.conf})
+		xvalues = append(xvalues, c.Pgnum)
+		yvalues = append(yvalues, c.Conf)
+		if c.Conf < cutoff {
+			annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.Pgnum), XValue: c.Pgnum, YValue: c.Conf})
 		}
 		if tickevery % i == 0 {
-			ticks = append(ticks, chart.Tick{c.pgnum, fmt.Sprintf("%.0f", c.pgnum)})
+			ticks = append(ticks, chart.Tick{c.Pgnum, fmt.Sprintf("%.0f", c.Pgnum)})
 		}
 	}
 	mainSeries := chart.ContinuousSeries{
@@ -73,9 +78,9 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
 	}
 
 	// Create lines marking top and bottom 10% confidence
-	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].conf < graphconf[j].conf })
-	lowconf := graphconf[int(len(graphconf) / 10)].conf
-	highconf := graphconf[int((len(graphconf) / 10) * 9)].conf
+	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Conf < graphconf[j].Conf })
+	lowconf := graphconf[int(len(graphconf) / 10)].Conf
+	highconf := graphconf[int((len(graphconf) / 10) * 9)].Conf
 	yvalues = []float64{}
 	for _ = range graphconf {
 		yvalues = append(yvalues, lowconf)
diff --git a/bookpipeline/main.go b/bookpipeline/main.go
deleted file mode 100644
index b7b01dd..0000000
--- a/bookpipeline/main.go
+++ /dev/null
@@ -1,397 +0,0 @@
-package main
-
-// TODO: have logs go somewhere useful, like email
-// TODO: check if images are prebinarised and if so skip multiple binarisation
-
-import (
-	"errors"
-	"flag"
-	"fmt"
-	"log"
-	"os"
-	"os/exec"
-	"path/filepath"
-	"regexp"
-	"strings"
-	"time"
-
-	"rescribe.xyz/go.git/lib/hocr"
-	"rescribe.xyz/go.git/preproc"
-)
-
-const usage = `Usage: bookpipeline [-v] [-t training]
-
-Watches the preprocess, ocr and analyse queues for book names. When
-one is found this general process is followed:
-
-- The book name is hidden from the queue, and a 'heartbeat' is
-  started which keeps it hidden (this will time out after 2 minutes
-  if the program is terminated)
-- The necessary files from bookname/ are downloaded
-- The files are processed
-- The resulting files are uploaded to bookname/
-- The heartbeat is stopped
-- The book name is removed from the queue it was taken from, and
-  added to the next queue for future processing
-
-`
-
-// null writer to enable non-verbose logging to be discarded
-type NullWriter bool
-
-func (w NullWriter) Write(p []byte) (n int, err error) {
-	return len(p), nil
-}
-
-const PauseBetweenChecks = 3 * time.Minute
-
-type Clouder interface {
-	Init() error
-	ListObjects(bucket string, prefix string) ([]string, error)
-	Download(bucket string, key string, fn string) error
-	Upload(bucket string, key string, path string) error
-	CheckQueue(url string) (Qmsg, error)
-	AddToQueue(url string, msg string) error
-	DelFromQueue(url string, handle string) error
-	QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
-}
-
-type Pipeliner interface {
-	Clouder
-	PreQueueId() string
-	OCRQueueId() string
-	AnalyseQueueId() string
-	WIPStorageId() string
-	Logger() *log.Logger
-}
-
-type Qmsg struct {
-	Handle, Body string
-}
-
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {
-	for key := range dl {
-		fn := filepath.Join(dir, filepath.Base(key))
-		err := conn.Download(conn.WIPStorageId(), key, fn)
-		if err != nil {
-			close(process)
-			errc <- err
-			return
-		}
-		process <- fn
-	}
-	close(process)
-}
-
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) {
-	for path := range c {
-		name := filepath.Base(path)
-		key := filepath.Join(bookname, name)
-		err := conn.Upload(conn.WIPStorageId(), key, path)
-		if err != nil {
-			errc <- err
-			return
-		}
-	}
-
-	done <- true
-}
-
-func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
-	for path := range pre {
-		logger.Println("Preprocessing", path)
-		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
-		if err != nil {
-			close(up)
-			errc <- err
-			return
-		}
-		for _, p := range done {
-			up <- p
-		}
-	}
-	close(up)
-}
-
-func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
-	return func (toocr chan string, up chan string, errc chan error, logger *log.Logger) {
-		for path := range toocr {
-			logger.Println("OCRing", path)
-			name := strings.Replace(path, ".png", "", 1)
-			cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
-			err := cmd.Run()
-			if err != nil {
-				close(up)
-				errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err))
-				return
-			}
-			up <- name + ".hocr"
-		}
-		close(up)
-	}
-}
-
-type Conf struct {
-	path, code string
-	conf float64
-}
-
-func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
-	confs := make(map[string][]*Conf)
-	bestconfs := make(map[string]*Conf)
-	savedir := ""
-
-	for path := range toanalyse {
-		if savedir == "" {
-			savedir = filepath.Dir(path)
-		}
-		logger.Println("Calculating confidence for", path)
-		avg, err := hocr.GetAvgConf(path)
-		if err != nil {
-			close(up)
-			errc <- errors.New(fmt.Sprintf("Error retreiving confidence for %s: %s", path, err))
-			return
-		}
-		base := filepath.Base(path)
-		codestart := strings.Index(base, "_bin")
-		name := base[0:codestart]
-		var c Conf
-		c.path = path
-		c.code = base[codestart:]
-		c.conf = avg
-		confs[name] = append(confs[name], &c)
-
-	}
-
-	fn := filepath.Join(savedir, "conf")
-	logger.Println("Saving confidences in file", fn)
-	f, err := os.Create(fn)
-	if err != nil {
-		close(up)
-		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
-		return
-	}
-	defer f.Close()
-
-	logger.Println("Finding best confidence for each page, and saving all confidences")
-	for base, conf := range confs {
-		var best float64
-		for _, c := range conf {
-			if c.conf > best {
-				best = c.conf
-				bestconfs[base] = c
-			}
-			_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.path, c.conf)
-			if err != nil {
-				close(up)
-				errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err))
-				return
-			}
-		}
-	}
-	up <- fn
-
-	logger.Println("Creating best file listing the best file for each page")
-	fn = filepath.Join(savedir, "best")
-	f, err = os.Create(fn)
-	if err != nil {
-		close(up)
-		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
-		return
-	}
-	defer f.Close()
-	for _, conf := range bestconfs {
-		_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.path))
-	}
-	up <- fn
-
-	logger.Println("Creating graph")
-	fn = filepath.Join(savedir, "graph.png")
-	f, err = os.Create(fn)
-	if err != nil {
-		close(up)
-		errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
-		return
-	}
-	defer f.Close()
-	err = graph(bestconfs, filepath.Base(savedir), f)
-	if err != nil {
-		close(up)
-		errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err))
-		return
-	}
-	up <- fn
-
-	// TODO: generate a general report.txt with statistics etc for the book, send to up
-
-	close(up)
-}
-
-func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
-	bookname := msg.Body
-
-	t := time.NewTicker(HeartbeatTime * time.Second)
-	go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
-
-	d := filepath.Join(os.TempDir(), bookname)
-	err := os.MkdirAll(d, 0755)
-	if err != nil {
-		t.Stop()
-		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
-	}
-
-	dl := make(chan string)
-	processc := make(chan string)
-	upc := make(chan string)
-	done := make(chan bool)
-	errc := make(chan error)
-
-	// these functions will do their jobs when their channels have data
-	go download(dl, processc, conn, d, errc)
-	go process(processc, upc, errc, conn.Logger())
-	go up(upc, done, conn, bookname, errc)
-
-	conn.Logger().Println("Getting list of objects to download")
-	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
-	if err != nil {
-		t.Stop()
-		_ = os.RemoveAll(d)
-		return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
-	}
-	var todl []string
-	for _, n := range objs {
-		if !match.MatchString(n) {
-			conn.Logger().Println("Skipping item that doesn't match target", n)
-			continue
-		}
-		todl = append(todl, n)
-	}
-	for _, a := range todl {
-		dl <- a
-	}
-	close(dl)
-
-	// wait for either the done or errc channel to be sent to
-	select {
-	case err = <-errc:
-		t.Stop()
-		_ = os.RemoveAll(d)
-		return err
-	case <-done:
-	}
-
-	if toQueue != "" {
-		conn.Logger().Println("Sending", bookname, "to queue")
-		err = conn.AddToQueue(toQueue, bookname)
-		if err != nil {
-			t.Stop()
-			_ = os.RemoveAll(d)
-			return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
-		}
-	}
-
-	t.Stop()
-
-	conn.Logger().Println("Deleting original message from queue")
-	err = conn.DelFromQueue(fromQueue, msg.Handle)
-	if err != nil {
-		_ = os.RemoveAll(d)
-		return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
-	}
-
-	err = os.RemoveAll(d)
-	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
-	}
-
-	return nil
-}
-
-func main() {
-	verbose := flag.Bool("v", false, "verbose")
-	training := flag.String("t", "rescribealphav5", "tesseract training file to use")
-	flag.Usage = func() {
-		fmt.Fprintf(flag.CommandLine.Output(), usage)
-		flag.PrintDefaults()
-	}
-	flag.Parse()
-
-	var verboselog *log.Logger
-	if *verbose {
-		verboselog = log.New(os.Stdout, "", log.LstdFlags)
-	} else {
-		var n NullWriter
-		verboselog = log.New(n, "", log.LstdFlags)
-	}
-
-	origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming
-	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
-	ocredPattern := regexp.MustCompile(`.hocr$`)
-
-	var conn Pipeliner
-	conn = &awsConn{region: "eu-west-2", logger: verboselog}
-
-	verboselog.Println("Setting up AWS session")
-	err := conn.Init()
-	if err != nil {
-		log.Fatalln("Error setting up cloud connection:", err)
-	}
-	verboselog.Println("Finished setting up AWS session")
-
-	var checkPreQueue <-chan time.Time
-	var checkOCRQueue <-chan time.Time
-	var checkAnalyseQueue <-chan time.Time
-	checkPreQueue = time.After(0)
-	checkOCRQueue = time.After(0)
-	checkAnalyseQueue = time.After(0)
-
-	for {
-		select {
-		case <-checkPreQueue:
-			msg, err := conn.CheckQueue(conn.PreQueueId())
-			checkPreQueue = time.After(PauseBetweenChecks)
-			if err != nil {
-				log.Println("Error checking preprocess queue", err)
-				continue
-			}
-			if msg.Handle == "" {
-				verboselog.Println("No message received on preprocess queue, sleeping")
-				continue
-			}
-			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
-			if err != nil {
-				log.Println("Error during preprocess", err)
-			}
-		case <-checkOCRQueue:
-			msg, err := conn.CheckQueue(conn.OCRQueueId())
-			checkOCRQueue = time.After(PauseBetweenChecks)
-			if err != nil {
-				log.Println("Error checking OCR queue", err)
-				continue
-			}
-			if msg.Handle == "" {
-				verboselog.Println("No message received on OCR queue, sleeping")
-				continue
-			}
-			err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
-			if err != nil {
-				log.Println("Error during OCR process", err)
-			}
-		case <-checkAnalyseQueue:
-			msg, err := conn.CheckQueue(conn.AnalyseQueueId())
-			checkAnalyseQueue = time.After(PauseBetweenChecks)
-			if err != nil {
-				log.Println("Error checking analyse queue", err)
-				continue
-			}
-			if msg.Handle == "" {
-				verboselog.Println("No message received on analyse queue, sleeping")
-				continue
-			}
-			err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "")
-			if err != nil {
-				log.Println("Error during analysis", err)
-			}
-		}
-	}
-}
-- 
cgit v1.2.1-24-ge1ad