summary | refs | log | tree | commit | diff
path: root/cmd/rescribe/main.go
diff options
context:
space:
mode:
author: Nick White <git@njw.name> 2022-01-31 14:11:21 +0000
committer: Nick White <git@njw.name> 2022-01-31 14:11:21 +0000
commit: 550752fa2ab493fb6d10aa9d963fc45996c0d100 (patch)
tree: 279d2c7c7d062f6232f363d1462539738b7e4cc8 /cmd/rescribe/main.go
parent: 57a3dc6da88e08951060e2e6e11605eb807f54ac (diff)
Make pipeline context-aware, so the rescribe tool can cancel jobs
Diffstat (limited to 'cmd/rescribe/main.go')
-rw-r--r-- cmd/rescribe/main.go 31
1 files changed, 18 insertions, 13 deletions
diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go
index 3f7bd71..cd242af 100644
--- a/cmd/rescribe/main.go
+++ b/cmd/rescribe/main.go
@@ -12,6 +12,7 @@ package main
import (
"archive/zip"
"bytes"
+ "context"
_ "embed"
"flag"
"fmt"
@@ -284,7 +285,9 @@ These training files are included in rescribe, and are always available:
ispdf = true
}
- err = startProcess(*verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir)
+ var ctx context.Context
+
+ err = startProcess(ctx, *verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir)
if err != nil {
log.Fatalln(err)
}
@@ -413,7 +416,7 @@ func rmIfNotImage(f string) error {
return nil
}
-func startProcess(logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string) error {
+func startProcess(ctx context.Context, logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string) error {
_, err := exec.Command(tessCommand, "--help").Output()
if err != nil {
errmsg := "Error, Can't run Tesseract\n"
@@ -441,14 +444,14 @@ func startProcess(logger log.Logger, tessCommand string, bookdir string, booknam
fmt.Printf("Copying book to pipeline\n")
- err = uploadbook(bookdir, bookname, conn)
+ err = uploadbook(ctx, bookdir, bookname, conn)
if err != nil {
_ = os.RemoveAll(tempdir)
return fmt.Errorf("Error uploading book: %v", err)
}
fmt.Printf("Processing book\n")
- err = processbook(trainingName, tessCommand, conn)
+ err = processbook(ctx, trainingName, tessCommand, conn)
if err != nil {
_ = os.RemoveAll(tempdir)
return fmt.Errorf("Error processing book: %v", err)
@@ -554,16 +557,16 @@ func addTxtVersion(hocrfn string) error {
return nil
}
-func uploadbook(dir string, name string, conn Pipeliner) error {
+func uploadbook(ctx context.Context, dir string, name string, conn Pipeliner) error {
_, err := os.Stat(dir)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("Error: directory %s not found", dir)
}
- err = pipeline.CheckImages(dir)
+ err = pipeline.CheckImages(ctx, dir)
if err != nil {
return fmt.Errorf("Error with images in %s: %v", dir, err)
}
- err = pipeline.UploadImages(dir, name, conn)
+ err = pipeline.UploadImages(ctx, dir, name, conn)
if err != nil {
return fmt.Errorf("Error saving images to process from %s: %v", dir, err)
}
@@ -602,7 +605,7 @@ func downloadbook(dir string, name string, conn Pipeliner) error {
return nil
}
-func processbook(training string, tesscmd string, conn Pipeliner) error {
+func processbook(ctx context.Context, training string, tesscmd string, conn Pipeliner) error {
origPattern := regexp.MustCompile(`[0-9]{4}.(jpg|png)$`)
wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.(jpg|png)$`)
ocredPattern := regexp.MustCompile(`.hocr$`)
@@ -624,6 +627,8 @@ func processbook(training string, tesscmd string, conn Pipeliner) error {
for {
select {
+ case <-ctx.Done():
+ return ctx.Err()
case <-checkPreQueue:
msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
checkPreQueue = time.After(PauseBetweenChecks)
@@ -637,12 +642,12 @@ func processbook(training string, tesscmd string, conn Pipeliner) error {
stopTimer(stopIfQuiet)
conn.Log("Message received on preprocess queue, processing", msg.Body)
fmt.Printf(" Preprocessing book (binarising and wiping)\n")
- err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
- fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
return fmt.Errorf("Error during preprocess: %v", err)
}
+ fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
case <-checkWipeQueue:
msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
checkWipeQueue = time.After(PauseBetweenChecks)
@@ -656,7 +661,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error {
stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
fmt.Printf(" Preprocessing book (wiping only)\n")
- err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
resetTimer(stopIfQuiet, quietTime)
if err != nil {
@@ -677,7 +682,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error {
stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
fmt.Printf(".")
- err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ err = pipeline.OcrPage(ctx, msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
return fmt.Errorf("\nError during OCR Page process: %v", err)
@@ -695,7 +700,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error {
stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
fmt.Printf("\n Analysing OCR and compiling PDFs\n")
- err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
resetTimer(stopIfQuiet, quietTime)
if err != nil {
return fmt.Errorf("Error during analysis: %v", err)