From 94352c6124aed2d85cfb21e83c0dafb6f918fb7d Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Wed, 2 Oct 2019 12:32:59 +0100
Subject: Add wipeonly queue and functionality

This is useful for prebinarised images, which don't need full preprocessing,
but do require wiping, albeit with a more conservative threshold.
---
 bookpipeline/cmd/bookpipeline/main.go | 43 ++++++++++++++++++++++++++++++++++-
 bookpipeline/cmd/lspipeline/main.go   |  2 ++
 bookpipeline/cmd/mkpipeline/main.go   |  2 +-
 3 files changed, 45 insertions(+), 2 deletions(-)

(limited to 'bookpipeline/cmd')

diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index de9b072..7d04e3a 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -19,7 +19,7 @@ import (
 	"rescribe.xyz/go.git/preproc"
 )
 
-const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
 
 Watches the preprocess, ocr and analyse queues for book names. When
 one is found this general process is followed:
@@ -60,6 +60,7 @@ type Clouder interface {
 type Pipeliner interface {
 	Clouder
 	PreQueueId() string
+	WipeQueueId() string
 	OCRQueueId() string
 	AnalyseQueueId() string
 	WIPStorageId() string
@@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo
 	close(up)
 }
 
+func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+	for path := range towipe {
+		logger.Println("Wiping", path)
+		s := strings.Split(path, ".")
+		base := strings.Join(s[:len(s)-1], "")
+		outpath := base + "_bin0.0.png"
+		err := preproc.WipeFile(path, outpath, 5, 0.03, 30)
+		if err != nil {
+			for range towipe {
+			} // consume the rest of the receiving channel so it isn't blocked
+			close(up)
+			errc <- err
+			return
+		}
+		up <- outpath
+	}
+	close(up)
+}
+
 func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
 	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
 		for path := range toocr {
@@ -347,6 +367,7 @@ func main() {
 	verbose := flag.Bool("v", false, "verbose")
 	training := flag.String("t", "rescribealphav5", "tesseract training file to use")
 	nopreproc := flag.Bool("np", false, "disable preprocessing")
+	nowipe := flag.Bool("nw", false, "disable wipeonly")
 	noocr := flag.Bool("no", false, "disable ocr")
 	noanalyse := flag.Bool("na", false, "disable analysis")
 
@@ -379,11 +400,15 @@ func main() {
 	verboselog.Println("Finished setting up AWS session")
 
 	var checkPreQueue <-chan time.Time
+	var checkWipeQueue <-chan time.Time
 	var checkOCRQueue <-chan time.Time
 	var checkAnalyseQueue <-chan time.Time
 	if !*nopreproc {
 		checkPreQueue = time.After(0)
 	}
+	if !*nowipe {
+		checkWipeQueue = time.After(0)
+	}
 	if !*noocr {
 		checkOCRQueue = time.After(0)
 	}
@@ -409,6 +434,22 @@ func main() {
 			if err != nil {
 				log.Println("Error during preprocess", err)
 			}
+		case <-checkWipeQueue:
+			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2)
+			checkWipeQueue = time.After(PauseBetweenChecks)
+			if err != nil {
+				log.Println("Error checking wipeonly queue", err)
+				continue
+			}
+			if msg.Handle == "" {
+				verboselog.Println("No message received on wipeonly queue, sleeping")
+				continue
+			}
+			verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
+			err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId())
+			if err != nil {
+				log.Println("Error during wipe", err)
+			}
 		case <-checkOCRQueue:
 			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
 			checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 3e9df61..46a1d63 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -24,6 +24,7 @@ Lists useful things related to the pipeline.
 type LsPipeliner interface {
 	Init() error
 	PreQueueId() string
+	WipeQueueId() string
 	OCRQueueId() string
 	AnalyseQueueId() string
 	GetQueueDetails(url string) (string, string, error)
@@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails)
 func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
 	queues := []struct{ name, id string }{
 		{"preprocess", conn.PreQueueId()},
+		{"wipeonly", conn.WipeQueueId()},
 		{"ocr", conn.OCRQueueId()},
 		{"analyse", conn.AnalyseQueueId()},
 	}
diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go
index 970543e..e37a56d 100644
--- a/bookpipeline/cmd/mkpipeline/main.go
+++ b/bookpipeline/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
 
 	prefix := "rescribe"
 	buckets := []string{"inprogress", "done"}
-	queues := []string{"preprocess", "ocr", "analyse"}
+	queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
 
 	for _, bucket := range buckets {
 		bname := prefix + bucket
-- 
cgit v1.2.1-24-ge1ad