From 7693065b8b8a9eff1f17dc993ae1235984872e09 Mon Sep 17 00:00:00 2001
From: Nick White
Date: Tue, 19 Nov 2019 14:47:41 +0000
Subject: Add ocrpage queue for processing individual pages

This should be a good way to get around the ongoing heartbeat issue, as
individual page jobs will never come close to the 12 hour mark that can
cause the bug.

The OCR page processing is done and working now; still to do is
populating this queue (rather than the ocr queue) after preprocessing /
wiping.
---
 aws.go                   |  14 +++++
 cmd/bookpipeline/main.go | 140 ++++++++++++++++++++++++++++++++++++++++++++++-
 cmd/lspipeline/main.go   |   2 +
 cmd/mkpipeline/main.go   |   2 +-
 4 files changed, 156 insertions(+), 2 deletions(-)

diff --git a/aws.go b/aws.go
index 73f3b2f..f5ac338 100644
--- a/aws.go
+++ b/aws.go
@@ -44,6 +44,7 @@ type AwsConn struct {
 	downloader *s3manager.Downloader
 	uploader *s3manager.Uploader
 	wipequrl, prequrl, ocrqurl, analysequrl string
+	ocrpgqurl string
 	wipstorageid string
 }
@@ -105,6 +106,15 @@ func (a *AwsConn) Init() error {
 	}
 	a.analysequrl = *result.QueueUrl
 
+	a.Logger.Println("Getting OCR Page queue URL")
+	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+		QueueName: aws.String("rescribeocrpage"),
+	})
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
+	}
+	a.ocrpgqurl = *result.QueueUrl
+
 	a.wipstorageid = "rescribeinprogress"
 
 	return nil
@@ -224,6 +234,10 @@ func (a *AwsConn) OCRQueueId() string {
 	return a.ocrqurl
 }
 
+func (a *AwsConn) OCRPageQueueId() string {
+	return a.ocrpgqurl
+}
+
 func (a *AwsConn) AnalyseQueueId() string {
 	return a.analysequrl
 }
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 0ed0d67..d70dbc7 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -17,7 +17,7 @@ import (
 	"rescribe.xyz/utils/pkg/hocr"
 )
 
-const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training]
 
 Watches the preprocess, ocr and analyse queues for book names. When one
 is found this general process is followed:
@@ -60,6 +60,7 @@ type Pipeliner interface {
 	PreQueueId() string
 	WipeQueueId() string
 	OCRQueueId() string
+	OCRPageQueueId() string
 	AnalyseQueueId() string
 	WIPStorageId() string
 	GetLogger() *log.Logger
@@ -277,6 +278,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
 	}
 }
 
+// allOCRed checks whether all pages of a book have been OCRed.
+// This is determined by whether every _bin0.?.png file has a
+// corresponding .hocr file.
+func allOCRed(bookname string, conn Pipeliner) bool {
+	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+	if err != nil {
+		return false
+	}
+
+	// Full wipePattern can match things like 0000.png which getgbook
+	// can emit but aren't ocr-able
+	//wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
+	wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`)
+	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+
+	atleastone := false
+	for _, png := range objs {
+		if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) {
+			atleastone = true
+			found := false
+			b := strings.TrimSuffix(filepath.Base(png), ".png")
+			hocrname := bookname + "/" + b + ".hocr"
+			for _, hocr := range objs {
+				if hocr == hocrname {
+					found = true
+					break
+				}
+			}
+			if found == false {
+				return false
+			}
+		}
+	}
+	if atleastone == false {
+		return false
+	}
+	return true
+}
+
+// ocrPage OCRs a page based on a message. It may make sense to
+// roll this back into processBook (on which it is based) once
+// working well.
+func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
+	dl := make(chan string)
+	msgc := make(chan bookpipeline.Qmsg)
+	processc := make(chan string)
+	upc := make(chan string)
+	done := make(chan bool)
+	errc := make(chan error)
+
+	bookname := filepath.Dir(msg.Body)
+
+	d := filepath.Join(os.TempDir(), bookname)
+	err := os.MkdirAll(d, 0755)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
+	}
+
+	t := time.NewTicker(HeartbeatTime * time.Second)
+	go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+	// these functions will do their jobs when their channels have data
+	go download(dl, processc, conn, d, errc, conn.GetLogger())
+	go process(processc, upc, errc, conn.GetLogger())
+	go up(upc, done, conn, bookname, errc, conn.GetLogger())
+
+	dl <- msg.Body
+	close(dl)
+
+	// wait for either the done or errc channel to be sent to
+	select {
+	case err = <-errc:
+		t.Stop()
+		_ = os.RemoveAll(d)
+		return err
+	case <-done:
+	}
+
+	if allOCRed(bookname, conn) && toQueue != "" {
+		conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+		err = conn.AddToQueue(toQueue, bookname)
+		if err != nil {
+			t.Stop()
+			_ = os.RemoveAll(d)
+			return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
+		}
+	}
+
+	t.Stop()
+
+	// check whether we're using a newer msg handle
+	select {
+	case m, ok := <-msgc:
+		if ok {
+			msg = m
+			conn.GetLogger().Println("Using new message handle to delete message from old queue")
+		}
+	default:
+		conn.GetLogger().Println("Using original message handle to delete message from old queue")
+	}
+
+	conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+	err = conn.DelFromQueue(fromQueue, msg.Handle)
+	if err != nil {
+		_ = os.RemoveAll(d)
+		return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
+	}
+
+	err = os.RemoveAll(d)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
+	}
+
+	return nil
+}
+
 func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
 	dl := make(chan string)
 	msgc := make(chan bookpipeline.Qmsg)
@@ -374,6 +491,7 @@ func main() {
 	nopreproc := flag.Bool("np", false, "disable preprocessing")
 	nowipe := flag.Bool("nw", false, "disable wipeonly")
 	noocr := flag.Bool("no", false, "disable ocr")
+	noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
 	noanalyse := flag.Bool("na", false, "disable analysis")
 
 	flag.Usage = func() {
@@ -408,6 +526,7 @@ func main() {
 	var checkPreQueue <-chan time.Time
 	var checkWipeQueue <-chan time.Time
 	var checkOCRQueue <-chan time.Time
+	var checkOCRPageQueue <-chan time.Time
 	var checkAnalyseQueue <-chan time.Time
 	if !*nopreproc {
 		checkPreQueue = time.After(0)
@@ -418,6 +537,9 @@ func main() {
 	if !*noocr {
 		checkOCRQueue = time.After(0)
 	}
+	if !*noocrpg {
+		checkOCRPageQueue = time.After(0)
+	}
 	if !*noanalyse {
 		checkAnalyseQueue = time.After(0)
 	}
@@ -456,6 +578,22 @@ func main() {
 			if err != nil {
 				log.Println("Error during wipe", err)
 			}
+		case <-checkOCRPageQueue:
+			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2)
+			checkOCRPageQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking OCR Page queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on OCR Page queue, sleeping")
+				continue
+			}
+			verboselog.Println("Message received on OCR Page queue, processing", msg.Body)
+			err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+			if err != nil {
+				log.Println("Error during OCR Page process", err)
+			}
 		case <-checkOCRQueue:
 			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
 			checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index 0b8ce49..a32d851 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -28,6 +28,7 @@ type LsPipeliner interface {
 	PreQueueId() string
 	WipeQueueId() string
 	OCRQueueId() string
+	OCRPageQueueId() string
 	AnalyseQueueId() string
 	GetQueueDetails(url string) (string, string, error)
 	GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
@@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
 		{"preprocess", conn.PreQueueId()},
 		{"wipeonly", conn.WipeQueueId()},
 		{"ocr", conn.OCRQueueId()},
+		{"ocrpage", conn.OCRPageQueueId()},
 		{"analyse", conn.AnalyseQueueId()},
 	}
 	for _, q := range queues {
diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go
index e37a56d..a32526a 100644
--- a/cmd/mkpipeline/main.go
+++ b/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
 	prefix := "rescribe"
 	buckets := []string{"inprogress", "done"}
-	queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
+	queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"}
 
 	for _, bucket := range buckets {
 		bname := prefix + bucket
--
cgit v1.2.1-24-ge1ad
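
The remaining work mentioned in the commit message is for the preprocess / wipe
steps to feed this new ocrpage queue with one message per page rather than one
message per book on the ocr queue. A minimal sketch of what that could look like,
assuming only the Pipeliner interface shown above (ListObjects, AddToQueue,
OCRPageQueueId, WIPStorageId); the helper name queueOCRPages is invented for
illustration and is not part of this commit:

// queueOCRPages is a hypothetical helper (not in this commit) that a
// preprocess or wipe step could call after uploading its output, to
// enqueue one ocrpage message per page image instead of one ocr
// message for the whole book.
func queueOCRPages(conn Pipeliner, bookname string) error {
	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
	if err != nil {
		return errors.New(fmt.Sprintf("Error listing objects for %s: %s", bookname, err))
	}

	// Mirror preprocessedPattern in allOCRed: only binarised page
	// images are worth sending for OCR.
	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)

	for _, png := range objs {
		if !preprocessedPattern.MatchString(png) {
			continue
		}
		// Each message body is a single page key, e.g.
		// "bookname/0001_bin0.0.png"; ocrPage recovers the book name
		// from it with filepath.Dir(msg.Body).
		err = conn.AddToQueue(conn.OCRPageQueueId(), png)
		if err != nil {
			return errors.New(fmt.Sprintf("Error adding %s to ocrpage queue: %s", png, err))
		}
	}
	return nil
}

Sending the full object key in each message keeps ocrPage's filepath.Dir(msg.Body)
handling unchanged, and each per-page job stays far below the long visibility
timeouts that trigger the heartbeat bug for whole-book jobs.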