From 550752fa2ab493fb6d10aa9d963fc45996c0d100 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 31 Jan 2022 14:11:21 +0000 Subject: Make pipeline context-aware, so the rescribe tool can cancel jobs --- cmd/rescribe/main.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'cmd/rescribe/main.go') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 3f7bd71..cd242af 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -12,6 +12,7 @@ package main import ( "archive/zip" "bytes" + "context" _ "embed" "flag" "fmt" @@ -284,7 +285,9 @@ These training files are included in rescribe, and are always available: ispdf = true } - err = startProcess(*verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir) + var ctx context.Context + + err = startProcess(ctx, *verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir) if err != nil { log.Fatalln(err) } @@ -413,7 +416,7 @@ func rmIfNotImage(f string) error { return nil } -func startProcess(logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string) error { +func startProcess(ctx context.Context, logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string) error { _, err := exec.Command(tessCommand, "--help").Output() if err != nil { errmsg := "Error, Can't run Tesseract\n" @@ -441,14 +444,14 @@ func startProcess(logger log.Logger, tessCommand string, bookdir string, booknam fmt.Printf("Copying book to pipeline\n") - err = uploadbook(bookdir, bookname, conn) + err = uploadbook(ctx, bookdir, bookname, conn) if err != nil { _ = os.RemoveAll(tempdir) return fmt.Errorf("Error uploading book: %v", err) } fmt.Printf("Processing book\n") - err = processbook(trainingName, tessCommand, conn) + err = processbook(ctx, trainingName, tessCommand, conn) if err != nil { _ = os.RemoveAll(tempdir) return fmt.Errorf("Error processing book: %v", err) @@ -554,16 +557,16 @@ func addTxtVersion(hocrfn string) error { return nil } -func uploadbook(dir string, name string, conn Pipeliner) error { +func uploadbook(ctx context.Context, dir string, name string, conn Pipeliner) error { _, err := os.Stat(dir) if err != nil && !os.IsExist(err) { return fmt.Errorf("Error: directory %s not found", dir) } - err = pipeline.CheckImages(dir) + err = pipeline.CheckImages(ctx, dir) if err != nil { return fmt.Errorf("Error with images in %s: %v", dir, err) } - err = pipeline.UploadImages(dir, name, conn) + err = pipeline.UploadImages(ctx, dir, name, conn) if err != nil { return fmt.Errorf("Error saving images to process from %s: %v", dir, err) } @@ -602,7 +605,7 @@ func downloadbook(dir string, name string, conn Pipeliner) error { return nil } -func processbook(training string, tesscmd string, conn Pipeliner) error { +func processbook(ctx context.Context, training string, tesscmd string, conn Pipeliner) error { origPattern := regexp.MustCompile(`[0-9]{4}.(jpg|png)$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.(jpg|png)$`) ocredPattern := regexp.MustCompile(`.hocr$`) @@ -624,6 +627,8 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { for { select { + case <-ctx.Done(): + return ctx.Err() case <-checkPreQueue: msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) @@ -637,12 +642,12 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on preprocess queue, processing", msg.Body) fmt.Printf(" Preprocessing book (binarising and wiping)\n") - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) - fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { return fmt.Errorf("Error during preprocess: %v", err) } + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) @@ -656,7 +661,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) fmt.Printf(" Preprocessing book (wiping only)\n") - err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { @@ -677,7 +682,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) fmt.Printf(".") - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(ctx, msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { return fmt.Errorf("\nError during OCR Page process: %v", err) @@ -695,7 +700,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) fmt.Printf("\n Analysing OCR and compiling PDFs\n") - err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { return fmt.Errorf("Error during analysis: %v", err) -- cgit v1.2.1-24-ge1ad