From 21d49b546a27de6c53d8fe7d1a68d5a3b5506c93 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 28 Feb 2022 16:17:35 +0000 Subject: Add PreNoWipe queue, that just does binarisation but no wiping --- aws.go | 16 +++++++++++++++- cloudsettings.go | 11 ++++++----- cmd/bookpipeline/main.go | 23 ++++++++++++++++++++++- cmd/booktopipeline/main.go | 12 +++++++++--- cmd/rescribe/gui.go | 8 +++++--- cmd/rescribe/main.go | 39 +++++++++++++++++++++++++++++++-------- doc.go | 11 +++++++++++ internal/pipeline/pipeline.go | 9 ++++++--- internal/pipeline/put.go | 5 ++++- local.go | 5 +++++ 10 files changed, 114 insertions(+), 25 deletions(-) diff --git a/aws.go b/aws.go index 0ca657f..b954951 100644 --- a/aws.go +++ b/aws.go @@ -52,6 +52,7 @@ type AwsConn struct { uploader *s3manager.Uploader wipequrl string prequrl string + prenwqurl string ocrpgqurl string analysequrl string testqurl string @@ -102,6 +103,15 @@ func (a *AwsConn) Init() error { } a.prequrl = *result.QueueUrl + a.Logger.Println("Getting preprocess no wipe queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queuePreNoWipe), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting preprocess no wipe queue URL: %s", err)) + } + a.prenwqurl = *result.QueueUrl + a.Logger.Println("Getting wipeonly queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(queueWipeOnly), @@ -337,6 +347,10 @@ func (a *AwsConn) PreQueueId() string { return a.prequrl } +func (a *AwsConn) PreNoWipeQueueId() string { + return a.prenwqurl +} + func (a *AwsConn) WipeQueueId() string { return a.wipequrl } @@ -616,7 +630,7 @@ func (a *AwsConn) Log(v ...interface{}) { // TODO: also set up the necessary security group and iam stuff func (a *AwsConn) MkPipeline() error { buckets := []string{storageWip} - queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage, queueTest} + queues := []string{queuePreProc, queuePreNoWipe, queueWipeOnly, queueAnalyse, queueOcrPage, queueTest} for _, bucket := range buckets { err := a.CreateBucket(bucket) diff --git a/cloudsettings.go b/cloudsettings.go index eec088e..5d1d41c 100644 --- a/cloudsettings.go +++ b/cloudsettings.go @@ -26,11 +26,12 @@ const ( // Queue names. Can be anything unique in SQS. const ( - queuePreProc = "rescribepreprocess" - queueWipeOnly = "rescribewipeonly" - queueOcrPage = "rescribeocrpage" - queueAnalyse = "rescribeanalyse" - queueTest = "rescribetest1" + queuePreProc = "rescribepreprocess" + queuePreNoWipe = "rescribeprenowipe" + queueWipeOnly = "rescribewipeonly" + queueOcrPage = "rescribeocrpage" + queueAnalyse = "rescribeanalyse" + queueTest = "rescribetest1" ) // Storage bucket names. Can be anything unique in S3. diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 11c5a41..2a9f54b 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -69,6 +69,7 @@ type Clouder interface { type Pipeliner interface { Clouder PreQueueId() string + PreNoWipeQueueId() string WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string @@ -151,6 +152,7 @@ func main() { hostname, err := os.Hostname() var checkPreQueue <-chan time.Time + var checkPreNoWipeQueue <-chan time.Time var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time @@ -168,6 +170,7 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } + checkPreNoWipeQueue = time.After(0) var quietTime = time.Duration(*autostop) * time.Second stopIfQuiet = time.NewTimer(quietTime) if quietTime == 0 { @@ -194,11 +197,29 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, false), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } + case <-checkPreNoWipeQueue: + msg, err := conn.CheckQueue(conn.PreNoWipeQueueId(), QueueTimeoutSecs) + checkPreNoWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking preprocess (no wipe) queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on preprocess (no wipe) queue, sleeping") + continue + } + conn.Log("Message received on preprocess (no wipe) queue, processing", msg.Body) + stopTimer(stopIfQuiet) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}, true), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during preprocess (no wipe)", err) + } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index bf088a0..ee2ef47 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -19,7 +19,7 @@ import ( "rescribe.xyz/bookpipeline/internal/pipeline" ) -const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] +const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-nowipe] [-v] bookdir [bookname] Uploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' or 'wipeonly' SQS queue. The queue to send to is @@ -46,6 +46,7 @@ func main() { conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe") dobinarise := flag.Bool("notbinarised", false, "Not binarised: all preprocessing will be done including binarisation") + nowipe := flag.Bool("nowipe", false, "No wipe: Disable wiping as part of preprocessing") training := flag.String("t", "", "Training to use (training filename without the .traineddata part)") flag.Usage = func() { @@ -89,7 +90,7 @@ func main() { log.Fatalln("Failed to set up cloud connection:", err) } - qid := pipeline.DetectQueueType(bookdir, conn) + qid := pipeline.DetectQueueType(bookdir, conn, false) // Flags set override the queue selection if *wipeonly { @@ -98,6 +99,9 @@ func main() { if *dobinarise { qid = conn.PreQueueId() } + if *nowipe { + qid = conn.PreNoWipeQueueId() + } verboselog.Println("Checking that all images are valid in", bookdir) err = pipeline.CheckImages(ctx, bookdir) @@ -131,8 +135,10 @@ func main() { var qname string if qid == conn.PreQueueId() { qname = "preprocess" - } else { + } else if qid == conn.WipeQueueId() { qname = "wipeonly" + } else { + qname = "nowipe" } fmt.Println("Uploaded book to queue", qname) diff --git a/cmd/rescribe/gui.go b/cmd/rescribe/gui.go index 8603e08..c67d15a 100644 --- a/cmd/rescribe/gui.go +++ b/cmd/rescribe/gui.go @@ -338,6 +338,8 @@ func startGui(log log.Logger, cmd string, gbookcmd string, training string, tess d.Show() }) + wipe := widget.NewCheck("Automatically clean image sides", func(bool) {}) + trainingLabel := widget.NewLabel("Training") trainingOpts := mkTrainingSelect([]string{training}, myWindow) @@ -527,7 +529,7 @@ func startGui(log log.Logger, cmd string, gbookcmd string, training string, tess training = training[start:end] } - err = startProcess(ctx, log, cmd, bookdir, bookname, training, savedir, tessdir) + err = startProcess(ctx, log, cmd, bookdir, bookname, training, savedir, tessdir, !wipe.Checked) if err != nil && strings.HasSuffix(err.Error(), "context canceled") { progressBar.SetValue(0.0) return @@ -561,8 +563,8 @@ func startGui(log log.Logger, cmd string, gbookcmd string, training string, tess trainingBits := container.New(layout.NewBorderLayout(nil, nil, trainingLabel, nil), trainingLabel, trainingOpts) - fullContent = container.NewVBox(choices, chosen, trainingBits, gobtn, abortbtn, progressBar, detail) - startContent := container.NewVBox(choices, trainingBits, gobtn, abortbtn, progressBar, detail) + fullContent = container.NewVBox(choices, chosen, trainingBits, wipe, gobtn, abortbtn, progressBar, detail) + startContent := container.NewVBox(choices, trainingBits, wipe, gobtn, abortbtn, progressBar, detail) myWindow.SetContent(startContent) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index ec37f05..54623b1 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -74,6 +74,7 @@ type Clouder interface { type Pipeliner interface { Clouder PreQueueId() string + PreNoWipeQueueId() string WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string @@ -155,6 +156,7 @@ These training files are included in rescribe, and are always available: - rescribev8_fast.traineddata (Latin historic printing) `) tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + wipe := flag.Bool("wipe", false, "Use wiper tool to remove noise like gutters from page before processing.") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -279,6 +281,7 @@ These training files are included in rescribe, and are always available: } var ctx context.Context + ctx = context.Background() // TODO: support google book downloading, as done with the GUI @@ -288,8 +291,6 @@ These training files are included in rescribe, and are always available: savedir = strings.TrimSuffix(bookdir, ".pdf") } - // BUG: this seems to fail from command line, yet works from GUI - // (used to work) bookdir, err = extractPdfImgs(ctx, bookdir) if err != nil { log.Fatalln("Error opening file as PDF:", err) @@ -305,7 +306,7 @@ These training files are included in rescribe, and are always available: ispdf = true } - err = startProcess(ctx, *verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir) + err = startProcess(ctx, *verboselog, tessCommand, bookdir, bookname, trainingName, savedir, tessdir, !*wipe) if err != nil { log.Fatalln(err) } @@ -445,7 +446,7 @@ func rmIfNotImage(f string) error { return nil } -func startProcess(ctx context.Context, logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string) error { +func startProcess(ctx context.Context, logger log.Logger, tessCommand string, bookdir string, bookname string, trainingName string, savedir string, tessdir string, nowipe bool) error { cmd := exec.Command(tessCommand, "--help") pipeline.HideCmd(cmd) _, err := cmd.Output() @@ -475,7 +476,7 @@ func startProcess(ctx context.Context, logger log.Logger, tessCommand string, bo fmt.Printf("Copying book to pipeline\n") - err = uploadbook(ctx, bookdir, bookname, conn) + err = uploadbook(ctx, bookdir, bookname, conn, nowipe) if err != nil { _ = os.RemoveAll(tempdir) return fmt.Errorf("Error uploading book: %v", err) @@ -588,7 +589,7 @@ func addTxtVersion(hocrfn string) error { return nil } -func uploadbook(ctx context.Context, dir string, name string, conn Pipeliner) error { +func uploadbook(ctx context.Context, dir string, name string, conn Pipeliner, nowipe bool) error { _, err := os.Stat(dir) if err != nil && !os.IsExist(err) { return fmt.Errorf("Error: directory %s not found", dir) @@ -602,7 +603,8 @@ func uploadbook(ctx context.Context, dir string, name string, conn Pipeliner) er return fmt.Errorf("Error saving images to process from %s: %v", dir, err) } - qid := pipeline.DetectQueueType(dir, conn) + qid := pipeline.DetectQueueType(dir, conn, nowipe) + fmt.Printf("Uploading to queue %s\n", qid) err = conn.AddToQueue(qid, name) if err != nil { @@ -642,11 +644,13 @@ func processbook(ctx context.Context, training string, tesscmd string, conn Pipe ocredPattern := regexp.MustCompile(`.hocr$`) var checkPreQueue <-chan time.Time + var checkPreNoWipeQueue <-chan time.Time var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time var stopIfQuiet *time.Timer checkPreQueue = time.After(0) + checkPreNoWipeQueue = time.After(0) checkWipeQueue = time.After(0) checkOCRPageQueue = time.After(0) checkAnalyseQueue = time.After(0) @@ -660,6 +664,25 @@ func processbook(ctx context.Context, training string, tesscmd string, conn Pipe select { case <-ctx.Done(): return ctx.Err() + case <-checkPreNoWipeQueue: + msg, err := conn.CheckQueue(conn.PreNoWipeQueueId(), QueueTimeoutSecs) + checkPreNoWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + return fmt.Errorf("Error checking preprocess no wipe queue: %v", err) + } + if msg.Handle == "" { + conn.Log("No message received on preprocess no wipe queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on preprocess no wipe queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (binarising only, no wiping)\n") + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess(thresholds, true), origPattern, conn.PreNoWipeQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + return fmt.Errorf("Error during preprocess (no wipe): %v", err) + } + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output case <-checkPreQueue: msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) @@ -673,7 +696,7 @@ func processbook(ctx context.Context, training string, tesscmd string, conn Pipe stopTimer(stopIfQuiet) conn.Log("Message received on preprocess queue, processing", msg.Body) fmt.Printf(" Preprocessing book (binarising and wiping)\n") - err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(ctx, msg, conn, pipeline.Preprocess(thresholds, false), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { return fmt.Errorf("Error during preprocess: %v", err) diff --git a/doc.go b/doc.go index bd4da11..9624177 100644 --- a/doc.go +++ b/doc.go @@ -131,6 +131,17 @@ which have been prebinarised. example message: APolishGentleman_MemoirByAdamKruczkiewicz example message: APolishGentleman_MemoirByAdamKruczkiewicz rescribefrav2 +queuePreNoWipe + +This queue works the same as queuePreProc, except that it doesn'T +wipe the pages, only runs the binarisation. It is designed for books +which don't have tricky gutters or similar noise around the edges, but +do have marginal content which might be inadventently removed by the +wiper. + + example message: APolishGentleman_MemoirByAdamKruczkiewicz + example message: APolishGentleman_MemoirByAdamKruczkiewicz rescribefrav2 + queueOcrPage This queue contains the path of individual pages, optionally followed by diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 639bba1..40ed02c 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -63,6 +63,7 @@ type Queuer interface { DelFromQueue(url string, handle string) error Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) WipeQueueId() string @@ -72,6 +73,7 @@ type UploadQueuer interface { Log(v ...interface{}) Upload(bucket string, key string, path string) error WIPStorageId() string + PreNoWipeQueueId() string PreQueueId() string WipeQueueId() string OCRPageQueueId() string @@ -93,6 +95,7 @@ type Pipeliner interface { ListObjects(bucket string, prefix string) ([]string, error) Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) Upload(bucket string, key string, path string) error @@ -239,7 +242,7 @@ func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue stri done <- true } -func Preprocess(thresholds []float64) func(context.Context, chan string, chan string, chan error, *log.Logger) { +func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { select { @@ -251,7 +254,7 @@ func Preprocess(thresholds []float64) func(context.Context, chan string, chan st default: } logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30) if err != nil { for range pre { } // consume the rest of the receiving channel so it isn't blocked @@ -822,7 +825,7 @@ func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, pro // complete, and will fill the ocrpage queue with parts which succeeded // on each run, so in that case it's better to delete the message from // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() { conn.Log("Deleting message from queue due to a bad error", fromQueue) err2 := conn.DelFromQueue(fromQueue, msg.Handle) if err2 != nil { diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index aba9e0e..47729b5 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -83,7 +83,10 @@ func CheckImages(ctx context.Context, dir string) error { // DetectQueueType detects which queue to use based on the preponderance // of files of a particular extension in a directory -func DetectQueueType(dir string, conn Queuer) string { +func DetectQueueType(dir string, conn Queuer, nowipe bool) string { + if nowipe { + return conn.PreNoWipeQueueId() + } pngdirs, _ := filepath.Glob(dir + "/*.png") jpgdirs, _ := filepath.Glob(dir + "/*.jpg") pngcount := len(pngdirs) diff --git a/local.go b/local.go index 615f9a6..24c7562 100644 --- a/local.go +++ b/local.go @@ -16,6 +16,7 @@ import ( ) const qidPre = "queuePre" +const qidPreNoWipe = "queuePreNoWipe" const qidWipe = "queueWipe" const qidOCR = "queueOCR" const qidAnalyse = "queueAnalyse" @@ -114,6 +115,10 @@ func (a *LocalConn) PreQueueId() string { return qidPre } +func (a *LocalConn) PreNoWipeQueueId() string { + return qidPreNoWipe +} + func (a *LocalConn) WipeQueueId() string { return qidWipe } -- cgit v1.2.1-24-ge1ad