From 9f588a71e9a2d7ad179890d0fc19372fae047b04 Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Tue, 20 Aug 2019 16:11:03 +0100
Subject: Add basic OCR support, and reorganise code

The previously committed code didn't work, as ListObjects was sending
to a channel synchronously, so what it sent was never being received.

The current API isn't great: it mixes synchronous and asynchronous calls,
doesn't handle errors consistently, and is generally overcomplicated. That
will be fixed soon.
---
 pipelinepreprocess/aws.go  |  88 +++++++++++++--
 pipelinepreprocess/main.go | 259 +++++++++++++++++++++++++++++++++------------
 2 files changed, 269 insertions(+), 78 deletions(-)

diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go
index bb969ed..75bf81c 100644
--- a/pipelinepreprocess/aws.go
+++ b/pipelinepreprocess/aws.go
@@ -29,7 +29,7 @@ type awsConn struct {
         sqssvc *sqs.SQS
         downloader *s3manager.Downloader
 	uploader *s3manager.Uploader
-	prequrl, ocrqurl string
+	prequrl, ocrqurl, analysequrl string
 }
 
 func (a *awsConn) Init() error {
@@ -69,6 +69,16 @@ func (a *awsConn) Init() error {
                 return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
         }
         a.ocrqurl = *result.QueueUrl
+
+        a.logger.Println("Getting analyse queue URL")
+        result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+                QueueName: aws.String("rescribeanalyse"),
+        })
+        if err != nil {
+                return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
+        }
+        a.analysequrl = *result.QueueUrl
+
 	return nil
 }
 
@@ -97,6 +107,16 @@ func (a *awsConn) CheckPreQueue() (Qmsg, error) {
 	return a.CheckQueue(a.prequrl)
 }
 
+func (a *awsConn) CheckOCRQueue() (Qmsg, error) { // CheckOCRQueue checks the OCR queue (a.ocrqurl) via CheckQueue.
+	a.logger.Println("Checking OCR queue for new messages")
+	return a.CheckQueue(a.ocrqurl)
+}
+
+func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) { // CheckAnalyseQueue checks the analyse queue via CheckQueue.
+	a.logger.Println("Checking analyse queue for new messages")
+	return a.CheckQueue(a.analysequrl) // the analyse queue URL stored by Init, not the OCR queue URL
+}
+
 func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
 	for _ = range t.C {
 		duration := int64(HeartbeatTime * 2)
@@ -113,31 +133,65 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
 }
 
 func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
-	a.logger.Println("Starting preprocess queue heartbeat for", msgHandle)
+	a.logger.Println("Starting preprocess queue heartbeat")
 	return a.QueueHeartbeat(t, msgHandle, a.prequrl)
 }
 
-func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error {
-	alreadydone := regexp.MustCompile(PreprocPattern)
+func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { // OCRQueueHeartbeat runs QueueHeartbeat for msgHandle on the OCR queue.
+	a.logger.Println("Starting ocr queue heartbeat")
+	return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
+}
+
+func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) { // sends each listed key to names, then closes names
 	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
 		Bucket: aws.String(bucket),
 		Prefix: aws.String(prefix),
 	}, func(page *s3.ListObjectsV2Output, last bool) bool {
 		for _, r := range page.Contents {
-			if alreadydone.MatchString(*r.Key) {
-				a.logger.Println("Skipping item that looks like it has already been processed", *r.Key)
-				continue
-			}
 			names <- *r.Key
 		}
 		return true
 	})
 	close(names)
-	return err
+	if err != nil {
+		// TODO: handle error properly; for now at least report what went wrong
+		log.Println("Error getting objects", err)
+	}
 }
 
-func (a *awsConn) ListInProgress(bookname string, names chan string) error {
-	return a.ListObjects("rescribeinprogress", bookname, names)
+func (a *awsConn) ListToPreprocess(bookname string, names chan string) error { // sends not-yet-preprocessed keys to names, then closes it
+	objs := make(chan string)
+	preprocessed := regexp.MustCompile(PreprocPattern)
+	go a.ListObjects("rescribeinprogress", bookname, objs)
+	// Forward only the keys that do not match PreprocPattern, i.e. that do not look already preprocessed
+	for n := range objs {
+		if preprocessed.MatchString(n) {
+			a.logger.Println("Skipping item that looks like it has already been processed", n)
+			continue
+		}
+		names <- n
+	}
+	close(names)
+	// TODO: handle errors from ListObjects; until then this always returns nil
+	return nil
+}
+
+func (a *awsConn) ListToOCR(bookname string, names chan string) error { // sends preprocessed keys to names, then closes it
+	objs := make(chan string)
+	preprocessed := regexp.MustCompile(PreprocPattern)
+	go a.ListObjects("rescribeinprogress", bookname, objs)
+	a.logger.Println("Started listobjects in the background")
+	// Forward only the keys that match PreprocPattern, i.e. that look already preprocessed
+	for n := range objs {
+		if !preprocessed.MatchString(n) {
+			a.logger.Println("Skipping item that looks like it is not preprocessed", n)
+			continue
+		}
+		names <- n
+	}
+	close(names)
+	// TODO: handle errors from ListObjects; until then this always returns nil
+	return nil
 }
 
 func (a *awsConn) AddToQueue(url string, msg string) error {
@@ -152,6 +206,10 @@ func (a *awsConn) AddToOCRQueue(msg string) error {
 	return a.AddToQueue(a.ocrqurl, msg)
 }
 
+func (a *awsConn) AddToAnalyseQueue(msg string) error { // AddToAnalyseQueue adds msg to the analyse queue via AddToQueue.
+	return a.AddToQueue(a.analysequrl, msg)
+}
+
 func (a *awsConn) DelFromQueue(url string, handle string) error {
 	_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
 		QueueUrl: &url,
@@ -164,6 +222,10 @@ func (a *awsConn) DelFromPreQueue(handle string) error {
 	return a.DelFromQueue(a.prequrl, handle)
 }
 
+func (a *awsConn) DelFromOCRQueue(handle string) error { // DelFromOCRQueue deletes the message with this handle from the OCR queue.
+	return a.DelFromQueue(a.ocrqurl, handle)
+}
+
 func (a *awsConn) Download(bucket string, key string, path string) error {
 	f, err := os.Create(path)
 	if err != nil {
@@ -203,3 +265,7 @@ func (a *awsConn) UploadToInProgress(key string, path string) error {
 	a.logger.Println("Uploading", path)
 	return a.Upload("rescribeinprogress", key, path)
 }
+
+func (a *awsConn) Logger() *log.Logger { // Logger returns the logger this connection writes its progress messages to.
+	return a.logger
+}
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index a223d0b..61dec96 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -8,7 +8,9 @@ package main
 import (
 	"log"
 	"os"
+	"os/exec"
 	"path/filepath"
+	"strings"
 	"time"
 
 	"rescribe.xyz/go.git/preproc"
@@ -16,6 +18,8 @@ import (
 
 const usage = "Usage: pipelinepreprocess [-v]\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n\n-v  verbose\n"
 
+const training = "rescribealphav5" // TODO: allow to set on cmdline
+
 // null writer to enable non-verbose logging to be discarded
 type NullWriter bool
 func (w NullWriter) Write(p []byte) (n int, err error) {
@@ -28,7 +32,8 @@ const PauseBetweenChecks = 60 * time.Second
 
 type Clouder interface {
 	Init() error
-	ListObjects(bucket string, prefix string, names chan string) error
+	//ListObjects(bucket string, prefix string, names chan string) error
+	ListObjects(bucket string, prefix string, names chan string)
 	Download(bucket string, key string, fn string) error
 	Upload(bucket string, key string, path string) error
 	CheckQueue(url string) (Qmsg, error)
@@ -39,13 +44,20 @@ type Clouder interface {
 
 type Pipeliner interface {
 	Clouder
-	ListInProgress(bookname string, names chan string) error
+	ListToPreprocess(bookname string, names chan string) error
+	ListToOCR(bookname string, names chan string) error
 	DownloadFromInProgress(key string, fn string) error
 	UploadToInProgress(key string, path string) error
 	CheckPreQueue() (Qmsg, error)
+	CheckOCRQueue() (Qmsg, error)
+	CheckAnalyseQueue() (Qmsg, error)
 	AddToOCRQueue(msg string) error
+	AddToAnalyseQueue(msg string) error
 	DelFromPreQueue(handle string) error
+	DelFromOCRQueue(handle string) error
 	PreQueueHeartbeat(t *time.Ticker, msgHandle string) error
+	OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error
+	Logger() *log.Logger
 }
 
 type Qmsg struct {
@@ -64,11 +76,27 @@ func download(dl chan string, pre chan string, conn Pipeliner, dir string) {
 	close(pre)
 }
 
+func up(c chan string, done chan bool, conn Pipeliner, bookname string) { // uploads every path received on c, then signals done
+	for path := range c {
+		name := filepath.Base(path)
+		key := filepath.Join(bookname, name) // upload key is bookname joined with the file's base name
+		err := conn.UploadToInProgress(key, path)
+		if err != nil {
+			log.Fatalln("Failed to upload", path, err) // a failed upload terminates the whole program
+		}
+	}
+
+	done <- true // reached only once c has been closed and drained
+}
+
 func preprocess(pre chan string, up chan string, logger *log.Logger) {
 	for path := range pre {
 		logger.Println("Preprocessing", path)
 		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
 		if err != nil {
+			// TODO: have error channel to signal that things are screwy, which
+			// can close channels and stop the heartbeat, rather than just kill
+			// the whole program
 			log.Fatalln("Error preprocessing", path, err)
 		}
 		for _, p := range done {
@@ -78,17 +106,137 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) {
 	close(up)
 }
 
-func up(c chan string, done chan bool, conn Pipeliner, bookname string) {
-	for path := range c {
-		name := filepath.Base(path)
-		key := filepath.Join(bookname, name)
-		err := conn.UploadToInProgress(key, path)
+// ocr runs tesseract on each path from toocr and sends the resulting .hocr path to up. TODO: use Tesseract API rather than calling the executable
+func ocr(toocr chan string, up chan string, logger *log.Logger) {
+	for path := range toocr {
+		logger.Println("OCRing", path)
+		name := strings.TrimSuffix(path, filepath.Ext(path)) // output base: path minus its final extension, whatever that is
+		cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
+		err := cmd.Run()
 		if err != nil {
-			log.Fatalln("Failed to upload", path, err)
+			// TODO: have error channel to signal that things are screwy, which
+			// can close channels and stop the heartbeat, rather than just kill
+			// the whole program
+			log.Fatalln("Error ocring", path, err)
 		}
+		up <- name + ".hocr"
 	}
+	close(up)
+}
 
-	done <- true
+func preProcBook(msg Qmsg, conn Pipeliner) { // preprocesses the book named in msg.Body, then passes it on to the OCR queue
+	bookname := msg.Body
+
+	t := time.NewTicker(HeartbeatTime * time.Second)
+	go conn.PreQueueHeartbeat(t, msg.Handle) // NOTE(review): t.Stop() does not close t.C, so this goroutine presumably never returns - confirm
+
+	d := filepath.Join(os.TempDir(), bookname)
+	err := os.MkdirAll(d, 0755)
+	if err != nil {
+		log.Println("Failed to create directory", d, err)
+		t.Stop()
+		return
+	}
+
+	dl := make(chan string)
+	pre := make(chan string)
+	upc := make(chan string) // TODO: rename
+	done := make(chan bool) // posted to by up once it has finished uploading, so the queues can then be updated
+
+	// pipeline stages: download reads dl and feeds pre, preprocess feeds upc, up reads upc and posts to done
+	go download(dl, pre, conn, d)
+	go preprocess(pre, upc, conn.Logger())
+	go up(upc, done, conn, bookname)
+
+	conn.Logger().Println("Getting list of objects to download")
+	err = conn.ListToPreprocess(bookname, dl)
+	if err != nil {
+		log.Println("Failed to get list of files for book", bookname, err)
+		t.Stop()
+		return // the queue message is left undeleted and d is not removed on this path
+	}
+
+	// block until up reports that it has finished
+	<-done
+
+	conn.Logger().Println("Sending", bookname, "to OCR queue")
+	err = conn.AddToOCRQueue(bookname)
+	if err != nil {
+		log.Println("Error adding to ocr queue", bookname, err)
+		t.Stop()
+		return // the queue message is left undeleted and d is not removed on this path
+	}
+
+	t.Stop()
+
+	conn.Logger().Println("Deleting original message from preprocessing queue")
+	err = conn.DelFromPreQueue(msg.Handle)
+	if err != nil {
+		log.Println("Error deleting message from preprocessing queue", err)
+	}
+
+	err = os.RemoveAll(d)
+	if err != nil {
+		log.Println("Failed to remove directory", d, err)
+	}
+}
+
+func ocrBook(msg Qmsg, conn Pipeliner) { // OCRs the book named in msg.Body, then passes it on to the analyse queue
+	bookname := msg.Body
+
+	t := time.NewTicker(HeartbeatTime * time.Second)
+	go conn.OCRQueueHeartbeat(t, msg.Handle) // NOTE(review): t.Stop() does not close t.C, so this goroutine presumably never returns - confirm
+
+	d := filepath.Join(os.TempDir(), bookname)
+	err := os.MkdirAll(d, 0755)
+	if err != nil {
+		log.Println("Failed to create directory", d, err)
+		t.Stop()
+		return
+	}
+
+	dl := make(chan string)
+	ocrc := make(chan string)
+	upc := make(chan string) // TODO: rename
+	done := make(chan bool) // posted to by up once it has finished uploading, so the queues can then be updated
+
+	// pipeline stages: download reads dl and feeds ocrc, ocr feeds upc, up reads upc and posts to done
+	go download(dl, ocrc, conn, d)
+	go ocr(ocrc, upc, conn.Logger())
+	go up(upc, done, conn, bookname)
+
+	conn.Logger().Println("Getting list of objects to download")
+	go conn.ListToOCR(bookname, dl) // started in the background here, so its error return is discarded
+	//err = conn.ListToOCR(bookname, dl)
+	//if err != nil {
+	//	log.Println("Failed to get list of files for book", bookname, err)
+	//	t.Stop()
+	//	return
+	//}
+
+	// block until up reports that it has finished
+	<-done
+
+	conn.Logger().Println("Sending", bookname, "to analyse queue")
+	err = conn.AddToAnalyseQueue(bookname)
+	if err != nil {
+		log.Println("Error adding to analyse queue", bookname, err)
+		t.Stop()
+		return // the queue message is left undeleted and d is not removed on this path
+	}
+
+	t.Stop()
+
+	conn.Logger().Println("Deleting original message from OCR queue")
+	err = conn.DelFromOCRQueue(msg.Handle)
+	if err != nil {
+		log.Println("Error deleting message from OCR queue", err)
+	}
+
+	err = os.RemoveAll(d)
+	if err != nil {
+		log.Println("Failed to remove directory", d, err)
+	}
 }
 
 func main() {
@@ -112,66 +260,43 @@ func main() {
 	if err != nil {
 		log.Fatalln("Error setting up cloud connection:", err)
 	}
+	verboselog.Println("Finished setting up AWS session")
 
-	for {
-		msg, err := conn.CheckPreQueue()
-		if err != nil {
-			log.Fatalln("Error checking preprocess queue", err)
-		}
-		if msg.Handle == "" {
-			verboselog.Println("No message received, sleeping")
-			time.Sleep(PauseBetweenChecks)
-			continue
-		}
-		bookname := msg.Body
-
-		t := time.NewTicker(HeartbeatTime * time.Second)
-		go conn.PreQueueHeartbeat(t, msg.Handle)
-
-
-		d := filepath.Join(os.TempDir(), bookname)
-		err = os.MkdirAll(d, 0755)
-		if err != nil {
-			log.Fatalln("Failed to create directory", d, err)
-		}
-
-		dl := make(chan string)
-		pre := make(chan string)
-		upc := make(chan string) // TODO: rename
-		done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
-
-		// these functions will do their jobs when their channels have data
-		go download(dl, pre, conn, d)
-		go preprocess(pre, upc, verboselog)
-		go up(upc, done, conn, bookname)
-
-
-		verboselog.Println("Getting list of objects to download")
-		err = conn.ListInProgress(bookname, dl)
-		if err != nil {
-			log.Fatalln("Failed to get list of files for book", bookname, err)
-		}
+	var checkPreQueue <-chan time.Time
+	var checkOCRQueue <-chan time.Time
+	checkPreQueue = time.After(0)
+	checkOCRQueue = time.After(0)
 
-		// wait for the done channel to be posted to
-		<-done
-
-		verboselog.Println("Sending", bookname, "to OCR queue")
-		err = conn.AddToOCRQueue(bookname)
-		if err != nil {
-			log.Fatalln("Error adding to ocr queue", bookname, err)
-		}
-
-		t.Stop()
-
-		verboselog.Println("Deleting original message from preprocessing queue")
-		err = conn.DelFromPreQueue(msg.Handle)
-		if err != nil {
-			log.Fatalln("Error deleting message from preprocessing queue", err)
-		}
-
-		err = os.RemoveAll(d)
-		if err != nil {
-			log.Fatalln("Failed to remove directory", d, err)
+	// TODO: use a buffer or something to limit number of running processes
+	//       could start preprocbook / ocrbook and just have them listen on
+	//       channels for stuff to do, that way they'd do things one at a time
+	// TODO: don't trigger the checkOCRQueue until a running thing has finished
+	for {
+		select {
+		case <- checkPreQueue:
+			msg, err := conn.CheckPreQueue()
+			checkPreQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking preprocess queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on preprocess queue, sleeping")
+				continue
+			}
+			go preProcBook(msg, conn)
+		case <- checkOCRQueue:
+			msg, err := conn.CheckOCRQueue()
+			//checkOCRQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking OCR queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on OCR queue, sleeping")
+				continue
+			}
+			go ocrBook(msg, conn)
 		}
 	}
 }
-- 
cgit v1.2.1-24-ge1ad