diff options
-rw-r--r-- | aws.go | 41 | ||||
-rw-r--r-- | cloudsettings.go | 1 | ||||
-rw-r--r-- | cmd/addtoqueue/main.go | 3 | ||||
-rw-r--r-- | cmd/bookpipeline/main.go | 31 | ||||
-rw-r--r-- | cmd/lspipeline/main.go | 2 | ||||
-rw-r--r-- | cmd/unstickocr/main.go | 118 |
6 files changed, 16 insertions, 180 deletions
@@ -40,16 +40,14 @@ type AwsConn struct { Region string Logger *log.Logger - // these are used internally - sess *session.Session - ec2svc *ec2.EC2 - s3svc *s3.S3 - sqssvc *sqs.SQS - downloader *s3manager.Downloader - uploader *s3manager.Uploader - wipequrl, prequrl, ocrqurl, analysequrl string - ocrpgqurl string - wipstorageid string + sess *session.Session + ec2svc *ec2.EC2 + s3svc *s3.S3 + sqssvc *sqs.SQS + downloader *s3manager.Downloader + uploader *s3manager.Uploader + wipequrl, prequrl, ocrpgqurl, analysequrl string + wipstorageid string } // MinimalInit does the bare minimum to initialise aws services @@ -105,14 +103,14 @@ func (a *AwsConn) Init() error { } a.wipequrl = *result.QueueUrl - a.Logger.Println("Getting OCR queue URL") + a.Logger.Println("Getting OCR Page queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(queueOcr), + QueueName: aws.String(queueOcrPage), }) if err != nil { - return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) + return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err)) } - a.ocrqurl = *result.QueueUrl + a.ocrpgqurl = *result.QueueUrl a.Logger.Println("Getting analyse queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ @@ -123,15 +121,6 @@ func (a *AwsConn) Init() error { } a.analysequrl = *result.QueueUrl - a.Logger.Println("Getting OCR Page queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(queueOcrPage), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err)) - } - a.ocrpgqurl = *result.QueueUrl - return nil } @@ -245,10 +234,6 @@ func (a *AwsConn) WipeQueueId() string { return a.wipequrl } -func (a *AwsConn) OCRQueueId() string { - return a.ocrqurl -} - func (a *AwsConn) OCRPageQueueId() string { return a.ocrpgqurl } @@ -467,7 +452,7 @@ func (a *AwsConn) Log(v ...interface{}) { // mkpipeline sets up necessary buckets and queues for the pipeline func (a *AwsConn) MkPipeline() error { buckets := []string{storageWip} - queues := []string{queuePreProc, queueWipeOnly, queueOcr, queueAnalyse, queueOcrPage} + queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage} for _, bucket := range buckets { err := a.CreateBucket(bucket) diff --git a/cloudsettings.go b/cloudsettings.go index d627342..0cf1777 100644 --- a/cloudsettings.go +++ b/cloudsettings.go @@ -19,7 +19,6 @@ const ( const ( queuePreProc = "rescribepreprocess" queueWipeOnly = "rescribewipeonly" - queueOcr = "rescribeocr" queueOcrPage = "rescribeocrpage" queueAnalyse = "rescribeanalyse" ) diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go index 06edac5..c3284e9 100644 --- a/cmd/addtoqueue/main.go +++ b/cmd/addtoqueue/main.go @@ -21,7 +21,6 @@ This is handy to work around bugs when things are misbehaving. Valid queue names: - preprocess - wipeonly -- ocr - ocrpage - analyse ` @@ -38,7 +37,6 @@ type QueuePipeliner interface { AddToQueue(url string, msg string) error PreQueueId() string WipeQueueId() string - OCRQueueId() string OCRPageQueueId() string AnalyseQueueId() string } @@ -70,7 +68,6 @@ func main() { }{ {conn.PreQueueId(), "preprocess"}, {conn.WipeQueueId(), "wipeonly"}, - {conn.OCRQueueId(), "ocr"}, {conn.OCRPageQueueId(), "ocrpage"}, {conn.AnalyseQueueId(), "analyse"}, } diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 8d2ffcc..9059167 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -22,10 +22,10 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] -Watches the preprocess, ocr and analyse queues for book names. When -one is found this general process is followed: +Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. +When one is found this general process is followed: - The book name is hidden from the queue, and a 'heartbeat' is started which keeps it hidden (this will time out after 2 minutes @@ -66,7 +66,6 @@ type Pipeliner interface { Clouder PreQueueId() string WipeQueueId() string - OCRQueueId() string OCRPageQueueId() string AnalyseQueueId() string WIPStorageId() string @@ -672,7 +671,6 @@ func main() { training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") nopreproc := flag.Bool("np", false, "disable preprocessing") nowipe := flag.Bool("nw", false, "disable wipeonly") - noocr := flag.Bool("no", false, "disable ocr") noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") @@ -693,7 +691,6 @@ func main() { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) ocredPattern := regexp.MustCompile(`.hocr$`) var conn Pipeliner @@ -711,7 +708,6 @@ func main() { var checkPreQueue <-chan time.Time var checkWipeQueue <-chan time.Time - var checkOCRQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time var shutdownIfQuiet *time.Timer @@ -722,9 +718,6 @@ func main() { if !*nowipe { checkWipeQueue = time.After(0) } - if !*noocr { - checkOCRQueue = time.After(0) - } if !*noocrpg { checkOCRPageQueue = time.After(0) } @@ -794,24 +787,6 @@ func main() { if err != nil { conn.Log("Error during OCR Page process", err) } - case <-checkOCRQueue: - msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatSeconds*2) - checkOCRQueue = time.After(PauseBetweenChecks) - if err != nil { - conn.Log("Error checking OCR queue", err) - continue - } - if msg.Handle == "" { - conn.Log("No message received on OCR queue, sleeping") - continue - } - stopTimer(shutdownIfQuiet) - conn.Log("Message received on OCR queue, processing", msg.Body) - err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) - if err != nil { - conn.Log("Error during OCR process", err) - } case <-checkAnalyseQueue: msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) checkAnalyseQueue = time.After(PauseBetweenChecks) diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 659b034..ab2f8b1 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -30,7 +30,6 @@ type LsPipeliner interface { Init() error PreQueueId() string WipeQueueId() string - OCRQueueId() string OCRPageQueueId() string AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) @@ -66,7 +65,6 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { queues := []struct{ name, id string }{ {"preprocess", conn.PreQueueId()}, {"wipeonly", conn.WipeQueueId()}, - {"ocr", conn.OCRQueueId()}, {"ocrpage", conn.OCRPageQueueId()}, {"analyse", conn.AnalyseQueueId()}, } diff --git a/cmd/unstickocr/main.go b/cmd/unstickocr/main.go deleted file mode 100644 index 796525b..0000000 --- a/cmd/unstickocr/main.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2019 Nick White. -// Use of this source code is governed by the GPLv3 -// license that can be found in the LICENSE file. - -package main - -import ( - "flag" - "fmt" - "log" - "os" - "time" - - "rescribe.xyz/bookpipeline" -) - -const usage = `Usage: unstickocr [-v] bookname - -unstickocr deletes a book from the OCR queue and adds it to the -Analyse queue. - -This should be done automatically by the bookpipeline tool once -the OCR job has completed, but sometimes it isn't, because of a -nasty bug. Once that bug is squashed, this tool can be deleted. -` - -// null writer to enable non-verbose logging to be discarded -type NullWriter bool - -func (w NullWriter) Write(p []byte) (n int, err error) { - return len(p), nil -} - -type UnstickPipeliner interface { - Init() error - CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - OCRQueueId() string - AnalyseQueueId() string -} - -func main() { - verbose := flag.Bool("v", false, "verbose") - flag.Usage = func() { - fmt.Fprintf(flag.CommandLine.Output(), usage) - flag.PrintDefaults() - } - flag.Parse() - - if flag.NArg() != 1 { - flag.Usage() - return - } - - var verboselog *log.Logger - if *verbose { - verboselog = log.New(os.Stdout, "", 0) - } else { - var n NullWriter - verboselog = log.New(n, "", 0) - } - - var conn UnstickPipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} - - err := conn.Init() - if err != nil { - log.Fatalln("Error setting up cloud connection:", err) - } - - book := flag.Arg(0) - done := false - - for a := 0; a < 5; a++ { - for i := 0; i < 10; i++ { - verboselog.Println("Checking OCR queue for", book) - msg, err := conn.CheckQueue(conn.OCRQueueId(), 10) - if err != nil { - log.Fatalln("Error checking OCR queue:", err) - continue - } - if msg.Handle == "" { - verboselog.Println("No message received on OCR queue") - continue - } - if msg.Body != book { - verboselog.Println("Message received on OCR queue is not the one we're", - "looking for, so will try again - found", msg.Body) - continue - } - err = conn.DelFromQueue(conn.OCRQueueId(), msg.Handle) - if err != nil { - log.Fatalln("Error deleting message from OCR queue:", err) - } - err = conn.AddToQueue(conn.AnalyseQueueId(), book) - if err != nil { - log.Fatalln("Error adding message to Analyse queue:", err) - } - done = true - break - } - if done == true { - break - } - log.Println("No message found yet, sleeping for 30 seconds to try again") - time.Sleep(30 * time.Minute) - } - - if done == true { - fmt.Println("Succeeded moving message from OCR queue to Analyse queue.") - } else { - log.Fatalln("Failed to find message", book, "on OCR queue; is it still being processed?", - "It can only be discovered and processed by this tool when it is available.", - "Try shutting down any instance that is using it, waiting a few minutes,", - "and rerunning this tool.") - } -} |