summary | refs | log | tree | commit | diff
path: root/cmd/bookpipeline/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/bookpipeline/main.go')
-rw-r--r-- cmd/bookpipeline/main.go 35
1 files changed, 30 insertions, 5 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 65c9b79..076df32 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -9,6 +9,7 @@ package main
import (
"bytes"
+ "context"
"flag"
"fmt"
"log"
@@ -68,6 +69,7 @@ type Clouder interface {
type Pipeliner interface {
Clouder
PreQueueId() string
+ PreNoWipeQueueId() string
WipeQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
@@ -91,7 +93,7 @@ func resetTimer(t *time.Timer, d time.Duration) {
func main() {
verbose := flag.Bool("v", false, "verbose")
- training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)")
+ training := flag.String("t", "rescribev9", "default tesseract training file to use (without the .traineddata part)")
nopreproc := flag.Bool("np", false, "disable preprocessing")
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
@@ -118,6 +120,9 @@ func main() {
wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
ocredPattern := regexp.MustCompile(`.hocr$`)
+ var ctx context.Context
+ ctx = context.Background()
+
var conn Pipeliner
switch *conntype {
case "aws":
@@ -147,6 +152,7 @@ func main() {
hostname, err := os.Hostname()
var checkPreQueue <-chan time.Time
+ var checkPreNoWipeQueue <-chan time.Time
var checkWipeQueue <-chan time.Time
var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
@@ -164,6 +170,7 @@ func main() {
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
+ checkPreNoWipeQueue = time.After(0)
var quietTime = time.Duration(*autostop) * time.Second
stopIfQuiet = time.NewTimer(quietTime)
if quietTime == 0 {
@@ -190,11 +197,29 @@ func main() {
}
conn.Log("Message received on preprocess queue, processing", msg.Body)
stopTimer(stopIfQuiet)
- err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, false), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during preprocess", err)
}
+ case <-checkPreNoWipeQueue:
+ msg, err := conn.CheckQueue(conn.PreNoWipeQueueId(), QueueTimeoutSecs)
+ checkPreNoWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ conn.Log("Error checking preprocess (no wipe) queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on preprocess (no wipe) queue, sleeping")
+ continue
+ }
+ conn.Log("Message received on preprocess (no wipe) queue, processing", msg.Body)
+ stopTimer(stopIfQuiet)
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, true), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ conn.Log("Error during preprocess (no wipe)", err)
+ }
case <-checkWipeQueue:
msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
checkWipeQueue = time.After(PauseBetweenChecks)
@@ -208,7 +233,7 @@ func main() {
}
stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
- err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during wipe", err)
@@ -228,7 +253,7 @@ func main() {
checkOCRPageQueue = time.After(0)
stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
- err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ err = pipeline.OcrPage(ctx, msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during OCR Page process", err)
@@ -246,7 +271,7 @@ func main() {
}
stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
- err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Analyse(conn, false), ocredPattern, conn.AnalyseQueueId(), "")
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during analysis", err)