diff options
author | Nick White <git@njw.name> | 2022-01-31 14:11:21 +0000 |
---|---|---|
committer | Nick White <git@njw.name> | 2022-01-31 14:11:21 +0000 |
commit | 550752fa2ab493fb6d10aa9d963fc45996c0d100 (patch) | |
tree | 279d2c7c7d062f6232f363d1462539738b7e4cc8 /cmd/bookpipeline | |
parent | 57a3dc6da88e08951060e2e6e11605eb807f54ac (diff) |
Make pipeline context-aware, so the rescribe tool can cancel jobs
Diffstat (limited to 'cmd/bookpipeline')
-rw-r--r-- | cmd/bookpipeline/main.go | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 65c9b79..4de9ea9 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -9,6 +9,7 @@ package main import ( "bytes" + "context" "flag" "fmt" "log" @@ -118,6 +119,8 @@ func main() { wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) ocredPattern := regexp.MustCompile(`.hocr$`) + var ctx context.Context + var conn Pipeliner switch *conntype { case "aws": @@ -190,7 +193,7 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) @@ -208,7 +211,7 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) - err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) @@ -228,7 +231,7 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(ctx, msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) @@ -246,7 +249,7 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) - err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) |