author    Nick White <git@njw.name>  2021-02-05 17:15:51 +0000
committer Nick White <git@njw.name>  2021-02-05 17:15:51 +0000
commit    11470933e4fd379b4aefa4e2bab33662a72791c2 (patch)
tree      8607e7739989ff63032b9ce10a8bf8553ecc6eb6
parent    3e7da751b3ca917adb79674eac4ef2a3267e3984 (diff)
parent    a8c7481f0dc02bbda3b3a07091e9d61f6eb728b2 (diff)
Merge branch 'master' of ssh://ssh.phx.nearlyfreespeech.net/home/public/bookpipeline
-rw-r--r--  README                            |  19
-rw-r--r--  aws.go                            | 112
-rw-r--r--  cmd/bookpipeline/main.go          | 765
-rw-r--r--  cmd/booktopipeline/main.go        |  92
-rw-r--r--  cmd/getbests/main.go              |   4
-rw-r--r--  cmd/getpipelinebook/main.go       |  98
-rw-r--r--  cmd/logwholequeue/main.go         |  85
-rw-r--r--  cmd/lspipeline/main.go            |  82
-rw-r--r--  cmd/postprocess-bythresh/main.go  |  71
-rw-r--r--  cmd/rescribe/main.go              | 395
-rw-r--r--  cmd/rmbook/main.go                |  87
-rw-r--r--  cmd/trimqueue/main.go             |  84
-rw-r--r--  doc.go                            |  14
-rw-r--r--  go.mod                            |  12
-rw-r--r--  go.sum                            |  49
-rw-r--r--  internal/pipeline/get.go          |  96
-rw-r--r--  internal/pipeline/pipeline.go     | 735
-rw-r--r--  internal/pipeline/put.go          |  85
-rw-r--r--  local.go                          |  22
-rw-r--r--  makefile                          |  12
-rw-r--r--  pdf.go                            |   2
21 files changed, 1952 insertions(+), 969 deletions(-)
diff --git a/README b/README
index 6829c1c..0d2119b 100644
--- a/README
+++ b/README
@@ -9,6 +9,11 @@ by running `go get rescribe.xyz/bookpipeline/...` and documentation
can be read with the `go doc` command or online at
<https://pkg.go.dev/rescribe.xyz/bookpipeline>.
+If you just want to install and use the commands, you can get the
+package with `git clone https://git.rescribe.xyz/bookpipeline`, and
+then install them with `go install ./...` from within the
+`bookpipeline` directory.
+
## Commands
The commands in the cmd/ directory are at the heart of this
@@ -41,6 +46,20 @@ setting:
- pdfbook : creates a searchable PDF from a directory of hOCR
and image files
+## Rescribe tool for local operation
+
+While bookpipeline was built with cloud based operation in mind, there is also
+a local mode that can be used to run OCR jobs from a single computer, with all
+the benefits of preprocessing, choosing the best threshold for each image,
+graph creation, PDF creation, and so on that the pipeline provides.
+
+Several of the commands accept a `-c local` flag for local operation, but now
+there is also a new command, named `rescribe`, that is designed to make things
+much simpler for people just wanting to do some OCR on their local computer.
+
+More information about this, including links to prebuilt executables, can be
+found on our blog at <https://blog.rescribe.xyz/posts/desktop-tool/>.
+
## Contributions
Any and all comments, bug reports, patches or pull requests would
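A note on the new tool described above: going by the usage text in cmd/rescribe/main.go further down this diff, a typical local run is `rescribe -v -t training/rescribev7_fast.traineddata bookdir savedir`, where bookdir and savedir are illustrative paths.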
diff --git a/aws.go b/aws.go
index 5ebc79f..65671fa 100644
--- a/aws.go
+++ b/aws.go
@@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
+ "strings"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {
return nil
}
+// LogQueue prints the body of all messages in a queue to the log
+func (a *AwsConn) LogQueue(url string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ a.Logger.Println(*m.Body)
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
+// RemovePrefixesFromQueue removes any messages in a queue whose
+// body starts with the specified prefix.
+func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ if !strings.HasPrefix(*m.Body, prefix) {
+ continue
+ }
+ a.Logger.Printf("Removing %s from queue\n", *m.Body)
+ _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &url,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
// QueueHeartbeat updates the visibility timeout of a message. This
// ensures that the message remains "in flight", meaning that it
// cannot be seen by other processes, but if this process fails the
@@ -306,12 +364,31 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta,
return objs, err
}
+// ListObjectWithMeta lists the name and last modified date of the
+// first object with the specified prefix.
+func (a *AwsConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) {
+ var obj ObjMeta
+ err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
+ Bucket: aws.String(bucket),
+ Prefix: aws.String(prefix),
+ MaxKeys: aws.Int64(1),
+ }, func(page *s3.ListObjectsV2Output, last bool) bool {
+ for _, r := range page.Contents {
+ obj = ObjMeta{Name: *r.Key, Date: *r.LastModified}
+ }
+ return false
+ })
+ if obj.Name == "" && obj.Date.IsZero() && err == nil {
+ return obj, fmt.Errorf("No object could be found for %s", prefix)
+ }
+ return obj, err
+}
+
func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
var prefixes []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Delimiter: aws.String("/"),
- MaxKeys: aws.Int64(1),
}, func(page *s3.ListObjectsV2Output, last bool) bool {
for _, r := range page.CommonPrefixes {
prefixes = append(prefixes, *r.Prefix)
@@ -321,6 +398,39 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
return prefixes, err
}
+// DeleteObjects deletes a list of objects
+func (a *AwsConn) DeleteObjects(bucket string, keys []string) error {
+ objs := []*s3.ObjectIdentifier{}
+ for i, v := range keys {
+ o := s3.ObjectIdentifier{Key: aws.String(v)}
+ objs = append(objs, &o)
+ // s3.DeleteObjects can only take up to 1000 keys at a time,
+ // so if necessary delete those collected so far and empty
+ // the objs queue
+ if i % 1000 == 1 {
+ _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{
+ Bucket: aws.String(bucket),
+ Delete: &s3.Delete{
+ Objects: objs,
+ Quiet: aws.Bool(true),
+ },
+ })
+ if err != nil {
+ return err
+ }
+ objs = []*s3.ObjectIdentifier{}
+ }
+ }
+ _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{
+ Bucket: aws.String(bucket),
+ Delete: &s3.Delete{
+ Objects: objs,
+ Quiet: aws.Bool(true),
+ },
+ })
+ return err
+}
+
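As the comment above notes, s3.DeleteObjects accepts at most 1000 keys per request. A minimal standalone sketch of that chunking step, with no AWS types involved; batchKeys is hypothetical and not part of this package:

	func batchKeys(keys []string, size int) [][]string {
		var batches [][]string
		for len(keys) > size {
			batches = append(batches, keys[:size])
			keys = keys[size:]
		}
		return append(batches, keys)
	}

Each returned batch would then be wrapped in s3.ObjectIdentifier values and sent in its own DeleteObjects request, as the function above does inline.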
// CreateBucket creates a new S3 bucket
func (a *AwsConn) CreateBucket(name string) error {
_, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 36295a6..909b431 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -11,23 +11,18 @@ import (
"bytes"
"flag"
"fmt"
- "io/ioutil"
"log"
- "net/smtp"
"os"
"os/exec"
- "path/filepath"
"regexp"
- "sort"
- "strings"
"time"
"rescribe.xyz/bookpipeline"
- "rescribe.xyz/preproc"
- "rescribe.xyz/utils/pkg/hocr"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
-const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
+const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]
Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.
When one is found this general process is followed:
@@ -47,10 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with
the contents: {smtpserver} {port} {username} {password} {from} {to}
`
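For illustration only, a mailsettings file in that format might hold a single line such as `smtp.example.com 587 pipelineuser s3cret pipeline@example.com admin@example.com`.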
+const QueueTimeoutSecs = 2 * 60
const PauseBetweenChecks = 3 * time.Minute
-const TimeBeforeShutdown = 5 * time.Minute
const LogSaveTime = 1 * time.Minute
-const HeartbeatSeconds = 60
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -81,686 +75,16 @@ type Pipeliner interface {
Log(v ...interface{})
}
-type pageimg struct {
- hocr, img string
-}
-
-type mailSettings struct {
- server, port, user, pass, from, to string
-}
-
-func getMailSettings() (mailSettings, error) {
- p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
- b, err := ioutil.ReadFile(p)
- if err != nil {
- return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err)
- }
- f := strings.Fields(string(b))
- if len(f) != 6 {
- return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f))
- }
- return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
-}
-
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
- for key := range dl {
- fn := filepath.Join(dir, filepath.Base(key))
- logger.Println("Downloading", key)
- err := conn.Download(conn.WIPStorageId(), key, fn)
- if err != nil {
- for range dl {
- } // consume the rest of the receiving channel so it isn't blocked
- close(process)
- errc <- err
- return
- }
- process <- fn
- }
- close(process)
-}
-
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- logger.Println("Adding", key, training, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, key+" "+training)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range pre {
- logger.Println("Preprocessing", path)
- done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
- if err != nil {
- for range pre {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- _ = os.Remove(path)
- for _, p := range done {
- up <- p
- }
- }
- close(up)
-}
-
-func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range towipe {
- logger.Println("Wiping", path)
- s := strings.Split(path, ".")
- base := strings.Join(s[:len(s)-1], "")
- outpath := base + "_bin0.0.png"
- err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
- if err != nil {
- for range towipe {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- up <- outpath
- }
- close(up)
-}
-
-func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
- return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range toocr {
- logger.Println("OCRing", path)
- name := strings.Replace(path, ".png", "", 1)
- cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- if err != nil {
- for range toocr {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
- return
- }
- up <- name + ".hocr"
- }
- close(up)
- }
-}
-
-func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
- return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
- confs := make(map[string][]*bookpipeline.Conf)
- bestconfs := make(map[string]*bookpipeline.Conf)
- savedir := ""
-
- for path := range toanalyse {
- if savedir == "" {
- savedir = filepath.Dir(path)
- }
- logger.Println("Calculating confidence for", path)
- avg, err := hocr.GetAvgConf(path)
- if err != nil && err.Error() == "No words found" {
- continue
- }
- if err != nil {
- for range toanalyse {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
- return
- }
- base := filepath.Base(path)
- codestart := strings.Index(base, "_bin")
- name := base[0:codestart]
- var c bookpipeline.Conf
- c.Path = path
- c.Code = base[codestart:]
- c.Conf = avg
- confs[name] = append(confs[name], &c)
- }
-
- fn := filepath.Join(savedir, "conf")
- logger.Println("Saving confidences in file", fn)
- f, err := os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
-
- logger.Println("Finding best confidence for each page, and saving all confidences")
- for base, conf := range confs {
- var best float64
- for _, c := range conf {
- if c.Conf > best {
- best = c.Conf
- bestconfs[base] = c
- }
- _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
- if err != nil {
- errc <- fmt.Errorf("Error writing confidences file: %s", err)
- return
- }
- }
- }
- up <- fn
-
- logger.Println("Creating best file listing the best file for each page")
- fn = filepath.Join(savedir, "best")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- for _, conf := range bestconfs {
- _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
- }
- up <- fn
-
- var pgs []string
- for _, conf := range bestconfs {
- pgs = append(pgs, conf.Path)
- }
- sort.Strings(pgs)
-
- logger.Println("Downloading binarised and original images to create PDFs")
- bookname, err := filepath.Rel(os.TempDir(), savedir)
- if err != nil {
- errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
- return
- }
- colourpdf := new(bookpipeline.Fpdf)
- err = colourpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binarisedpdf := new(bookpipeline.Fpdf)
- err = binarisedpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binhascontent, colourhascontent := false, false
-
- var colourimgs, binimgs []pageimg
-
- for _, pg := range pgs {
- base := filepath.Base(pg)
- nosuffix := strings.TrimSuffix(base, ".hocr")
- p := strings.SplitN(base, "_bin", 2)
-
- var fn string
- if len(p) > 1 {
- fn = p[0] + ".jpg"
- } else {
- fn = nosuffix + ".jpg"
- }
-
- binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
- colourimgs = append(colourimgs, pageimg{hocr: base, img: fn})
- }
-
- for _, pg := range binimgs {
- logger.Println("Downloading binarised page to add to PDF", pg.img)
- err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- } else {
- err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- binhascontent = true
- err = os.Remove(filepath.Join(savedir, pg.img))
- if err != nil {
- errc <- err
- return
- }
- }
- }
-
- if binhascontent {
- fn = filepath.Join(savedir, bookname+".binarised.pdf")
- err = binarisedpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
- return
- }
- up <- fn
- key := bookname + "/" + bookname + ".binarised.pdf"
- conn.Log("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, fn)
- if err != nil {
- }
- }
-
- for _, pg := range colourimgs {
- logger.Println("Downloading colour page to add to PDF", pg.img)
- colourfn := pg.img
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
- logger.Println("Download failed; trying", colourfn)
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- }
- }
- if err == nil {
- err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- colourhascontent = true
- err = os.Remove(filepath.Join(savedir, colourfn))
- if err != nil {
- errc <- err
- return
- }
- }
- }
- if colourhascontent {
- fn = filepath.Join(savedir, bookname+".colour.pdf")
- err = colourpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
- return
- }
- up <- fn
- }
-
- logger.Println("Creating graph")
- fn = filepath.Join(savedir, "graph.png")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
- if err != nil && err.Error() != "Not enough valid confidences" {
- errc <- fmt.Errorf("Error rendering graph: %s", err)
- return
- }
- up <- fn
-
- close(up)
- }
-}
-
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
- currentmsg := msg
- for range t.C {
- m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
- if err != nil {
- // This is for better debugging of the heartbeat issue
- conn.Log("Error with heartbeat", err)
- os.Exit(1)
- // TODO: would be better to ensure this error stops any running
- // processes, as they will ultimately fail in the case of
- // it. could do this by setting a global variable that
- // processes check each time they loop.
- errc <- err
- t.Stop()
- return
- }
- if m.Id != "" {
- conn.Log("Replaced message handle as visibilitytimeout limit was reached")
- currentmsg = m
- // TODO: maybe handle communicating new msg more gracefully than this
- for range msgc {
- } // throw away any old msgc
- msgc <- m
- }
- }
-}
-
-// allOCRed checks whether all pages of a book have been OCRed.
-// This is determined by whether every _bin0.?.png file has a
-// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- return false
- }
-
- preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
-
- atleastone := false
- for _, png := range objs {
- if preprocessedPattern.MatchString(png) {
- atleastone = true
- found := false
- b := strings.TrimSuffix(filepath.Base(png), ".png")
- hocrname := bookname + "/" + b + ".hocr"
- for _, hocr := range objs {
- if hocr == hocrname {
- found = true
- break
- }
- }
- if found == false {
- return false
- }
- }
- }
- if atleastone == false {
- return false
- }
- return true
-}
-
-// ocrPage OCRs a page based on a message. It may make sense to
-// roll this back into processBook (on which it is based) once
-// working well.
-func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := filepath.Dir(msgparts[0])
- if len(msgparts) > 1 && msgparts[1] != "" {
- process = ocr(msgparts[1])
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
-
- dl <- msgparts[0]
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- return err
- case <-done:
- }
-
- if allOCRed(bookname, conn) && toQueue != "" {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
-func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := msgparts[0]
-
- var training string
- if len(msgparts) > 1 {
- training = msgparts[1]
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- if toQueue == conn.OCRPageQueueId() {
- go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger())
- } else {
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
- }
-
- conn.Log("Getting list of objects to download")
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err)
- }
- var todl []string
- for _, n := range objs {
- if !match.MatchString(n) {
- conn.Log("Skipping item that doesn't match target", n)
- continue
- }
- todl = append(todl, n)
- }
- for _, a := range todl {
- dl <- a
- }
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- // if the error is in preprocessing / wipeonly, chances are that it will never
- // complete, and will fill the ocrpage queue with parts which succeeded
- // on each run, so in that case it's better to delete the message from
- // the queue and notify us.
- if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
- conn.Log("Deleting message from queue due to a bad error", fromQueue)
- err2 := conn.DelFromQueue(fromQueue, msg.Handle)
- if err2 != nil {
- conn.Log("Error deleting message from queue", err2)
- }
- ms, err2 := getMailSettings()
- if err2 != nil {
- conn.Log("Failed to mail settings ", err2)
- }
- if err2 == nil && ms.server != "" {
- logs, err2 := getlogs()
- if err2 != nil {
- conn.Log("Failed to get logs ", err2)
- logs = ""
- }
- msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" +
- "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" +
- " Fail message: %s\r\nFull log:\r\n%s\r\n",
- ms.to, ms.from, bookname, err, logs)
- host := fmt.Sprintf("%s:%s", ms.server, ms.port)
- auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
- err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg))
- if err2 != nil {
- conn.Log("Error sending email ", err2)
- }
- }
- }
- return err
- case <-done:
- }
-
- if toQueue != "" && toQueue != conn.OCRPageQueueId() {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
func stopTimer(t *time.Timer) {
if !t.Stop() {
<-t.C
}
}
-// TODO: rather than relying on journald, would be nicer to save the logs
-// ourselves maybe, so that we weren't relying on a particular systemd
-// setup. this can be done by having the conn.Log also append line
-// to a file (though that would mean everything would have to go through
-// conn.Log, which we're not consistently doing yet). the correct thing
-// to do then would be to implement a new interface that covers the part
-// of log.Logger we use (e.g. Print and Printf), and then have an exported
-// conn struct that implements those, so that we could pass a log.Logger
-// or the new conn struct everywhere (we wouldn't be passing a log.Logger,
-// it's just good to be able to keep the compatibility)
-func getlogs() (string, error) {
- cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- return stdout.String(), err
-}
-
-func savelogs(conn Pipeliner, starttime int64, hostname string) error {
- logs, err := getlogs()
- if err != nil {
- return fmt.Errorf("Error getting logs, error: %v", err)
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
}
- key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
- path := filepath.Join(os.TempDir(), key)
- f, err := os.Create(path)
- if err != nil {
- return fmt.Errorf("Error creating log file", err)
- }
- defer f.Close()
- _, err = f.WriteString(logs)
- if err != nil {
- return fmt.Errorf("Error saving log file", err)
- }
- _ = f.Close()
- err = conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- return fmt.Errorf("Error uploading log", err)
- }
- conn.Log("Log saved to", key)
- return nil
}
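stopTimer and resetTimer wrap a Go timer subtlety: Stop reports false once the timer has fired, leaving a stale value in its channel that must be drained before any later Reset. A minimal sketch of how the pair is used around units of work, assuming quietTime > 0; idleLoop and its work channel are hypothetical stand-ins for the main loop below:

	func idleLoop(quietTime time.Duration, work <-chan func()) {
		quiet := time.NewTimer(quietTime)
		for {
			select {
			case w := <-work:
				stopTimer(quiet) // pause the idle countdown while busy
				w()
				resetTimer(quiet, quietTime) // restart it once the work is done
			case <-quiet.C:
				return // no work arrived within quietTime
			}
		}
	}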
func main() {
@@ -770,7 +94,8 @@ func main() {
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
- autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes")
+ autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)")
+ autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
flag.Usage = func() {
@@ -801,17 +126,20 @@ func main() {
log.Fatalln("Unknown connection type")
}
- _, err := getMailSettings()
- if err != nil {
- conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
+ var err error
+ if *conntype != "local" {
+ _, err = pipeline.GetMailSettings()
+ if err != nil {
+ conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
+ }
}
- conn.Log("Setting up AWS session")
+ conn.Log("Setting up session")
err = conn.Init()
if err != nil {
- log.Fatalln("Error setting up cloud connection:", err)
+ log.Fatalln("Error setting up connection:", err)
}
- conn.Log("Finished setting up AWS session")
+ conn.Log("Finished setting up session")
starttime := time.Now().Unix()
hostname, err := os.Hostname()
@@ -820,7 +148,7 @@ func main() {
var checkWipeQueue <-chan time.Time
var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
- var shutdownIfQuiet *time.Timer
+ var stopIfQuiet *time.Timer
var savelognow *time.Ticker
if !*nopreproc {
checkPreQueue = time.After(0)
@@ -834,13 +162,21 @@ func main() {
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
- shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown)
+ var quietTime = time.Duration(*autostop) * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
savelognow = time.NewTicker(LogSaveTime)
+ if *conntype == "local" {
+ savelognow.Stop()
+ }
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking preprocess queue", err)
@@ -851,14 +187,14 @@ func main() {
continue
}
conn.Log("Message received on preprocess queue, processing", msg.Body)
- stopTimer(shutdownIfQuiet)
- err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ stopTimer(stopIfQuiet)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during preprocess", err)
}
case <-checkWipeQueue:
- msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
checkWipeQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking wipeonly queue", err)
@@ -868,15 +204,15 @@ func main() {
conn.Log("No message received on wipeonly queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
- err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during wipe", err)
}
case <-checkOCRPageQueue:
- msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
checkOCRPageQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking OCR Page queue", err)
@@ -888,15 +224,15 @@ func main() {
// Have OCRPageQueue checked immediately after completion, as chances are high that
// there will be more pages that should be done without delay
checkOCRPageQueue = time.After(0)
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
- err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during OCR Page process", err)
}
case <-checkAnalyseQueue:
- msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
checkAnalyseQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking analyse queue", err)
@@ -906,25 +242,30 @@ func main() {
conn.Log("No message received on analyse queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
- err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during analysis", err)
}
case <-savelognow.C:
conn.Log("Saving logs")
- err = savelogs(conn, starttime, hostname)
+ err = pipeline.SaveLogs(conn, starttime, hostname)
if err != nil {
conn.Log("Error saving logs", err)
}
- case <-shutdownIfQuiet.C:
- if !*autoshutdown {
+ case <-stopIfQuiet.C:
+ if quietTime == 0 {
continue
}
+ if !*autoshutdown {
+ conn.Log("Stopping pipeline")
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
+ return
+ }
conn.Log("Shutting down")
- _ = savelogs(conn, starttime, hostname)
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
cmd := exec.Command("sudo", "systemctl", "poweroff")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
index 60d1f81..b4f4d99 100644
--- a/cmd/booktopipeline/main.go
+++ b/cmd/booktopipeline/main.go
@@ -9,14 +9,13 @@ package main
import (
"flag"
"fmt"
- "image"
- _ "image/png"
- _ "image/jpeg"
"log"
"os"
"path/filepath"
"rescribe.xyz/bookpipeline"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname]
@@ -32,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or
If bookname is omitted the last part of the bookdir is used.
`
-type Pipeliner interface {
- Init() error
- PreQueueId() string
- WipeQueueId() string
- WIPStorageId() string
- AddToQueue(url string, msg string) error
- Upload(bucket string, key string, path string) error
-}
-
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -50,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
var verboselog *log.Logger
-type fileWalk chan string
-
-func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if !info.IsDir() {
- f <- path
- }
- return nil
-}
-
func main() {
verbose := flag.Bool("v", false, "Verbose")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
@@ -94,7 +72,7 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
- var conn Pipeliner
+ var conn pipeline.Pipeliner
switch *conntype {
case "aws":
conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
@@ -108,18 +86,7 @@ func main() {
log.Fatalln("Failed to set up cloud connection:", err)
}
- qid := conn.PreQueueId()
-
- // Auto detect type of queue to send to based on file extension
- pngdirs, _ := filepath.Glob(bookdir + "/*.png")
- jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg")
- pngcount := len(pngdirs)
- jpgcount := len(jpgdirs)
- if pngcount > jpgcount {
- qid = conn.WipeQueueId()
- } else {
- qid = conn.PreQueueId()
- }
+ qid := pipeline.DetectQueueType(bookdir, conn)
// Flags set override the queue selection
if *wipeonly {
@@ -130,43 +97,24 @@ func main() {
}
verboselog.Println("Checking that all images are valid in", bookdir)
- checker := make(fileWalk)
- go func() {
- err = filepath.Walk(bookdir, checker.Walk)
- if err != nil {
- log.Fatalln("Filesystem walk failed:", err)
- }
- close(checker)
- }()
-
- for path := range checker {
- f, err := os.Open(path)
- if err != nil {
- log.Fatalln("Opening image %s failed, bailing: %v", path, err)
- }
- _, _, err = image.Decode(f)
- if err != nil {
- log.Fatalf("Decoding image %s failed, bailing: %v", path, err)
- }
+ err = pipeline.CheckImages(bookdir)
+ if err != nil {
+ log.Fatalln(err)
}
- verboselog.Println("Walking", bookdir)
- walker := make(fileWalk)
- go func() {
- err = filepath.Walk(bookdir, walker.Walk)
- if err != nil {
- log.Fatalln("Filesystem walk failed:", err)
- }
- close(walker)
- }()
-
- for path := range walker {
- verboselog.Println("Uploading", path)
- name := filepath.Base(path)
- err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path)
- if err != nil {
- log.Fatalln("Failed to upload", path, err)
- }
+ verboselog.Println("Checking that a book hasn't already been uploaded with that name")
+ list, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ if len(list) > 0 {
+ log.Fatalf("Error: There is already a book in S3 named %s", bookname)
+ }
+
+ verboselog.Println("Uploading all images are valid in", bookdir)
+ err = pipeline.UploadImages(bookdir, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
if *training != "" {
diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go
index 9eca0d8..c1ee50d 100644
--- a/cmd/getbests/main.go
+++ b/cmd/getbests/main.go
@@ -62,8 +62,8 @@ func main() {
log.Println("Downloading all best files found")
for _, i := range objs {
parts := strings.Split(i, "/")
- if parts[len(parts) - 1] == "best" {
- err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best")
+ if parts[len(parts)-1] == "best" {
+ err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best")
if err != nil {
log.Fatalln("Failed to download file", i, err)
}
diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go
index 03e709b..ccedd72 100644
--- a/cmd/getpipelinebook/main.go
+++ b/cmd/getpipelinebook/main.go
@@ -6,15 +6,15 @@
package main
import (
- "bufio"
"flag"
"fmt"
"log"
"os"
"path/filepath"
- "strings"
"rescribe.xyz/bookpipeline"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname
@@ -33,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
-type Pipeliner interface {
- MinimalInit() error
- ListObjects(bucket string, prefix string) ([]string, error)
- Download(bucket string, key string, fn string) error
- Upload(bucket string, key string, path string) error
- CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
- AddToQueue(url string, msg string) error
- DelFromQueue(url string, handle string) error
- WIPStorageId() string
-}
-
-func getpdfs(conn Pipeliner, l *log.Logger, bookname string) {
- for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} {
- fn := filepath.Join(bookname, bookname+suffix)
- l.Println("Downloading PDF", fn)
- err := conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Printf("Failed to download %s: %s\n", fn, err)
- }
- }
-}
-
func main() {
all := flag.Bool("a", false, "Get all files for book")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
@@ -83,7 +61,7 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
- var conn Pipeliner
+ var conn pipeline.MinPipeliner
switch *conntype {
case "aws":
conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
@@ -109,18 +87,10 @@ func main() {
if *all {
verboselog.Println("Downloading all files for", bookname)
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ err = pipeline.DownloadAll(bookname, bookname, conn)
if err != nil {
- log.Fatalln("Failed to get list of files for book", bookname, err)
- }
- for _, i := range objs {
- verboselog.Println("Downloading", i)
- err = conn.Download(conn.WIPStorageId(), i, i)
- if err != nil {
- log.Fatalln("Failed to download file", i, err)
- }
+ log.Fatalln(err)
}
- return
}
if *binarisedpdf {
@@ -151,61 +121,29 @@ func main() {
}
if *pdf {
- getpdfs(conn, verboselog, bookname)
+ verboselog.Println("Downloading PDFs")
+ pipeline.DownloadPdfs(bookname, bookname, conn)
}
if *binarisedpdf || *colourpdf || *graph || *pdf {
return
}
- verboselog.Println("Downloading best file")
- fn := filepath.Join(bookname, "best")
- err = conn.Download(conn.WIPStorageId(), fn, fn)
+ verboselog.Println("Downloading best pages")
+ err = pipeline.DownloadBestPages(bookname, bookname, conn, *png)
if err != nil {
- log.Fatalln("Failed to download 'best' file", err)
- }
- f, err := os.Open(fn)
- if err != nil {
- log.Fatalln("Failed to open best file", err)
- }
- defer f.Close()
-
- if *png {
- verboselog.Println("Downloading png files")
- s := bufio.NewScanner(f)
- for s.Scan() {
- txtfn := filepath.Join(bookname, s.Text())
- fn = strings.Replace(txtfn, ".hocr", ".png", 1)
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
- }
- return
+ log.Fatalln(err)
}
- verboselog.Println("Downloading HOCR files")
- s := bufio.NewScanner(f)
- for s.Scan() {
- fn = filepath.Join(bookname, s.Text())
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
+ verboselog.Println("Downloading PDFs")
+ pipeline.DownloadPdfs(bookname, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
- verboselog.Println("Downloading PDF files")
- getpdfs(conn, verboselog, bookname)
-
- verboselog.Println("Downloading analysis files")
- for _, a := range []string{"conf", "graph.png"} {
- fn = filepath.Join(bookname, a)
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
+ verboselog.Println("Downloading analyses")
+ err = pipeline.DownloadAnalyses(bookname, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
}
diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go
new file mode 100644
index 0000000..71e8927
--- /dev/null
+++ b/cmd/logwholequeue/main.go
@@ -0,0 +1,85 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// logwholequeue gets all messages in a queue. This can be useful
+// for debugging queue issues.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: logwholequeue qname
+
+logwholequeue gets all messages in a queue.
+
+This can be useful for debugging queue issues.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
+`
+
+type QueuePipeliner interface {
+ Init() error
+ LogQueue(url string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 1 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for i, n := range qdetails {
+ if n.name == qname {
+ qid = qdetails[i].id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.LogQueue(qid)
+ if err != nil {
+ log.Fatalln("Error getting queue", qname, ":", err)
+ }
+}
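Per the usage text above, a typical invocation to dump the OCR page queue would be `logwholequeue ocrpage`.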
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index b649778..131ff12 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -12,6 +12,7 @@ import (
"os/exec"
"sort"
"strings"
+ "time"
"rescribe.xyz/bookpipeline"
)
@@ -35,7 +36,7 @@ type LsPipeliner interface {
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
- ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error)
+ ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error)
ListObjectPrefixes(bucket string) ([]string, error)
WIPStorageId() string
}
@@ -100,43 +101,88 @@ func (o ObjMetas) Less(i, j int) bool {
return o[i].Date.Before(o[j].Date)
}
+// getBookDetails determines whether a book is done and what date
+// it was completed, or if it has not finished, the date of any
+// book file.
+func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) {
+ // First try to get the graph.png file from the book, which marks
+ // it as done
+ obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png")
+ if err == nil {
+ return obj.Date, true, nil
+ }
+
+ // Otherwise get any file from the book to get a date to sort by
+ obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key)
+ if err != nil {
+ return time.Time{}, false, err
+ }
+ return obj.Date, false, nil
+}
+
+// getBookDetailsChan gets the details for a book putting it into either the
+// done or inprogress channels as appropriate, or sending an error to errc
+// on failure.
+func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) {
+ date, isdone, err := getBookDetails(conn, key)
+ if err != nil {
+ errc <- err
+ return
+ }
+ meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date}
+ if isdone {
+ done <- meta
+ } else {
+ inprogress <- meta
+ }
+}
+
// getBookStatus returns a list of in progress and done books.
// It determines this by finding all prefixes, and splitting them
// into two lists, those which have a 'graph.png' file (the done
// list), and those which do not (the inprogress list). They are
// sorted according to the date of the graph.png file, or the date
// of a random file with the prefix if no graph.png was found.
+// It spins up many goroutines to query the book status and
+// dates, as it is far faster to do concurrently.
func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {
prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId())
- var inprogressmeta, donemeta ObjMetas
if err != nil {
log.Println("Error getting object prefixes:", err)
return
}
- // Search for graph.png to determine done books (and save the date of it to sort with)
+
+ donec := make(chan bookpipeline.ObjMeta, 100)
+ inprogressc := make(chan bookpipeline.ObjMeta, 100)
+ errc := make(chan error)
+
for _, p := range prefixes {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png")
- if err != nil || len(objs) == 0 {
- inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p})
- } else {
- donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date})
- }
+ go getBookDetailsChan(conn, p, donec, inprogressc, errc)
}
- // Get a random file from the inprogress list to get a date to sort by
- for _, i := range inprogressmeta {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name)
- if err != nil || len(objs) == 0 {
- continue
+
+ var inprogressmeta, donemeta ObjMetas
+
+ // there will be exactly as many sends to donec or inprogressc
+ // as there are prefixes
+ for range prefixes {
+ select {
+ case i := <-donec:
+ donemeta = append(donemeta, i)
+ case i := <-inprogressc:
+ inprogressmeta = append(inprogressmeta, i)
+ case err = <-errc:
+ return inprogress, done, err
}
- i.Date = objs[0].Date
}
+
sort.Sort(donemeta)
+ sort.Sort(inprogressmeta)
+
for _, i := range donemeta {
- done = append(done, strings.TrimSuffix(i.Name, "/"))
+ done = append(done, i.Name)
}
- sort.Sort(inprogressmeta)
for _, i := range inprogressmeta {
- inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/"))
+ inprogress = append(inprogress, i.Name)
}
return
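getBookDetailsChan sends exactly one value per prefix, so getBookStatus can gather results simply by counting receives, with no sync.WaitGroup. A minimal standalone sketch of the same fan-out/gather shape; fanOut and its parameters are hypothetical:

	func fanOut(items []string, f func(string) (string, error)) ([]string, error) {
		resc := make(chan string, len(items)) // buffered so late senders never block
		errc := make(chan error, len(items))
		for _, it := range items {
			go func(it string) {
				r, err := f(it)
				if err != nil {
					errc <- err
					return
				}
				resc <- r
			}(it)
		}
		var out []string
		for range items { // exactly one receive per goroutine
			select {
			case r := <-resc:
				out = append(out, r)
			case err := <-errc:
				return nil, err
			}
		}
		return out, nil
	}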
diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go
index 37b77e7..5bdb839 100644
--- a/cmd/postprocess-bythresh/main.go
+++ b/cmd/postprocess-bythresh/main.go
@@ -19,7 +19,6 @@ import (
//TO DO: make writetofile return an error and handle that accordingly
// potential TO DO: add text versions where footer is cropped on odd/even pages only
-
// the trimblanks function trims the blank lines from a text input
func trimblanks(hocrfile string) string {
@@ -50,7 +49,7 @@ func dehyphenateString(in string) string {
words := strings.Split(line, " ")
last := words[len(words)-1]
// the - 2 here is to account for a trailing newline and counting from zero
- if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 {
+ if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 {
nextwords := strings.Split(lines[i+1], " ")
if len(nextwords) > 0 {
line = line[0:len(line)-1] + nextwords[0]
@@ -66,17 +65,15 @@ func dehyphenateString(in string) string {
return strings.Join(newlines, " ")
}
-
// the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long)
func fullcrop(noblanks string) string {
-
alllines := strings.Split(noblanks, "\n")
-
+
if len(alllines) <= 2 {
- return ""
- } else {
- return strings.Join(alllines[1:len(alllines)-2], "\n")
+ return ""
+ } else {
+ return strings.Join(alllines[1:len(alllines)-2], "\n")
}
}
@@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
var killheadtxt string
var footkilltxt string
-
hocrfilepath := filepath.Join(bookdirectory, hocrfilename)
confpath := filepath.Join(bookdirectory, "conf")
@@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
if err != nil {
log.Fatal(err)
}
-
-
+
trimbest := trimblanks(hocrfiletext)
-
+
alltxt = dehyphenateString(trimbest)
-
+
croptxt = dehyphenateString(fullcrop(trimbest))
-
+
killheadtxt = dehyphenateString(headcrop(trimbest))
-
+
footkilltxt = dehyphenateString(footcrop(trimbest))
-
}
return alltxt, croptxt, killheadtxt, footkilltxt
@@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
// the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them.
func writetofile(bookdirectory, textfilebase, txt string) error {
alltxtfile := filepath.Join(bookdirectory, textfilebase)
-
+
file, err := os.Create(alltxtfile)
if err != nil {
return fmt.Errorf("Error opening file %s: %v", alltxtfile, err)
@@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error {
if _, err := file.WriteString(txt); err != nil {
log.Println(err)
}
-return err
+ return err
}
@@ -215,7 +209,7 @@ func main() {
bookdirectory := flag.Arg(0)
confthreshstring := strconv.Itoa(*confthresh)
-
+
fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh)
bestpath := filepath.Join(bookdirectory, "best")
@@ -239,32 +233,31 @@ func main() {
crop = crop + " " + croptxt
killhead = killhead + " " + killheadtxt
killfoot = killfoot + " " + footkilltxt
-
+
}
}
-
-
- bookname:= filepath.Base(bookdirectory)
- b := bookname + "_" + confthreshstring
- err1 := writetofile(bookdirectory, b + "_all.txt", all)
- if err1 != nil {
+ bookname := filepath.Base(bookdirectory)
+ b := bookname + "_" + confthreshstring
+
+ err1 := writetofile(bookdirectory, b+"_all.txt", all)
+ if err1 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1)
- }
-
- err2 := writetofile(bookdirectory, b + "_crop.txt", crop)
- if err2 != nil {
+ }
+
+ err2 := writetofile(bookdirectory, b+"_crop.txt", crop)
+ if err2 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2)
- }
-
- err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead)
- if err3 != nil {
+ }
+
+ err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead)
+ if err3 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3)
- }
-
- err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot)
- if err4 != nil {
+ }
+
+ err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot)
+ if err4 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4)
- }
+ }
}
diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go
new file mode 100644
index 0000000..07eeaf0
--- /dev/null
+++ b/cmd/rescribe/main.go
@@ -0,0 +1,395 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// rescribe is a modification of bookpipeline designed for local-only
+// operation, which rolls uploading, processing, and downloading of
+// a single book by the pipeline into one command.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "runtime"
+ "strings"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/utils/pkg/hocr"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
+)
+
+const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir]
+
+Process and OCR a book using the Rescribe pipeline on a local machine.
+
+OCR results are saved into the bookdir directory unless savedir is
+specified.
+`
+
+const QueueTimeoutSecs = 2 * 60
+const PauseBetweenChecks = 1 * time.Second
+const LogSaveTime = 1 * time.Minute
+var thresholds = []float64{0.1, 0.2, 0.3}
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type Clouder interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
+type Pipeliner interface {
+ Clouder
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
+func stopTimer(t *time.Timer) {
+ if !t.Stop() {
+ <-t.C
+ }
+}
+
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
+ }
+}
+
+func main() {
+ deftesscmd := "tesseract"
+ if runtime.GOOS == "windows" {
+ deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe"
+ }
+
+ verbose := flag.Bool("v", false, "verbose")
+ training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use")
+ tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.")
+
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 || flag.NArg() > 2 {
+ flag.Usage()
+ return
+ }
+
+ bookdir := flag.Arg(0)
+ bookname := filepath.Base(bookdir)
+ savedir := bookdir
+ if flag.NArg() > 1 {
+ savedir = flag.Arg(1)
+ }
+
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", 0)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+ }
+
+ f, err := os.Open(*training)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training)
+ fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n")
+ os.Exit(1)
+ }
+ f.Close()
+
+ abstraining, err := filepath.Abs(*training)
+ if err != nil {
+ log.Fatalf("Error getting absolute path of training %s: %v", err)
+ }
+ tessPrefix, trainingName := filepath.Split(abstraining)
+ trainingName = strings.TrimSuffix(trainingName, ".traineddata")
+ err = os.Setenv("TESSDATA_PREFIX", tessPrefix)
+ if err != nil {
+ log.Fatalln("Error setting TESSDATA_PREFIX:", err)
+ }
+
+ _, err = exec.Command(*tesscmd, "--help").Output()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n")
+ fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n")
+ fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n")
+ fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n")
+ os.Exit(1)
+ }
+
+ tempdir, err := ioutil.TempDir("", "bookpipeline")
+ if err != nil {
+ log.Fatalln("Error setting up temporary directory:", err)
+ }
+
+ var conn Pipeliner
+ conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir}
+
+ conn.Log("Setting up session")
+ err = conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up connection:", err)
+ }
+ conn.Log("Finished setting up session")
+
+ fmt.Printf("Copying book to pipeline\n")
+
+ err = uploadbook(bookdir, bookname, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ fmt.Printf("Processing book\n")
+ err = processbook(trainingName, *tesscmd, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ fmt.Printf("Saving finished book to %s\n", savedir)
+ err = os.MkdirAll(savedir, 0755)
+ if err != nil {
+ log.Fatalf("Error creating save directory %s: %v", savedir, err)
+ }
+ err = downloadbook(savedir, bookname, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ err = os.RemoveAll(tempdir)
+ if err != nil {
+ log.Fatalf("Error removing temporary directory %s: %v", tempdir, err)
+ }
+
+ hocrs, err := filepath.Glob(filepath.Join(savedir, "*.hocr"))
+ if err != nil {
+ log.Fatalf("Error looking for .hocr files: %v", err)
+ }
+
+ for _, v := range hocrs {
+ err = addTxtVersion(v)
+ if err != nil {
+ log.Fatalf("Error creating txt version of %s: %v", v, err)
+ }
+
+ err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755)
+ if err != nil {
+ log.Fatalf("Error creating hocr directory: %v", err)
+ }
+
+ err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v)))
+ if err != nil {
+ log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err)
+ }
+ }
+
+ // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf
+ _ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf"))
+ _ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf"))
+}
+
+func addTxtVersion(hocrfn string) error {
+ dir := filepath.Dir(hocrfn)
+ err := os.MkdirAll(filepath.Join(dir, "text"), 0755)
+ if err != nil {
+ log.Fatalf("Error creating text directory: %v", err)
+ }
+
+ t, err := hocr.GetText(hocrfn)
+ if err != nil {
+ return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err)
+ }
+
+ basefn := filepath.Base(hocrfn)
+ for _, v := range thresholds {
+ basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v))
+ }
+ fn := filepath.Join(dir, "text", basefn + ".txt")
+
+ err = ioutil.WriteFile(fn, []byte(t), 0644)
+ if err != nil {
+ return fmt.Errorf("Error creating text file %s: %v", fn, err)
+ }
+
+ return nil
+}
+
+func uploadbook(dir string, name string, conn Pipeliner) error {
+ err := pipeline.CheckImages(dir)
+ if err != nil {
+ return fmt.Errorf("Error with images in %s: %v", dir, err)
+ }
+ err = pipeline.UploadImages(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error saving images to process from %s: %v", dir, err)
+ }
+
+ qid := pipeline.DetectQueueType(dir, conn)
+
+ err = conn.AddToQueue(qid, name)
+ if err != nil {
+ return fmt.Errorf("Error adding book job to queue %s: %v", qid, err)
+ }
+
+ return nil
+}
+
+func downloadbook(dir string, name string, conn Pipeliner) error {
+ err := os.MkdirAll(dir, 0755)
+ if err != nil {
+ return fmt.Errorf("Failed to create directory %s: %v", dir, err)
+ }
+
+ err = pipeline.DownloadBestPages(dir, name, conn, false)
+ if err != nil {
+ return fmt.Errorf("Error downloading best pages: %v", err)
+ }
+
+ err = pipeline.DownloadPdfs(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error downloading PDFs: %v", err)
+ }
+
+ err = pipeline.DownloadAnalyses(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error downloading analyses: %v", err)
+ }
+
+ return nil
+}
+
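+// processbook runs the local pipeline for a book, polling each queue
+// in turn and dispatching any messages found to the appropriate
+// stage, returning once no work has arrived for quietTime.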
+func processbook(training string, tesscmd string, conn Pipeliner) error {
+ origPattern := regexp.MustCompile(`[0-9]{4}\.jpg$`)
+ wipePattern := regexp.MustCompile(`[0-9]{4,6}(\.bin)?\.png$`)
+ ocredPattern := regexp.MustCompile(`\.hocr$`)
+
+ var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
+ var checkOCRPageQueue <-chan time.Time
+ var checkAnalyseQueue <-chan time.Time
+ var stopIfQuiet *time.Timer
+ checkPreQueue = time.After(0)
+ checkWipeQueue = time.After(0)
+ checkOCRPageQueue = time.After(0)
+ checkAnalyseQueue = time.After(0)
+ var quietTime = 1 * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
+ for {
+ select {
+ case <-checkPreQueue:
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
+ checkPreQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking preprocess queue: %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on preprocess queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on preprocess queue, processing", msg.Body)
+ fmt.Printf(" Preprocessing book (binarising and wiping)\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during preprocess: %v", err)
+ }
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking wipeonly queue, %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on wipeonly queue, processing", msg.Body)
+ fmt.Printf(" Preprocessing book (wiping only)\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during wipe: %v", err)
+ }
+ case <-checkOCRPageQueue:
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
+ checkOCRPageQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking OCR Page queue: %v", err)
+ }
+ if msg.Handle == "" {
+ continue
+ }
+ // Have OCRPageQueue checked immediately after completion, as chances are high that
+ // there will be more pages that should be done without delay
+ checkOCRPageQueue = time.After(0)
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on OCR Page queue, processing", msg.Body)
+ fmt.Printf(".")
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("\nError during OCR Page process: %v", err)
+ }
+ case <-checkAnalyseQueue:
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
+ checkAnalyseQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking analyse queue: %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on analyse queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on analyse queue, processing", msg.Body)
+ fmt.Printf("\n Analysing OCR and compiling PDFs\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during analysis: %v", err)
+ }
+ case <-stopIfQuiet.C:
+ conn.Log("Processing finished")
+ return nil
+ }
+ }
+
+ return fmt.Errorf("Ended unexpectedly") // should never be reached
+}
diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go
new file mode 100644
index 0000000..fcacc2e
--- /dev/null
+++ b/cmd/rmbook/main.go
@@ -0,0 +1,87 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// rmbook removes a book from cloud storage.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: rmbook [-dryrun] bookname
+
+Removes a book from cloud storage.
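+
+For example, to list the files that would be removed for a book
+named "examplebook", without deleting anything:
+  rmbook -dryrun examplebook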
+`
+
+// NullWriter is an io.Writer that discards everything written to it,
+// used to throw away non-verbose log output.
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
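+// RmPipeliner is the minimal interface needed to remove a book from
+// cloud storage.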
+type RmPipeliner interface {
+ MinimalInit() error
+ WIPStorageId() string
+ DeleteObjects(bucket string, keys []string) error
+ ListObjects(bucket string, prefix string) ([]string, error)
+}
+
+func main() {
+ dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete")
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 {
+ flag.Usage()
+ return
+ }
+
+ var n NullWriter
+ verboselog := log.New(n, "", log.LstdFlags)
+
+ var conn RmPipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+
+ fmt.Println("Setting up cloud connection")
+ err := conn.MinimalInit()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ bookname := flag.Arg(0) + "/"
+
+ fmt.Println("Getting list of files for book")
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ log.Fatalln("Error in listing book items:", err)
+ }
+
+ if len(objs) == 0 {
+ log.Fatalln("No files found for book:", bookname)
+ }
+
+ if *dryrun {
+ fmt.Printf("I would delete these files:\n")
+ for _, v := range objs {
+ fmt.Println(v)
+ }
+ return
+ }
+
+ fmt.Println("Deleting all files for book")
+ err = conn.DeleteObjects(conn.WIPStorageId(), objs)
+ if err != nil {
+ log.Fatalln("Error deleting book files:", err)
+ }
+
+ fmt.Println("Finished deleting files")
+}
diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go
new file mode 100644
index 0000000..cf65c4d
--- /dev/null
+++ b/cmd/trimqueue/main.go
@@ -0,0 +1,84 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// trimqueue deletes any messages in a queue that match a specified
+// prefix.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: trimqueue qname prefix
+
+trimqueue deletes any messages in a queue that match a specified
+prefix.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
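+
+For example, to remove any messages for a book named "examplebook"
+from the ocrpage queue:
+  trimqueue ocrpage examplebook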
+`
+
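+// QueuePipeliner is the minimal interface needed to remove prefixes
+// from the pipeline queues.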
+type QueuePipeliner interface {
+ Init() error
+ RemovePrefixesFromQueue(url string, prefix string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 2 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for _, n := range qdetails {
+ if n.name == qname {
+ qid = n.id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1))
+ if err != nil {
+ log.Fatalln("Error removing prefixes from queue", qname, ":", err)
+ }
+}
diff --git a/doc.go b/doc.go
index 823ac2f..bd4da11 100644
--- a/doc.go
+++ b/doc.go
@@ -168,5 +168,19 @@ At present the bookpipeline has some silly limitations of file names for book
pages to be recognised. This is something which will be fixed in due course.
Pages that are to be fully processed: *[0-9]{4}.jpg$
Pages that are to be wiped only: *[0-9]{6}(.bin)?.png$
+
+Local operation
+
+While bookpipeline was built with cloud based operation in mind, there is also
+a local mode that can be used to run OCR jobs from a single computer, with all
+the benefits of preprocessing, choosing the best threshold for each image,
+graph creation, PDF creation, and so on that the pipeline provides.
+
+Several of the commands accept a `-c local` flag for local operation, but now
+there is also a new command, named rescribe, that is designed to make things
+much simpler for people just wanting to do some OCR on their local computer.
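+
+For example, to run the whole pipeline over a book of page images in
+the directory "examplebook", printing progress as it goes:
+
+  rescribe -v examplebook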
+
+Note that the local mode is not as well tested as the core cloud modes; please
+report any bugs you find with it.
*/
package bookpipeline
diff --git a/go.mod b/go.mod
index 9c995f3..50354b8 100644
--- a/go.mod
+++ b/go.mod
@@ -3,16 +3,10 @@ module rescribe.xyz/bookpipeline
go 1.14
require (
- github.com/aws/aws-sdk-go v1.30.5
- github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/jung-kurt/gofpdf v1.16.2
- github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
+ github.com/aws/aws-sdk-go v1.37.1
+ github.com/phpdave11/gofpdf v1.4.2
github.com/wcharczuk/go-chart/v2 v2.1.0
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5
- golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
- golang.org/x/text v0.3.2 // indirect
- gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
- gopkg.in/yaml.v2 v2.2.8 // indirect
- rescribe.xyz/preproc v0.4.0
+ rescribe.xyz/preproc v0.4.1
rescribe.xyz/utils v0.1.3
)
diff --git a/go.sum b/go.sum
index 16815d6..7eb2c51 100644
--- a/go.sum
+++ b/go.sum
@@ -1,30 +1,24 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/airbrake/gobrake v3.6.1+incompatible/go.mod h1:wM4gu3Cn0W0K7GUuVWnlXZU11AGBXMILnrdOU8Kn00o=
-github.com/aws/aws-sdk-go v1.30.5 h1:i+sSesaMrSxiUt3NJddOApe2mXK+VNBgfcmRTvNFrXM=
-github.com/aws/aws-sdk-go v1.30.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
+github.com/aws/aws-sdk-go v1.37.1 h1:BTHmuN+gzhxkvU9sac2tZvaY0gV9ihbHw+KxZOecYvY=
+github.com/aws/aws-sdk-go v1.37.1/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/blend/go-sdk v1.1.1/go.mod h1:IP1XHXFveOXHRnojRJO7XvqWGqyzevtXND9AdSztAe8=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
-github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
-github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
-github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
-github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5PtRc=
-github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
-github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
-github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
-github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
-github.com/phpdave11/gofpdi v1.0.7/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
+github.com/phpdave11/gofpdf v1.4.2 h1:KPKiIbfwbvC/wOncwhrpRdXVj2CZTCFlw4wnoyjtHfQ=
+github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
+github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -33,43 +27,38 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
-github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible h1:ahpaSRefPekV3gcXot2AOgngIV8WYqzvDyFe3i7W24w=
github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible/go.mod h1:PF5tmL4EIx/7Wf+hEkpCqYi5He4u90sw+0+6FhrryuE=
github.com/wcharczuk/go-chart/v2 v2.1.0 h1:tY2slqVQ6bN+yHSnDYwZebLQFkphK4WNrVwnt7CJZ2I=
github.com/wcharczuk/go-chart/v2 v2.1.0/go.mod h1:yx7MvAVNcP/kN9lKXM/NTce4au4DFN99j6i1OwDclNA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20200618115811-c13761719519 h1:1e2ufUJNM3lCHEY5jIgac/7UTjd6cgJNdatjPdFWf34=
golang.org/x/image v0.0.0-20200618115811-c13761719519/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
-golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
-golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
+golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
-golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181205014116-22934f0fdb62/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
-gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
rescribe.xyz/integral v0.6.0 h1:CLF3sQ6th/OuG+/rp/lLR+AGOT4R7tG3IiUjSLKsriw=
rescribe.xyz/integral v0.6.0/go.mod h1:gKJq4UaVn17RsMsUasEMcJDkTkwqeb6AzPIJtwcUipg=
-rescribe.xyz/preproc v0.4.0 h1:HyR1R/e9hDuCdZouUgaojq0YSfJyWo87y31xAuhCdHE=
-rescribe.xyz/preproc v0.4.0/go.mod h1:Yh3wyeoKK+pu50mNrYUN/zuUbRO0kxifIr9DeE3MZvY=
+rescribe.xyz/preproc v0.4.1 h1:QAA9rBxVbFq1JH+uhRQeE507LxDl0uhgeSFTXCl/7rM=
+rescribe.xyz/preproc v0.4.1/go.mod h1:Yh3wyeoKK+pu50mNrYUN/zuUbRO0kxifIr9DeE3MZvY=
rescribe.xyz/utils v0.1.3 h1:2rlHbUjAGXy/xgtmUb6Y7Kbpxl3qkwtWzkFUQ/cOaIA=
rescribe.xyz/utils v0.1.3/go.mod h1:4L2vClYUFklsXggN0CUyP/alcgzLNRT0dMpMfEiVbX8=
diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go
new file mode 100644
index 0000000..6c5b92c
--- /dev/null
+++ b/internal/pipeline/get.go
@@ -0,0 +1,96 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+package pipeline
+
+import (
+ "bufio"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
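+// DownloadBestPages downloads the "best" file for a book, then each
+// of the hOCR files listed in it, and optionally the corresponding
+// .png images too.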
+func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) error {
+ key := filepath.Join(name, "best")
+ fn := filepath.Join(dir, "best")
+ err := conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download 'best' file: %v", err)
+ }
+ f, err := os.Open(fn)
+ if err != nil {
+ return fmt.Errorf("Failed to open best file: %v", err)
+ }
+ defer f.Close()
+
+ s := bufio.NewScanner(f)
+ for s.Scan() {
+ key = filepath.Join(name, s.Text())
+ fn = filepath.Join(dir, s.Text())
+ conn.Log("Downloading file", key)
+ err = conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download file %s: %v", key, err)
+ }
+ }
+
+ if !pluspngs {
+ return nil
+ }
+
+ // rewind the file so the list of best pages can be scanned again
+ _, err = f.Seek(0, 0)
+ if err != nil {
+ return fmt.Errorf("Failed to seek to start of best file: %v", err)
+ }
+ s = bufio.NewScanner(f)
+ for s.Scan() {
+ imgname := strings.Replace(s.Text(), ".hocr", ".png", 1)
+ key = filepath.Join(name, imgname)
+ fn = filepath.Join(dir, imgname)
+ conn.Log("Downloading file", key)
+ err = conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download file %s: %v", key, err)
+ }
+ }
+ return nil
+}
+
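+// DownloadPdfs downloads the colour and binarised PDFs of a book.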
+func DownloadPdfs(dir string, name string, conn Pipeliner) error {
+ for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} {
+ key := filepath.Join(name, name+suffix)
+ fn := filepath.Join(dir, name+suffix)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download PDF %s: %v", key, err)
+ }
+ }
+ return nil
+}
+
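+// DownloadAnalyses downloads the confidence ("conf") and graph
+// ("graph.png") analysis files for a book.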
+func DownloadAnalyses(dir string, name string, conn Pipeliner) error {
+ for _, a := range []string{"conf", "graph.png"} {
+ key := filepath.Join(name, a)
+ fn := filepath.Join(dir, a)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download analysis file %s: %v", key, err)
+ }
+ }
+ return nil
+}
+
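+// DownloadAll downloads every file that is stored for a book.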
+func DownloadAll(dir string, name string, conn Pipeliner) error {
+ objs, err := conn.ListObjects(conn.WIPStorageId(), name)
+ if err != nil {
+ return fmt.Errorf("Failed to get list of files for book", name, err)
+ }
+ for _, i := range objs {
+ base := filepath.Base(i)
+ fn := filepath.Join(dir, base)
+ conn.Log("Downloading", i)
+ err = conn.Download(conn.WIPStorageId(), i, fn)
+ if err != nil {
+ return fmt.Errorf("Failed to download file %s: %v", i, err)
+ }
+ }
+ return nil
+}
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
new file mode 100644
index 0000000..13339d7
--- /dev/null
+++ b/internal/pipeline/pipeline.go
@@ -0,0 +1,735 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// Package pipeline handles the core functionality of the bookpipeline
+// command, using channels heavily to coordinate jobs. Note that it is
+// considered an "internal" package, not intended for external use,
+// and no guarantee is made of the stability of any interfaces
+// provided.
+package pipeline
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/smtp"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "sort"
+ "strings"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/preproc"
+ "rescribe.xyz/utils/pkg/hocr"
+)
+
+const HeartbeatSeconds = 60
+
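+// Clouder is the interface to the storage and queue functionality
+// that a pipeline provider offers.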
+type Clouder interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
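+// Pipeliner is a Clouder which also knows the names of the queues
+// and storage areas used by the pipeline, and can log messages.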
+type Pipeliner interface {
+ Clouder
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
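+// MinPipeliner is a Pipeliner which can also do a minimal
+// initialisation.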
+type MinPipeliner interface {
+ Pipeliner
+ MinimalInit() error
+}
+
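+// pageimg pairs the hOCR filename of a page with its image filename.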
+type pageimg struct {
+ hocr, img string
+}
+
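+// mailSettings holds the details needed to send email notifications.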
+type mailSettings struct {
+ server, port, user, pass, from, to string
+}
+
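+// GetMailSettings reads the settings used to send error notification
+// emails from ~/.config/bookpipeline/mailsettings. The file should
+// contain six whitespace separated fields; for example (with
+// placeholder values):
+//   smtp.example.com 587 user pass from@example.com to@example.com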
+func GetMailSettings() (mailSettings, error) {
+ p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
+ b, err := ioutil.ReadFile(p)
+ if err != nil {
+ return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err)
+ }
+ f := strings.Fields(string(b))
+ if len(f) != 6 {
+ return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f))
+ }
+ return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
+}
+
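+// download receives keys on the dl channel, downloads each one from
+// storage into dir, and sends the local filename on the process
+// channel. On error it drains dl, closes process and reports on errc.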
+func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+ for key := range dl {
+ fn := filepath.Join(dir, filepath.Base(key))
+ logger.Println("Downloading", key)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ for range dl {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(process)
+ errc <- err
+ return
+ }
+ process <- fn
+ }
+ close(process)
+}
+
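+// up receives file paths on the c channel, uploads each to the
+// book's area in storage, removing the local copy afterwards, and
+// signals on done once the channel has been exhausted.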
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+ for path := range c {
+ name := filepath.Base(path)
+ key := bookname + "/" + name
+ logger.Println("Uploading", key)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ err = os.Remove(path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ }
+
+ done <- true
+}
+
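+// upAndQueue does the same as up, but also adds an entry to the
+// toQueue queue for each file uploaded, appending the training name
+// to the queue message.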
+func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
+ for path := range c {
+ name := filepath.Base(path)
+ key := bookname + "/" + name
+ logger.Println("Uploading", key)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ err = os.Remove(path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ logger.Println("Adding", key, training, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, key+" "+training)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ }
+
+ done <- true
+}
+
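+// Preprocess returns a process function which preprocesses an image
+// with each of the given binarisation thresholds, sending the
+// generated files on for upload.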
+func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) {
+ return func(pre chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range pre {
+ logger.Println("Preprocessing", path)
+ done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30)
+ if err != nil {
+ for range pre {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ _ = os.Remove(path)
+ for _, p := range done {
+ up <- p
+ }
+ }
+ close(up)
+ }
+}
+
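+// Wipe is a process function which wipes an image, saving the result
+// with a _bin0.0.png suffix.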
+func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range towipe {
+ logger.Println("Wiping", path)
+ s := strings.Split(path, ".")
+ base := strings.Join(s[:len(s)-1], "")
+ outpath := base + "_bin0.0.png"
+ err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
+ if err != nil {
+ for range towipe {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ up <- outpath
+ }
+ close(up)
+}
+
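+// Ocr returns a process function which runs the Tesseract command
+// over each image it receives, using the given training, and sends
+// the resulting .hocr file on for upload.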
+func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) {
+ return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
+ if tesscmd == "" {
+ tesscmd = "tesseract"
+ }
+ for path := range toocr {
+ logger.Println("OCRing", path)
+ name := strings.TrimSuffix(path, ".png")
+ cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0")
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+ err := cmd.Run()
+ if err != nil {
+ for range toocr {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
+ return
+ }
+ up <- name + ".hocr"
+ }
+ close(up)
+ }
+}
+
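+// Analyse returns a process function which calculates the OCR
+// confidence of each hOCR page it receives, saves the "conf" and
+// "best" files listing them, creates colour and binarised PDFs of
+// the book, and draws a graph of the confidences.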
+func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
+ return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
+ confs := make(map[string][]*bookpipeline.Conf)
+ bestconfs := make(map[string]*bookpipeline.Conf)
+ savedir := ""
+
+ for path := range toanalyse {
+ if savedir == "" {
+ savedir = filepath.Dir(path)
+ }
+ logger.Println("Calculating confidence for", path)
+ avg, err := hocr.GetAvgConf(path)
+ if err != nil && err.Error() == "No words found" {
+ continue
+ }
+ if err != nil {
+ for range toanalyse {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
+ return
+ }
+ base := filepath.Base(path)
+ codestart := strings.Index(base, "_bin")
+ name := base[0:codestart]
+ var c bookpipeline.Conf
+ c.Path = path
+ c.Code = base[codestart:]
+ c.Conf = avg
+ confs[name] = append(confs[name], &c)
+ }
+
+ fn := filepath.Join(savedir, "conf")
+ logger.Println("Saving confidences in file", fn)
+ f, err := os.Create(fn)
+ if err != nil {
+ errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+ return
+ }
+ defer f.Close()
+
+ logger.Println("Finding best confidence for each page, and saving all confidences")
+ for base, conf := range confs {
+ var best float64
+ for _, c := range conf {
+ if c.Conf > best {
+ best = c.Conf
+ bestconfs[base] = c
+ }
+ _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
+ if err != nil {
+ errc <- fmt.Errorf("Error writing confidences file: %s", err)
+ return
+ }
+ }
+ }
+ f.Close()
+ up <- fn
+
+ logger.Println("Creating best file listing the best file for each page")
+ fn = filepath.Join(savedir, "best")
+ f, err = os.Create(fn)
+ if err != nil {
+ errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+ return
+ }
+ defer f.Close()
+ for _, conf := range bestconfs {
+ _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
+ if err != nil {
+ errc <- fmt.Errorf("Error writing best file: %s", err)
+ return
+ }
+ }
+ f.Close()
+ up <- fn
+
+ var pgs []string
+ for _, conf := range bestconfs {
+ pgs = append(pgs, conf.Path)
+ }
+ sort.Strings(pgs)
+
+ logger.Println("Downloading binarised and original images to create PDFs")
+ bookname, err := filepath.Rel(os.TempDir(), savedir)
+ if err != nil {
+ errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
+ return
+ }
+ colourpdf := new(bookpipeline.Fpdf)
+ err = colourpdf.Setup()
+ if err != nil {
+ errc <- fmt.Errorf("Failed to set up PDF: %s", err)
+ return
+ }
+ binarisedpdf := new(bookpipeline.Fpdf)
+ err = binarisedpdf.Setup()
+ if err != nil {
+ errc <- fmt.Errorf("Failed to set up PDF: %s", err)
+ return
+ }
+ binhascontent, colourhascontent := false, false
+
+ var colourimgs, binimgs []pageimg
+
+ for _, pg := range pgs {
+ base := filepath.Base(pg)
+ nosuffix := strings.TrimSuffix(base, ".hocr")
+ p := strings.SplitN(base, "_bin", 2)
+
+ var fn string
+ if len(p) > 1 {
+ fn = p[0] + ".jpg"
+ } else {
+ fn = nosuffix + ".jpg"
+ }
+
+ binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
+ colourimgs = append(colourimgs, pageimg{hocr: base, img: fn})
+ }
+
+ for _, pg := range binimgs {
+ logger.Println("Downloading binarised page to add to PDF", pg.img)
+ err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
+ if err != nil {
+ logger.Println("Download failed; skipping page", pg.img)
+ } else {
+ err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
+ if err != nil {
+ errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
+ return
+ }
+ binhascontent = true
+ err = os.Remove(filepath.Join(savedir, pg.img))
+ if err != nil {
+ errc <- err
+ return
+ }
+ }
+ }
+
+ if binhascontent {
+ fn = filepath.Join(savedir, bookname+".binarised.pdf")
+ err = binarisedpdf.Save(fn)
+ if err != nil {
+ errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
+ return
+ }
+ up <- fn
+ }
+
+ for _, pg := range colourimgs {
+ logger.Println("Downloading colour page to add to PDF", pg.img)
+ colourfn := pg.img
+ err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
+ if err != nil {
+ colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
+ logger.Println("Download failed; trying", colourfn)
+ err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
+ if err != nil {
+ logger.Println("Download failed; skipping page", pg.img)
+ }
+ }
+ if err == nil {
+ err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
+ if err != nil {
+ errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
+ return
+ }
+ colourhascontent = true
+ err = os.Remove(filepath.Join(savedir, colourfn))
+ if err != nil {
+ errc <- err
+ return
+ }
+ }
+ }
+ if colourhascontent {
+ fn = filepath.Join(savedir, bookname+".colour.pdf")
+ err = colourpdf.Save(fn)
+ if err != nil {
+ errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
+ return
+ }
+ up <- fn
+ }
+
+ logger.Println("Creating graph")
+ fn = filepath.Join(savedir, "graph.png")
+ f, err = os.Create(fn)
+ if err != nil {
+ errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+ return
+ }
+ defer f.Close()
+ err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
+ if err != nil && err.Error() != "Not enough valid confidences" {
+ errc <- fmt.Errorf("Error rendering graph: %s", err)
+ return
+ }
+ up <- fn
+
+ close(up)
+ }
+}
+
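+// heartbeat extends the visibility timeout of a queue message each
+// time the ticker fires, so the message is not redelivered while it
+// is still being processed. If the message handle is replaced, the
+// new message is sent on the msgc channel.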
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+ currentmsg := msg
+ for range t.C {
+ m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
+ if err != nil {
+ // This is for better debugging of the heartbeat issue
+ conn.Log("Error with heartbeat", err)
+ os.Exit(1)
+ // TODO: would be better to ensure this error stops any running
+ // processes, as they will ultimately fail in the case of
+ // it. could do this by setting a global variable that
+ // processes check each time they loop.
+ errc <- err
+ t.Stop()
+ return
+ }
+ if m.Id != "" {
+ conn.Log("Replaced message handle as visibilitytimeout limit was reached")
+ currentmsg = m
+ // TODO: maybe handle communicating new msg more gracefully than this
+ for range msgc {
+ } // throw away any old msgc
+ msgc <- m
+ }
+ }
+}
+
+// allOCRed checks whether all pages of a book have been OCRed.
+// This is determined by whether every _bin0.?.png file has a
+// corresponding .hocr file.
+func allOCRed(bookname string, conn Pipeliner) bool {
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ return false
+ }
+
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9]\.[0-9]\.png$`)
+
+ atleastone := false
+ for _, png := range objs {
+ if preprocessedPattern.MatchString(png) {
+ atleastone = true
+ found := false
+ hocrname := strings.TrimSuffix(png, ".png") + ".hocr"
+ for _, hocr := range objs {
+ if hocr == hocrname {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return false
+ }
+ }
+ }
+ return atleastone
+}
+
+// OcrPage OCRs a page based on a message. It may make sense to
+// roll this back into processBook (on which it is based) once
+// working well.
+func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
+ dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
+
+ msgparts := strings.Split(msg.Body, " ")
+ bookname := filepath.Dir(msgparts[0])
+ if len(msgparts) > 1 && msgparts[1] != "" {
+ process = Ocr(msgparts[1], "")
+ }
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ return fmt.Errorf("Failed to create directory %s: %s", d, err)
+ }
+
+ t := time.NewTicker(HeartbeatSeconds * time.Second)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, processc, conn, d, errc, conn.GetLogger())
+ go process(processc, upc, errc, conn.GetLogger())
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+
+ dl <- msgparts[0]
+ close(dl)
+
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
+ }
+
+ if allOCRed(bookname, conn) && toQueue != "" {
+ conn.Log("Sending", bookname, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
+ }
+ }
+
+ t.Stop()
+
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc:
+ if ok {
+ msg = m
+ conn.Log("Using new message handle to delete message from queue")
+ }
+ default:
+ conn.Log("Using original message handle to delete message from queue")
+ }
+
+ conn.Log("Deleting original message from queue", fromQueue)
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
+ if err != nil {
+ _ = os.RemoveAll(d)
+ return fmt.Errorf("Error deleting message from queue: %s", err)
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ return fmt.Errorf("Failed to remove directory %s: %s", d, err)
+ }
+
+ return nil
+}
+
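+// ProcessBook handles the book named in a queue message, downloading
+// each of its files which match the given regexp, running process
+// over them, and uploading the results. Once done the book (or, for
+// the OCR page queue, each page) is added to the toQueue queue, and
+// the message is deleted from fromQueue.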
+func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
+ dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
+
+ msgparts := strings.Split(msg.Body, " ")
+ bookname := msgparts[0]
+
+ var training string
+ if len(msgparts) > 1 {
+ training = msgparts[1]
+ }
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ return fmt.Errorf("Failed to create directory %s: %s", d, err)
+ }
+
+ t := time.NewTicker(HeartbeatSeconds * time.Second)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, processc, conn, d, errc, conn.GetLogger())
+ go process(processc, upc, errc, conn.GetLogger())
+ if toQueue == conn.OCRPageQueueId() {
+ go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger())
+ } else {
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+ }
+
+ conn.Log("Getting list of objects to download")
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err)
+ }
+ var todl []string
+ for _, n := range objs {
+ if !match.MatchString(n) {
+ conn.Log("Skipping item that doesn't match target", n)
+ continue
+ }
+ todl = append(todl, n)
+ }
+ for _, a := range todl {
+ dl <- a
+ }
+ close(dl)
+
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ // if the error is in preprocessing / wipeonly, chances are that it will never
+ // complete, and will fill the ocrpage queue with parts which succeeded
+ // on each run, so in that case it's better to delete the message from
+ // the queue and notify us.
+ if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
+ conn.Log("Deleting message from queue due to a bad error", fromQueue)
+ err2 := conn.DelFromQueue(fromQueue, msg.Handle)
+ if err2 != nil {
+ conn.Log("Error deleting message from queue", err2)
+ }
+ ms, err2 := GetMailSettings()
+ if err2 != nil {
+ conn.Log("Failed to mail settings ", err2)
+ }
+ if err2 == nil && ms.server != "" {
+ logs, err2 := getLogs()
+ if err2 != nil {
+ conn.Log("Failed to get logs ", err2)
+ logs = ""
+ }
+ msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n"+
+ "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n"+
+ " Fail message: %s\r\nFull log:\r\n%s\r\n",
+ ms.to, ms.from, bookname, err, logs)
+ host := fmt.Sprintf("%s:%s", ms.server, ms.port)
+ auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
+ err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg))
+ if err2 != nil {
+ conn.Log("Error sending email ", err2)
+ }
+ }
+ }
+ return err
+ case <-done:
+ }
+
+ if toQueue != "" && toQueue != conn.OCRPageQueueId() {
+ conn.Log("Sending", bookname, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
+ }
+ }
+
+ t.Stop()
+
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc:
+ if ok {
+ msg = m
+ conn.Log("Using new message handle to delete message from queue")
+ }
+ default:
+ conn.Log("Using original message handle to delete message from queue")
+ }
+
+ conn.Log("Deleting original message from queue", fromQueue)
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
+ if err != nil {
+ _ = os.RemoveAll(d)
+ return fmt.Errorf("Error deleting message from queue: %s", err)
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ return fmt.Errorf("Failed to remove directory %s: %s", d, err)
+ }
+
+ return nil
+}
+
+// TODO: rather than relying on journald, would be nicer to save the logs
+// ourselves maybe, so that we weren't relying on a particular systemd
+// setup. this can be done by having the conn.Log also append line
+// to a file (though that would mean everything would have to go through
+// conn.Log, which we're not consistently doing yet). the correct thing
+// to do then would be to implement a new interface that covers the part
+// of log.Logger we use (e.g. Print and Printf), and then have an exported
+// conn struct that implements those, so that we could pass a log.Logger
+// or the new conn struct everywhere (we wouldn't be passing a log.Logger,
+// it's just good to be able to keep the compatibility)
+func getLogs() (string, error) {
+ cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+ err := cmd.Run()
+ return stdout.String(), err
+}
+
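+// SaveLogs uploads the bookpipeline logs from journald to storage,
+// named using the given start time and hostname.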
+func SaveLogs(conn Pipeliner, starttime int64, hostname string) error {
+ logs, err := getLogs()
+ if err != nil {
+ return fmt.Errorf("Error getting logs, error: %v", err)
+ }
+ key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
+ path := filepath.Join(os.TempDir(), key)
+ f, err := os.Create(path)
+ if err != nil {
+ return fmt.Errorf("Error creating log file", err)
+ }
+ defer f.Close()
+ _, err = f.WriteString(logs)
+ if err != nil {
+ return fmt.Errorf("Error saving log file", err)
+ }
+ _ = f.Close()
+ err = conn.Upload(conn.WIPStorageId(), key, path)
+ if err != nil {
+ return fmt.Errorf("Error uploading log", err)
+ }
+ conn.Log("Log saved to", key)
+ return nil
+}
diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go
new file mode 100644
index 0000000..4b38ea5
--- /dev/null
+++ b/internal/pipeline/put.go
@@ -0,0 +1,85 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+package pipeline
+
+import (
+ "fmt"
+ "image"
+ _ "image/jpeg"
+ _ "image/png"
+ "os"
+ "path/filepath"
+)
+
+// NullWriter is an io.Writer that discards everything written to it,
+// used to throw away non-verbose log output.
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
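+// fileWalk is a channel which receives file paths from its Walk
+// method, so it can be used with filepath.Walk to process each file
+// found.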
+type fileWalk chan string
+
+func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.IsDir() {
+ f <- path
+ }
+ return nil
+}
+
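+// CheckImages ensures that every file in a directory is a valid,
+// decodable image, returning an error for the first one that is not.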
+func CheckImages(dir string) error {
+ checker := make(fileWalk)
+ go func() {
+ _ = filepath.Walk(dir, checker.Walk)
+ close(checker)
+ }()
+
+ for path := range checker {
+ f, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("Opening image %s failed: %v", path, err)
+ }
+ _, _, err = image.Decode(f)
+ f.Close()
+ if err != nil {
+ return fmt.Errorf("Decoding image %s failed: %v", path, err)
+ }
+ }
+
+ return nil
+}
+
+// DetectQueueType selects the queue to add a book to based on the
+// file extensions in its directory: if it contains more PNGs than
+// JPGs it goes to the wipe-only queue, otherwise it goes to the
+// preprocess queue.
+func DetectQueueType(dir string, conn Pipeliner) string {
+ pngdirs, _ := filepath.Glob(dir + "/*.png")
+ jpgdirs, _ := filepath.Glob(dir + "/*.jpg")
+ pngcount := len(pngdirs)
+ jpgcount := len(jpgdirs)
+ if pngcount > jpgcount {
+ return conn.WipeQueueId()
+ } else {
+ return conn.PreQueueId()
+ }
+}
+
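+// UploadImages uploads each file in a directory to the book's area
+// in storage.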
+func UploadImages(dir string, bookname string, conn Pipeliner) error {
+ walker := make(fileWalk)
+ go func() {
+ _ = filepath.Walk(dir, walker.Walk)
+ close(walker)
+ }()
+
+ for path := range walker {
+ name := filepath.Base(path)
+ err := conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path)
+ if err != nil {
+ return fmt.Errorf("Failed to upload %s: %v", path, err)
+ }
+ }
+
+ return nil
+}
diff --git a/local.go b/local.go
index ebc3611..31e44a9 100644
--- a/local.go
+++ b/local.go
@@ -27,7 +27,7 @@ const storageId = "storage"
type LocalConn struct {
// these should be set before running Init(), or left to defaults
TempDir string
- Logger *log.Logger
+ Logger *log.Logger
}
// MinimalInit does the bare minimum initialisation
@@ -36,7 +36,7 @@ func (a *LocalConn) MinimalInit() error {
if a.TempDir == "" {
a.TempDir = filepath.Join(os.TempDir(), "bookpipeline")
}
- err = os.Mkdir(a.TempDir, 0700)
+ err = os.MkdirAll(a.TempDir, 0700)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("Error creating temporary directory: %v", err)
}
@@ -134,6 +134,7 @@ func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkF
}
n := strings.TrimPrefix(path, dirpath)
n = strings.TrimPrefix(n, "/")
+ n = strings.TrimPrefix(n, "\\")
o := ObjMeta{Name: n, Date: info.ModTime()}
*list = append(*list, o)
return nil
@@ -158,6 +159,17 @@ func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta
return list, err
}
+func (a *LocalConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) {
+ list, err := a.ListObjectsWithMeta(bucket, prefix)
+ if err != nil {
+ return ObjMeta{}, err
+ }
+ if len(list) == 0 {
+ return ObjMeta{}, fmt.Errorf("No object found for %s", prefix)
+ }
+ return list[0], nil
+}
+
// AddToQueue adds a message to a queue
func (a *LocalConn) AddToQueue(url string, msg string) error {
f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
@@ -184,12 +196,12 @@ func (a *LocalConn) DelFromQueue(url string, handle string) error {
// store the joining of part before and part after handle
var complete string
- if len(s) >= len(handle) + 1 {
+ if len(s) >= len(handle)+1 {
if i > 0 {
complete = s[:i]
}
// the '+1' is for the newline character
- complete += s[i + len(handle) + 1:]
+ complete += s[i+len(handle)+1:]
}
f, err := os.Create(filepath.Join(a.TempDir, url))
@@ -221,7 +233,7 @@ func (a *LocalConn) Download(bucket string, key string, path string) error {
// Upload just copies the file from path to TempDir/bucket/key
func (a *LocalConn) Upload(bucket string, key string, path string) error {
d := filepath.Join(a.TempDir, bucket, filepath.Dir(key))
- err := os.Mkdir(d, 0700)
+ err := os.MkdirAll(d, 0700)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("Error creating temporary directory: %v", err)
}
diff --git a/makefile b/makefile
new file mode 100644
index 0000000..6ba1af5
--- /dev/null
+++ b/makefile
@@ -0,0 +1,12 @@
+# See LICENSE file for copyright and license details.
+
+default:
+ @echo "To build and install use the basic go tools like so: go install ./..."
+ @echo "This makefile is just for cross compiling (for which the"
+ @echo "targets rescribe-osx and rescribe-w32 exist)"
+
+rescribe-osx:
+ GOOS=darwin GOARCH=amd64 go build -o $@ ./cmd/rescribe
+
+rescribe.exe:
+ GOOS=windows GOARCH=386 go build -o $@ ./cmd/rescribe
diff --git a/pdf.go b/pdf.go
index f8217ba..64a8654 100644
--- a/pdf.go
+++ b/pdf.go
@@ -16,7 +16,7 @@ import (
"io/ioutil"
"os"
- "github.com/jung-kurt/gofpdf"
+ "github.com/phpdave11/gofpdf"
"golang.org/x/image/draw"
"rescribe.xyz/utils/pkg/hocr"
)