diff options
Diffstat (limited to 'cmd/bookpipeline/main.go')
-rw-r--r-- | cmd/bookpipeline/main.go | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 11c5a41..2a9f54b 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -69,6 +69,7 @@ type Clouder interface { type Pipeliner interface { Clouder PreQueueId() string + PreNoWipeQueueId() string WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string @@ -151,6 +152,7 @@ func main() { hostname, err := os.Hostname() var checkPreQueue <-chan time.Time + var checkPreNoWipeQueue <-chan time.Time var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time @@ -168,6 +170,7 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } + checkPreNoWipeQueue = time.After(0) var quietTime = time.Duration(*autostop) * time.Second stopIfQuiet = time.NewTimer(quietTime) if quietTime == 0 { @@ -194,11 +197,29 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, false), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } + case <-checkPreNoWipeQueue: + msg, err := conn.CheckQueue(conn.PreNoWipeQueueId(), QueueTimeoutSecs) + checkPreNoWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking preprocess (no wipe) queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on preprocess (no wipe) queue, sleeping") + continue + } + conn.Log("Message received on preprocess (no wipe) queue, processing", msg.Body) + stopTimer(stopIfQuiet) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, true), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during preprocess (no wipe)", err) + } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) |