diff options
-rw-r--r-- | aws.go | 14 | ||||
-rw-r--r-- | cmd/bookpipeline/main.go | 182 | ||||
-rw-r--r-- | cmd/lspipeline/main.go | 2 | ||||
-rw-r--r-- | cmd/mkpipeline/main.go | 2 |
4 files changed, 192 insertions, 8 deletions
@@ -44,6 +44,7 @@ type AwsConn struct { downloader *s3manager.Downloader uploader *s3manager.Uploader wipequrl, prequrl, ocrqurl, analysequrl string + ocrpgqurl string wipstorageid string } @@ -105,6 +106,15 @@ func (a *AwsConn) Init() error { } a.analysequrl = *result.QueueUrl + a.Logger.Println("Getting OCR Page queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeocrpage"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err)) + } + a.ocrpgqurl = *result.QueueUrl + a.wipstorageid = "rescribeinprogress" return nil @@ -224,6 +234,10 @@ func (a *AwsConn) OCRQueueId() string { return a.ocrqurl } +func (a *AwsConn) OCRPageQueueId() string { + return a.ocrpgqurl +} + func (a *AwsConn) AnalyseQueueId() string { return a.analysequrl } diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 0ed0d67..3a539c1 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -17,7 +17,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training] Watches the preprocess, ocr and analyse queues for book names. When one is found this general process is followed: @@ -35,6 +35,7 @@ one is found this general process is followed: ` const PauseBetweenChecks = 3 * time.Minute +const PauseBetweenOCRPageChecks = 1 * time.Second const HeartbeatTime = 60 // null writer to enable non-verbose logging to be discarded @@ -60,6 +61,7 @@ type Pipeliner interface { PreQueueId() string WipeQueueId() string OCRQueueId() string + OCRPageQueueId() string AnalyseQueueId() string WIPStorageId() string GetLogger() *log.Logger @@ -99,6 +101,31 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha done <- true } +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := filepath.Join(bookname, name) + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + conn.GetLogger().Println("Adding", key, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { logger.Println("Preprocessing", path) @@ -277,6 +304,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri } } +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + // Full wipePattern can match things like 0000.png which getgbook + // can emit but aren't ocr-able + //wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) + wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`) + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// ocrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + bookname := filepath.Dir(msg.Body) + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) + } + + t := time.NewTicker(HeartbeatTime * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msg.Body + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.GetLogger().Println("Using new message handle to delete message from queue") + } + default: + conn.GetLogger().Println("Using original message handle to delete message from queue") + } + + conn.GetLogger().Println("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) + } + + err = os.RemoveAll(d) + if err != nil { + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) + } + + return nil +} + func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) @@ -299,7 +442,11 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + if toQueue == conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } conn.GetLogger().Println("Getting list of objects to download") objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) @@ -330,7 +477,8 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string case <-done: } - if toQueue != "" { + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger()) conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) err = conn.AddToQueue(toQueue, bookname) if err != nil { @@ -347,10 +495,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string case m, ok := <-msgc: if ok { msg = m - conn.GetLogger().Println("Using new message handle to delete message from old queue") + conn.GetLogger().Println("Using new message handle to delete message from queue") } default: - conn.GetLogger().Println("Using original message handle to delete message from old queue") + conn.GetLogger().Println("Using original message handle to delete message from queue") } conn.GetLogger().Println("Deleting original message from queue", fromQueue) @@ -374,6 +522,7 @@ func main() { nopreproc := flag.Bool("np", false, "disable preprocessing") nowipe := flag.Bool("nw", false, "disable wipeonly") noocr := flag.Bool("no", false, "disable ocr") + noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") flag.Usage = func() { @@ -408,6 +557,7 @@ func main() { var checkPreQueue <-chan time.Time var checkWipeQueue <-chan time.Time var checkOCRQueue <-chan time.Time + var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time if !*nopreproc { checkPreQueue = time.After(0) @@ -418,6 +568,9 @@ func main() { if !*noocr { checkOCRQueue = time.After(0) } + if !*noocrpg { + checkOCRPageQueue = time.After(0) + } if !*noanalyse { checkAnalyseQueue = time.After(0) } @@ -436,7 +589,7 @@ func main() { continue } verboselog.Println("Message received on preprocess queue, processing", msg.Body) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) if err != nil { log.Println("Error during preprocess", err) } @@ -452,10 +605,25 @@ func main() { continue } verboselog.Println("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId()) + err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) if err != nil { log.Println("Error during wipe", err) } + case <-checkOCRPageQueue: + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2) + checkOCRPageQueue = time.After(PauseBetweenOCRPageChecks) + if err != nil { + log.Println("Error checking OCR Page queue", err) + continue + } + if msg.Handle == "" { + continue + } + verboselog.Println("Message received on OCR Page queue, processing", msg.Body) + err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + if err != nil { + log.Println("Error during OCR Page process", err) + } case <-checkOCRQueue: msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2) checkOCRQueue = time.After(PauseBetweenChecks) diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 0b8ce49..a32d851 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -28,6 +28,7 @@ type LsPipeliner interface { PreQueueId() string WipeQueueId() string OCRQueueId() string + OCRPageQueueId() string AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) @@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { {"preprocess", conn.PreQueueId()}, {"wipeonly", conn.WipeQueueId()}, {"ocr", conn.OCRQueueId()}, + {"ocrpage", conn.OCRPageQueueId()}, {"analyse", conn.AnalyseQueueId()}, } for _, q := range queues { diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go index e37a56d..a32526a 100644 --- a/cmd/mkpipeline/main.go +++ b/cmd/mkpipeline/main.go @@ -34,7 +34,7 @@ func main() { prefix := "rescribe" buckets := []string{"inprogress", "done"} - queues := []string{"preprocess", "wipeonly", "ocr", "analyse"} + queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"} for _, bucket := range buckets { bname := prefix + bucket |