From 94352c6124aed2d85cfb21e83c0dafb6f918fb7d Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 2 Oct 2019 12:32:59 +0100 Subject: Add wipeonly queue and functionality This is useful for prebinarised images, which don't need full preprocessing, but do require wiping, albeit with a more conservative threshold. --- bookpipeline/aws.go | 29 ++++++++++++++++------- bookpipeline/cmd/bookpipeline/main.go | 43 +++++++++++++++++++++++++++++++++- bookpipeline/cmd/lspipeline/main.go | 2 ++ bookpipeline/cmd/mkpipeline/main.go | 2 +- preproc/wipesides.go | 44 ++++++++++++++++++++++++++++++++++- 5 files changed, 109 insertions(+), 11 deletions(-) diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index d4cd306..0127d6e 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -33,14 +33,14 @@ type AwsConn struct { Logger *log.Logger // these are used internally - sess *session.Session - ec2svc *ec2.EC2 - s3svc *s3.S3 - sqssvc *sqs.SQS - downloader *s3manager.Downloader - uploader *s3manager.Uploader - prequrl, ocrqurl, analysequrl string - wipstorageid string + sess *session.Session + ec2svc *ec2.EC2 + s3svc *s3.S3 + sqssvc *sqs.SQS + downloader *s3manager.Downloader + uploader *s3manager.Uploader + wipequrl, prequrl, ocrqurl, analysequrl string + wipstorageid string } // TODO: split this up, as not everything is needed for different uses @@ -74,6 +74,15 @@ func (a *AwsConn) Init() error { } a.prequrl = *result.QueueUrl + a.Logger.Println("Getting wipeonly queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribewipeonly"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err)) + } + a.wipequrl = *result.QueueUrl + a.Logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeocr"), @@ -192,6 +201,10 @@ func (a *AwsConn) PreQueueId() string { return a.prequrl } +func (a *AwsConn) WipeQueueId() string { + return a.wipequrl +} + func (a *AwsConn) OCRQueueId() string { return a.ocrqurl } diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index de9b072..7d04e3a 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -19,7 +19,7 @@ import ( "rescribe.xyz/go.git/preproc" ) -const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training] Watches the preprocess, ocr and analyse queues for book names. When one is found this general process is followed: @@ -60,6 +60,7 @@ type Clouder interface { type Pipeliner interface { Clouder PreQueueId() string + WipeQueueId() string OCRQueueId() string AnalyseQueueId() string WIPStorageId() string @@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo close(up) } +func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + close(up) + errc <- err + return + } + up <- outpath + } + close(up) +} + func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { for path := range toocr { @@ -347,6 +367,7 @@ func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "rescribealphav5", "tesseract training file to use") nopreproc := flag.Bool("np", false, "disable preprocessing") + nowipe := flag.Bool("nw", false, "disable wipeonly") noocr := flag.Bool("no", false, "disable ocr") noanalyse := flag.Bool("na", false, "disable analysis") @@ -379,11 +400,15 @@ func main() { verboselog.Println("Finished setting up AWS session") var checkPreQueue <-chan time.Time + var checkWipeQueue <-chan time.Time var checkOCRQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time if !*nopreproc { checkPreQueue = time.After(0) } + if !*nowipe { + checkWipeQueue = time.After(0) + } if !*noocr { checkOCRQueue = time.After(0) } @@ -409,6 +434,22 @@ func main() { if err != nil { log.Println("Error during preprocess", err) } + case <-checkWipeQueue: + msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2) + checkWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking wipeonly queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on wipeonly queue, sleeping") + continue + } + verboselog.Println("Message received on wipeonly queue, processing", msg.Body) + err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId()) + if err != nil { + log.Println("Error during wipe", err) + } case <-checkOCRQueue: msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2) checkOCRQueue = time.After(PauseBetweenChecks) diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go index 3e9df61..46a1d63 100644 --- a/bookpipeline/cmd/lspipeline/main.go +++ b/bookpipeline/cmd/lspipeline/main.go @@ -24,6 +24,7 @@ Lists useful things related to the pipeline. type LsPipeliner interface { Init() error PreQueueId() string + WipeQueueId() string OCRQueueId() string AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) @@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails) func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { queues := []struct{ name, id string }{ {"preprocess", conn.PreQueueId()}, + {"wipeonly", conn.WipeQueueId()}, {"ocr", conn.OCRQueueId()}, {"analyse", conn.AnalyseQueueId()}, } diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go index 970543e..e37a56d 100644 --- a/bookpipeline/cmd/mkpipeline/main.go +++ b/bookpipeline/cmd/mkpipeline/main.go @@ -34,7 +34,7 @@ func main() { prefix := "rescribe" buckets := []string{"inprogress", "done"} - queues := []string{"preprocess", "ocr", "analyse"} + queues := []string{"preprocess", "wipeonly", "ocr", "analyse"} for _, bucket := range buckets { bname := prefix + bucket diff --git a/preproc/wipesides.go b/preproc/wipesides.go index 24fb7bd..1b73791 100644 --- a/preproc/wipesides.go +++ b/preproc/wipesides.go @@ -4,8 +4,14 @@ package preproc // TODO: switch to an interface rather than integralimg.I import ( + "errors" + "fmt" "image" "image/color" + "image/draw" + _ "image/jpeg" + "image/png" + "os" "rescribe.xyz/go.git/integralimg" ) @@ -106,7 +112,7 @@ func toonarrow(img *image.Gray, lowedge int, highedge int, min int) bool { return false } -// wipe fills the sections of image which fall outside the content +// Wipe fills the sections of image which fall outside the content // area with white, providing the content area is above min % func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray { integral := integralimg.ToIntegralImg(img) @@ -116,3 +122,39 @@ func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray { } return wipesides(img, lowedge, highedge) } + +// WipeFile wipes an image file, filling the sections of the image +// which fall outside the content area with white, providing the +// content area is above min %. +// inPath: path of the input image. +// outPath: path to save the output image. +// wsize: window size for wipe algorithm. +// thresh: threshold for wipe algorithm. +// min: minimum % of content area width to consider valid. +func WipeFile(inPath string, outPath string, wsize int, thresh float64, min int) error { + f, err := os.Open(inPath) + defer f.Close() + if err != nil { + return errors.New(fmt.Sprintf("Could not open file %s: %v", inPath, err)) + } + img, _, err := image.Decode(f) + if err != nil { + return errors.New(fmt.Sprintf("Could not decode image: %v", err)) + } + b := img.Bounds() + gray := image.NewGray(image.Rect(0, 0, b.Dx(), b.Dy())) + draw.Draw(gray, b, img, b.Min, draw.Src) + + clean := Wipe(gray, wsize, thresh, min) + + f, err = os.Create(outPath) + if err != nil { + return errors.New(fmt.Sprintf("Could not create file %s: %v", outPath, err)) + } + defer f.Close() + err = png.Encode(f, clean) + if err != nil { + return errors.New(fmt.Sprintf("Could not encode image: %v", err)) + } + return nil +} -- cgit v1.2.1-24-ge1ad