From 94352c6124aed2d85cfb21e83c0dafb6f918fb7d Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Wed, 2 Oct 2019 12:32:59 +0100
Subject: Add wipeonly queue and functionality

This is useful for prebinarised images, which don't need full preprocessing,
but do require wiping, albeit with a more conservative threshold.
---
 bookpipeline/aws.go                   | 29 ++++++++++++++++-------
 bookpipeline/cmd/bookpipeline/main.go | 43 +++++++++++++++++++++++++++++++++-
 bookpipeline/cmd/lspipeline/main.go   |  2 ++
 bookpipeline/cmd/mkpipeline/main.go   |  2 +-
 preproc/wipesides.go                  | 44 ++++++++++++++++++++++++++++++++++-
 5 files changed, 109 insertions(+), 11 deletions(-)

diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index d4cd306..0127d6e 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -33,14 +33,14 @@ type AwsConn struct {
 	Logger *log.Logger
 
 	// these are used internally
-	sess                          *session.Session
-	ec2svc                        *ec2.EC2
-	s3svc                         *s3.S3
-	sqssvc                        *sqs.SQS
-	downloader                    *s3manager.Downloader
-	uploader                      *s3manager.Uploader
-	prequrl, ocrqurl, analysequrl string
-	wipstorageid                  string
+	sess                                    *session.Session
+	ec2svc                                  *ec2.EC2
+	s3svc                                   *s3.S3
+	sqssvc                                  *sqs.SQS
+	downloader                              *s3manager.Downloader
+	uploader                                *s3manager.Uploader
+	wipequrl, prequrl, ocrqurl, analysequrl string
+	wipstorageid                            string
 }
 
 // TODO: split this up, as not everything is needed for different uses
@@ -74,6 +74,15 @@ func (a *AwsConn) Init() error {
 	}
 	a.prequrl = *result.QueueUrl
 
+	a.Logger.Println("Getting wipeonly queue URL")
+	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+		QueueName: aws.String("rescribewipeonly"),
+	})
+	if err != nil {
+		return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err))
+	}
+	a.wipequrl = *result.QueueUrl
+
 	a.Logger.Println("Getting OCR queue URL")
 	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
 		QueueName: aws.String("rescribeocr"),
@@ -192,6 +201,10 @@ func (a *AwsConn) PreQueueId() string {
 	return a.prequrl
 }
 
+func (a *AwsConn) WipeQueueId() string {
+	return a.wipequrl
+}
+
 func (a *AwsConn) OCRQueueId() string {
 	return a.ocrqurl
 }
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index de9b072..7d04e3a 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -19,7 +19,7 @@ import (
 	"rescribe.xyz/go.git/preproc"
 )
 
-const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
 
 Watches the preprocess, ocr and analyse queues for book names. When
 one is found this general process is followed:
@@ -60,6 +60,7 @@ type Clouder interface {
 type Pipeliner interface {
 	Clouder
 	PreQueueId() string
+	WipeQueueId() string
 	OCRQueueId() string
 	AnalyseQueueId() string
 	WIPStorageId() string
@@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo
 	close(up)
 }
 
+func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+	for path := range towipe {
+		logger.Println("Wiping", path)
+		s := strings.Split(path, ".")
+		base := strings.Join(s[:len(s)-1], "")
+		outpath := base + "_bin0.0.png"
+		err := preproc.WipeFile(path, outpath, 5, 0.03, 30)
+		if err != nil {
+			for range towipe {
+			} // consume the rest of the receiving channel so it isn't blocked
+			close(up)
+			errc <- err
+			return
+		}
+		up <- outpath
+	}
+	close(up)
+}
+
 func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
 	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
 		for path := range toocr {
@@ -347,6 +367,7 @@ func main() {
 	verbose := flag.Bool("v", false, "verbose")
 	training := flag.String("t", "rescribealphav5", "tesseract training file to use")
 	nopreproc := flag.Bool("np", false, "disable preprocessing")
+	nowipe := flag.Bool("nw", false, "disable wipeonly")
 	noocr := flag.Bool("no", false, "disable ocr")
 	noanalyse := flag.Bool("na", false, "disable analysis")
 
@@ -379,11 +400,15 @@ func main() {
 	verboselog.Println("Finished setting up AWS session")
 
 	var checkPreQueue <-chan time.Time
+	var checkWipeQueue <-chan time.Time
 	var checkOCRQueue <-chan time.Time
 	var checkAnalyseQueue <-chan time.Time
 	if !*nopreproc {
 		checkPreQueue = time.After(0)
 	}
+	if !*nowipe {
+		checkWipeQueue = time.After(0)
+	}
 	if !*noocr {
 		checkOCRQueue = time.After(0)
 	}
@@ -409,6 +434,22 @@ func main() {
 			if err != nil {
 				log.Println("Error during preprocess", err)
 			}
+		case <-checkWipeQueue:
+			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2)
+			checkWipeQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking wipeonly queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on wipeonly queue, sleeping")
+				continue
+			}
+			verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
+			err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId())
+			if err != nil {
+				log.Println("Error during wipe", err)
+			}
 		case <-checkOCRQueue:
 			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
 			checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 3e9df61..46a1d63 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -24,6 +24,7 @@ Lists useful things related to the pipeline.
 type LsPipeliner interface {
 	Init() error
 	PreQueueId() string
+	WipeQueueId() string
 	OCRQueueId() string
 	AnalyseQueueId() string
 	GetQueueDetails(url string) (string, string, error)
@@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails)
 func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
 	queues := []struct{ name, id string }{
 		{"preprocess", conn.PreQueueId()},
+		{"wipeonly", conn.WipeQueueId()},
 		{"ocr", conn.OCRQueueId()},
 		{"analyse", conn.AnalyseQueueId()},
 	}
diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go
index 970543e..e37a56d 100644
--- a/bookpipeline/cmd/mkpipeline/main.go
+++ b/bookpipeline/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
 
 	prefix := "rescribe"
 	buckets := []string{"inprogress", "done"}
-	queues := []string{"preprocess", "ocr", "analyse"}
+	queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
 
 	for _, bucket := range buckets {
 		bname := prefix + bucket
diff --git a/preproc/wipesides.go b/preproc/wipesides.go
index 24fb7bd..1b73791 100644
--- a/preproc/wipesides.go
+++ b/preproc/wipesides.go
@@ -4,8 +4,14 @@ package preproc
 // TODO: switch to an interface rather than integralimg.I
 
 import (
+	"errors"
+	"fmt"
 	"image"
 	"image/color"
+	"image/draw"
+	_ "image/jpeg"
+	"image/png"
+	"os"
 
 	"rescribe.xyz/go.git/integralimg"
 )
@@ -106,7 +112,7 @@ func toonarrow(img *image.Gray, lowedge int, highedge int, min int) bool {
 	return false
 }
 
-// wipe fills the sections of image which fall outside the content
+// Wipe fills the sections of image which fall outside the content
 // area with white, providing the content area is above min %
 func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {
 	integral := integralimg.ToIntegralImg(img)
@@ -116,3 +122,39 @@ func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {
 	}
 	return wipesides(img, lowedge, highedge)
 }
+
+// WipeFile wipes an image file, filling the sections of the image
+// which fall outside the content area with white, providing the
+// content area is above min %.
+// inPath: path of the input image.
+// outPath: path to save the output image.
+// wsize: window size for wipe algorithm.
+// thresh: threshold for wipe algorithm.
+// min: minimum % of content area width to consider valid.
+func WipeFile(inPath string, outPath string, wsize int, thresh float64, min int) error {
+	f, err := os.Open(inPath)
+	defer f.Close()
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not open file %s: %v", inPath, err))
+	}
+	img, _, err := image.Decode(f)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not decode image: %v", err))
+	}
+	b := img.Bounds()
+	gray := image.NewGray(image.Rect(0, 0, b.Dx(), b.Dy()))
+	draw.Draw(gray, b, img, b.Min, draw.Src)
+
+	clean := Wipe(gray, wsize, thresh, min)
+
+	f, err = os.Create(outPath)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not create file %s: %v", outPath, err))
+	}
+	defer f.Close()
+	err = png.Encode(f, clean)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not encode image: %v", err))
+	}
+	return nil
+}
-- 
cgit v1.2.1-24-ge1ad