From 9f588a71e9a2d7ad179890d0fc19372fae047b04 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 20 Aug 2019 16:11:03 +0100 Subject: Add basic OCR support, and reorganise code The previously committed thing didn't work, as listobjects was sending to a channel synchronously, so it was never being received. The current API isn't great, mixing synchronous and non-synchronous things, not handling errors consistently, and generally is over complicated. That will be fixed soon. --- pipelinepreprocess/main.go | 259 +++++++++++++++++++++++++++++++++------------ 1 file changed, 192 insertions(+), 67 deletions(-) (limited to 'pipelinepreprocess/main.go') diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index a223d0b..61dec96 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -8,7 +8,9 @@ package main import ( "log" "os" + "os/exec" "path/filepath" + "strings" "time" "rescribe.xyz/go.git/preproc" @@ -16,6 +18,8 @@ import ( const usage = "Usage: pipelinepreprocess [-v]\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n\n-v verbose\n" +const training = "rescribealphav5" // TODO: allow to set on cmdline + // null writer to enable non-verbose logging to be discarded type NullWriter bool func (w NullWriter) Write(p []byte) (n int, err error) { @@ -28,7 +32,8 @@ const PauseBetweenChecks = 60 * time.Second type Clouder interface { Init() error - ListObjects(bucket string, prefix string, names chan string) error + //ListObjects(bucket string, prefix string, names chan string) error + ListObjects(bucket string, prefix string, names chan string) Download(bucket string, key string, fn string) error Upload(bucket string, key string, path string) error CheckQueue(url string) (Qmsg, error) @@ -39,13 +44,20 @@ type Clouder interface { type Pipeliner interface { Clouder - ListInProgress(bookname string, names chan string) error + ListToPreprocess(bookname string, names chan string) error + ListToOCR(bookname string, names chan string) error DownloadFromInProgress(key string, fn string) error UploadToInProgress(key string, path string) error CheckPreQueue() (Qmsg, error) + CheckOCRQueue() (Qmsg, error) + CheckAnalyseQueue() (Qmsg, error) AddToOCRQueue(msg string) error + AddToAnalyseQueue(msg string) error DelFromPreQueue(handle string) error + DelFromOCRQueue(handle string) error PreQueueHeartbeat(t *time.Ticker, msgHandle string) error + OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error + Logger() *log.Logger } type Qmsg struct { @@ -64,11 +76,27 @@ func download(dl chan string, pre chan string, conn Pipeliner, dir string) { close(pre) } +func up(c chan string, done chan bool, conn Pipeliner, bookname string) { + for path := range c { + name := filepath.Base(path) + key := filepath.Join(bookname, name) + err := conn.UploadToInProgress(key, path) + if err != nil { + log.Fatalln("Failed to upload", path, err) + } + } + + done <- true +} + func preprocess(pre chan string, up chan string, logger *log.Logger) { for path := range pre { logger.Println("Preprocessing", path) done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) if err != nil { + // TODO: have error channel to signal that things are screwy, which + // can close channels and stop the heartbeat, rather than just kill + // the whole program log.Fatalln("Error preprocessing", path, err) } for _, p := range done { @@ -78,17 +106,137 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) { close(up) } -func up(c chan string, done chan bool, conn Pipeliner, bookname string) { - for path := range c { - name := filepath.Base(path) - key := filepath.Join(bookname, name) - err := conn.UploadToInProgress(key, path) +// TODO: use Tesseract API rather than calling the executable +func ocr(toocr chan string, up chan string, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension + cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") + err := cmd.Run() if err != nil { - log.Fatalln("Failed to upload", path, err) + // TODO: have error channel to signal that things are screwy, which + // can close channels and stop the heartbeat, rather than just kill + // the whole program + log.Fatalln("Error ocring", path, err) } + up <- name + ".hocr" } + close(up) +} - done <- true +func preProcBook(msg Qmsg, conn Pipeliner) { + bookname := msg.Body + + t := time.NewTicker(HeartbeatTime * time.Second) + go conn.PreQueueHeartbeat(t, msg.Handle) + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + log.Println("Failed to create directory", d, err) + t.Stop() + return + } + + dl := make(chan string) + pre := make(chan string) + upc := make(chan string) // TODO: rename + done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + + // these functions will do their jobs when their channels have data + go download(dl, pre, conn, d) + go preprocess(pre, upc, conn.Logger()) + go up(upc, done, conn, bookname) + + conn.Logger().Println("Getting list of objects to download") + err = conn.ListToPreprocess(bookname, dl) + if err != nil { + log.Println("Failed to get list of files for book", bookname, err) + t.Stop() + return + } + + // wait for the done channel to be posted to + <-done + + conn.Logger().Println("Sending", bookname, "to OCR queue") + err = conn.AddToOCRQueue(bookname) + if err != nil { + log.Println("Error adding to ocr queue", bookname, err) + t.Stop() + return + } + + t.Stop() + + conn.Logger().Println("Deleting original message from preprocessing queue") + err = conn.DelFromPreQueue(msg.Handle) + if err != nil { + log.Println("Error deleting message from preprocessing queue", err) + } + + err = os.RemoveAll(d) + if err != nil { + log.Println("Failed to remove directory", d, err) + } +} + +func ocrBook(msg Qmsg, conn Pipeliner) { + bookname := msg.Body + + t := time.NewTicker(HeartbeatTime * time.Second) + go conn.OCRQueueHeartbeat(t, msg.Handle) + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + log.Println("Failed to create directory", d, err) + t.Stop() + return + } + + dl := make(chan string) + ocrc := make(chan string) + upc := make(chan string) // TODO: rename + done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + + // these functions will do their jobs when their channels have data + go download(dl, ocrc, conn, d) + go ocr(ocrc, upc, conn.Logger()) + go up(upc, done, conn, bookname) + + conn.Logger().Println("Getting list of objects to download") + go conn.ListToOCR(bookname, dl) + //err = conn.ListToOCR(bookname, dl) + //if err != nil { + // log.Println("Failed to get list of files for book", bookname, err) + // t.Stop() + // return + //} + + // wait for the done channel to be posted to + <-done + + conn.Logger().Println("Sending", bookname, "to analyse queue") + err = conn.AddToAnalyseQueue(bookname) + if err != nil { + log.Println("Error adding to analyse queue", bookname, err) + t.Stop() + return + } + + t.Stop() + + conn.Logger().Println("Deleting original message from OCR queue") + err = conn.DelFromOCRQueue(msg.Handle) + if err != nil { + log.Println("Error deleting message from OCR queue", err) + } + + err = os.RemoveAll(d) + if err != nil { + log.Println("Failed to remove directory", d, err) + } } func main() { @@ -112,66 +260,43 @@ func main() { if err != nil { log.Fatalln("Error setting up cloud connection:", err) } + verboselog.Println("Finished setting up AWS session") - for { - msg, err := conn.CheckPreQueue() - if err != nil { - log.Fatalln("Error checking preprocess queue", err) - } - if msg.Handle == "" { - verboselog.Println("No message received, sleeping") - time.Sleep(PauseBetweenChecks) - continue - } - bookname := msg.Body - - t := time.NewTicker(HeartbeatTime * time.Second) - go conn.PreQueueHeartbeat(t, msg.Handle) - - - d := filepath.Join(os.TempDir(), bookname) - err = os.MkdirAll(d, 0755) - if err != nil { - log.Fatalln("Failed to create directory", d, err) - } - - dl := make(chan string) - pre := make(chan string) - upc := make(chan string) // TODO: rename - done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated - - // these functions will do their jobs when their channels have data - go download(dl, pre, conn, d) - go preprocess(pre, upc, verboselog) - go up(upc, done, conn, bookname) - - - verboselog.Println("Getting list of objects to download") - err = conn.ListInProgress(bookname, dl) - if err != nil { - log.Fatalln("Failed to get list of files for book", bookname, err) - } + var checkPreQueue <-chan time.Time + var checkOCRQueue <-chan time.Time + checkPreQueue = time.After(0) + checkOCRQueue = time.After(0) - // wait for the done channel to be posted to - <-done - - verboselog.Println("Sending", bookname, "to OCR queue") - err = conn.AddToOCRQueue(bookname) - if err != nil { - log.Fatalln("Error adding to ocr queue", bookname, err) - } - - t.Stop() - - verboselog.Println("Deleting original message from preprocessing queue") - err = conn.DelFromPreQueue(msg.Handle) - if err != nil { - log.Fatalln("Error deleting message from preprocessing queue", err) - } - - err = os.RemoveAll(d) - if err != nil { - log.Fatalln("Failed to remove directory", d, err) + // TODO: use a buffer or something to limit number of running processes + // could start preprocbook / ocrbook and just have them listen on + // channels for stuff to do, that way they'd do things one at a time + // TODO: don't trigger the checkOCRQueue until a running thing has finished + for { + select { + case <- checkPreQueue: + msg, err := conn.CheckPreQueue() + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking preprocess queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on preprocess queue, sleeping") + continue + } + go preProcBook(msg, conn) + case <- checkOCRQueue: + msg, err := conn.CheckOCRQueue() + //checkOCRQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking OCR queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on OCR queue, sleeping") + continue + } + go ocrBook(msg, conn) } } } -- cgit v1.2.1-24-ge1ad