author     Nick White <git@njw.name>   2020-11-09 17:33:52 +0000
committer  Nick White <git@njw.name>   2020-11-09 17:33:52 +0000
commit     fc6becf5ed98e9c0815532fd76639c15eb481ed1 (patch)
tree       0464ef25b3200333da2bb7d97eb4eac4fb2f7659
parent     4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 (diff)
[rescribe] work in progress at a self-contained local pipeline processor, called rescribe
-rw-r--r--  cmd/rescribe/main.go                                                     247
-rw-r--r--  internal/pipeline/pipeline.go (renamed from internal/pipeline/main.go)     4
-rw-r--r--  internal/pipeline/put.go                                                  85
3 files changed, 336 insertions, 0 deletions
diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go
new file mode 100644
index 0000000..e3781cb
--- /dev/null
+++ b/cmd/rescribe/main.go
@@ -0,0 +1,247 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// rescribe is a modification of bookpipeline designed for local-only
+// operation, which rolls uploading, processing, and downloading of
+// a single book by the pipeline into one command.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+ "regexp"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
+)
+
+const usage = `Usage: rescribe [-v] [-t training] bookdir [bookname]
+
+Process and OCR a book using the Rescribe pipeline on a local machine.
+`
+
+const QueueTimeoutSecs = 2 * 60
+const PauseBetweenChecks = 1 * time.Second
+const LogSaveTime = 1 * time.Minute
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
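+// Clouder is the interface of storage and queue functionality needed
+// to run the pipeline; locally it is implemented by
+// bookpipeline.LocalConn.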
+type Clouder interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
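+// Pipeliner is a Clouder which also provides the queue and storage
+// identifiers used by the pipeline, plus logging.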
+type Pipeliner interface {
+ Clouder
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
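+// stopTimer stops a timer, draining its channel if the timer had
+// already fired, so that it can safely be reset later.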
+func stopTimer(t *time.Timer) {
+ if !t.Stop() {
+ <-t.C
+ }
+}
+
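+// resetTimer restarts a timer with duration d; a duration of zero
+// leaves the timer stopped.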
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
+ }
+}
+
+func main() {
+ verbose := flag.Bool("v", false, "verbose")
+ training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)")
+
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 || flag.NArg() > 3 {
+ flag.Usage()
+ return
+ }
+
+ bookdir := flag.Arg(0)
+ var bookname string
+	if flag.NArg() > 1 {
+ bookname = flag.Arg(1)
+ } else {
+ bookname = filepath.Base(bookdir)
+ }
+
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", 0)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+ }
+
+ var conn Pipeliner
+ // TODO: set tmpdir to a specific random thing for this run only
+ conn = &bookpipeline.LocalConn{Logger: verboselog}
+
+ conn.Log("Setting up session")
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up connection:", err)
+ }
+ conn.Log("Finished setting up session")
+
+	err = uploadbook(bookdir, bookname, *training, conn)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	processbook(*training, conn)
+
+ // TODO: save book
+}
+
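+// uploadbook checks the images in dir, uploads them to WIP storage
+// under the given name, and adds the book to the appropriate queue
+// for processing.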
+func uploadbook(dir string, name string, training string, conn Pipeliner) error {
+ err := pipeline.CheckImages(dir)
+ if err != nil {
+ return fmt.Errorf("Error with images in %s: %v", dir, err)
+ }
+ err = pipeline.UploadImages(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error saving images to process from %s: %v", dir, err)
+ }
+
+ qid := pipeline.DetectQueueType(dir, conn)
+ if training != "" {
+ name = name + " " + training
+ }
+ err = conn.AddToQueue(qid, name)
+ if err != nil {
+ return fmt.Errorf("Error adding book job to queue %s: %v", qid, err)
+ }
+
+ return nil
+}
+
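+// processbook polls each queue for jobs and processes them, returning
+// once the queues have been quiet for a short period.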
+func processbook(training string, conn Pipeliner) {
+	origPattern := regexp.MustCompile(`[0-9]{4}\.jpg$`)
+	wipePattern := regexp.MustCompile(`[0-9]{4,6}(\.bin)?\.png$`)
+	ocredPattern := regexp.MustCompile(`\.hocr$`)
+
+ var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
+ var checkOCRPageQueue <-chan time.Time
+ var checkAnalyseQueue <-chan time.Time
+ var stopIfQuiet *time.Timer
+ checkPreQueue = time.After(0)
+ checkWipeQueue = time.After(0)
+ checkOCRPageQueue = time.After(0)
+ checkAnalyseQueue = time.After(0)
+ var quietTime = 1 * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
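+	// Each case below polls one queue; the stopIfQuiet timer fires,
+	// ending processing, once no messages have been handled for
+	// quietTime.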
+ for {
+ select {
+ case <-checkPreQueue:
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
+ checkPreQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ conn.Log("Error checking preprocess queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on preprocess queue, sleeping")
+ continue
+ }
+ conn.Log("Message received on preprocess queue, processing", msg.Body)
+ stopTimer(stopIfQuiet)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ conn.Log("Error during preprocess", err)
+ }
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ conn.Log("Error checking wipeonly queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on wipeonly queue, processing", msg.Body)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ conn.Log("Error during wipe", err)
+ }
+ case <-checkOCRPageQueue:
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
+ checkOCRPageQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ conn.Log("Error checking OCR Page queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ continue
+ }
+ // Have OCRPageQueue checked immediately after completion, as chances are high that
+ // there will be more pages that should be done without delay
+ checkOCRPageQueue = time.After(0)
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on OCR Page queue, processing", msg.Body)
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ conn.Log("Error during OCR Page process", err)
+ }
+ case <-checkAnalyseQueue:
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
+ checkAnalyseQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ conn.Log("Error checking analyse queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on analyse queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on analyse queue, processing", msg.Body)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ conn.Log("Error during analysis", err)
+ }
+ case <-stopIfQuiet.C:
+ conn.Log("Processing finished")
+ return
+ }
+ }
+}
diff --git a/internal/pipeline/main.go b/internal/pipeline/pipeline.go
index 6e03c50..b1c3cb9 100644
--- a/internal/pipeline/main.go
+++ b/internal/pipeline/pipeline.go
@@ -1,3 +1,7 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
// pipeline is a package used by the bookpipeline command, which
// handles the core functionality, using channels heavily to
// coordinate jobs. Note that it is considered an "internal" package,
diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go
new file mode 100644
index 0000000..8ada41f
--- /dev/null
+++ b/internal/pipeline/put.go
@@ -0,0 +1,85 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+package pipeline
+
+import (
+ "fmt"
+ "image"
+	_ "image/jpeg"
+	_ "image/png"
+ "os"
+ "path/filepath"
+)
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
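+// fileWalk is a channel of file paths; its Walk method can be passed
+// to filepath.Walk to send every file (but not directory) encountered.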
+type fileWalk chan string
+
+func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.IsDir() {
+ f <- path
+ }
+ return nil
+}
+
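+// CheckImages tries to open and decode every file in dir, returning
+// an error for the first file which is not a decodable image.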
+func CheckImages(dir string) error {
+ checker := make(fileWalk)
+ go func() {
+ _ = filepath.Walk(dir, checker.Walk)
+ close(checker)
+ }()
+
+ for path := range checker {
+ f, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("Opening image %s failed: %v", path, err)
+ }
+		_, _, err = image.Decode(f)
+		f.Close()
+		if err != nil {
+			return fmt.Errorf("Decoding image %s failed: %v", path, err)
+		}
+ }
+
+ return nil
+}
+
+// DetectQueueType auto detects the type of queue a book should be
+// sent to, based on whether its directory contains more PNG or JPEG
+// files.
+func DetectQueueType(dir string, conn Pipeliner) string {
+ pngdirs, _ := filepath.Glob(dir + "/*.png")
+ jpgdirs, _ := filepath.Glob(dir + "/*.jpg")
+ pngcount := len(pngdirs)
+ jpgcount := len(jpgdirs)
+ if pngcount > jpgcount {
+ return conn.WipeQueueId()
+ } else {
+ return conn.PreQueueId()
+ }
+}
+
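+// UploadImages uploads each file in dir to WIP storage, prefixed
+// with the book name.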
+func UploadImages(dir string, bookname string, conn Pipeliner) error {
+ walker := make(fileWalk)
+ go func() {
+ _ = filepath.Walk(dir, walker.Walk)
+ close(walker)
+ }()
+
+ for path := range walker {
+ name := filepath.Base(path)
+ err := conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path)
+ if err != nil {
+ return fmt.Errorf("Failed to upload %s: %v", path, err)
+ }
+ }
+
+ return nil
+}