From 6e7b6af77326f307df4f797a821af661a64ce6d8 Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Tue, 19 Nov 2019 16:07:07 +0000
Subject: Send pages to the individual OCR Page queue by default

This now concludes the OCR Page queue stuff; it should all be working
out of the box now.
---
 cmd/bookpipeline/main.go | 50 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 40 insertions(+), 10 deletions(-)

(limited to 'cmd')

diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index d70dbc7..3a539c1 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -35,6 +35,7 @@ one is found this general process is followed:
 `
 
 const PauseBetweenChecks = 3 * time.Minute
+const PauseBetweenOCRPageChecks = 1 * time.Second
 const HeartbeatTime = 60
 
 // null writer to enable non-verbose logging to be discarded
@@ -100,6 +101,31 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
 	done <- true
 }
 
+func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+	for path := range c {
+		name := filepath.Base(path)
+		key := filepath.Join(bookname, name)
+		logger.Println("Uploading", key)
+		err := conn.Upload(conn.WIPStorageId(), key, path)
+		if err != nil {
+			for range c {
+			} // consume the rest of the receiving channel so it isn't blocked
+			errc <- err
+			return
+		}
+		conn.GetLogger().Println("Adding", key, "to queue", toQueue)
+		err = conn.AddToQueue(toQueue, key)
+		if err != nil {
+			for range c {
+			} // consume the rest of the receiving channel so it isn't blocked
+			errc <- err
+			return
+		}
+	}
+
+	done <- true
+}
+
 func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
 	for path := range pre {
 		logger.Println("Preprocessing", path)
@@ -373,10 +399,10 @@ func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch
 	case m, ok := <-msgc:
 		if ok {
 			msg = m
-			conn.GetLogger().Println("Using new message handle to delete message from old queue")
+			conn.GetLogger().Println("Using new message handle to delete message from queue")
 		}
 	default:
-		conn.GetLogger().Println("Using original message handle to delete message from old queue")
+		conn.GetLogger().Println("Using original message handle to delete message from queue")
 	}
 
 	conn.GetLogger().Println("Deleting original message from queue", fromQueue)
@@ -416,7 +442,11 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
 	// these functions will do their jobs when their channels have data
 	go download(dl, processc, conn, d, errc, conn.GetLogger())
 	go process(processc, upc, errc, conn.GetLogger())
-	go up(upc, done, conn, bookname, errc, conn.GetLogger())
+	if toQueue == conn.OCRPageQueueId() {
+		go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger())
+	} else {
+		go up(upc, done, conn, bookname, errc, conn.GetLogger())
+	}
 
 	conn.GetLogger().Println("Getting list of objects to download")
 	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
@@ -447,7 +477,8 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
 	case <-done:
 	}
 
-	if toQueue != "" {
+	if toQueue != "" && toQueue != conn.OCRPageQueueId() {
+		go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger())
 		conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
 		err = conn.AddToQueue(toQueue, bookname)
 		if err != nil {
@@ -464,10 +495,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
 	case m, ok := <-msgc:
 		if ok {
 			msg = m
-			conn.GetLogger().Println("Using new message handle to delete message from old queue")
+			conn.GetLogger().Println("Using new message handle to delete message from queue")
 		}
 	default:
-		conn.GetLogger().Println("Using original message handle to delete message from old queue")
+		conn.GetLogger().Println("Using original message handle to delete message from queue")
 	}
 
 	conn.GetLogger().Println("Deleting original message from queue", fromQueue)
@@ -558,7 +589,7 @@ func main() {
 				continue
 			}
 			verboselog.Println("Message received on preprocess queue, processing", msg.Body)
-			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
+			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
 			if err != nil {
 				log.Println("Error during preprocess", err)
 			}
@@ -574,19 +605,18 @@ func main() {
 				continue
 			}
 			verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
-			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId())
+			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
 			if err != nil {
 				log.Println("Error during wipe", err)
 			}
 		case <-checkOCRPageQueue:
 			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2)
-			checkOCRPageQueue = time.After(PauseBetweenChecks)
+			checkOCRPageQueue = time.After(PauseBetweenOCRPageChecks)
 			if err != nil {
 				log.Println("Error checking OCR Page queue", err)
 				continue
 			}
 			if msg.Handle == "" {
-				verboselog.Println("No message received on OCR Page queue, sleeping")
 				continue
 			}
 			verboselog.Println("Message received on OCR Page queue, processing", msg.Body)
-- 
cgit v1.2.1-24-ge1ad