summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorNick White <git@njw.name>2021-02-05 17:15:51 +0000
committerNick White <git@njw.name>2021-02-05 17:15:51 +0000
commit11470933e4fd379b4aefa4e2bab33662a72791c2 (patch)
tree8607e7739989ff63032b9ce10a8bf8553ecc6eb6 /cmd
parent3e7da751b3ca917adb79674eac4ef2a3267e3984 (diff)
parenta8c7481f0dc02bbda3b3a07091e9d61f6eb728b2 (diff)
Merge branch 'master' of ssh://ssh.phx.nearlyfreespeech.net/home/public/bookpipeline
Diffstat (limited to 'cmd')
-rw-r--r--cmd/bookpipeline/main.go765
-rw-r--r--cmd/booktopipeline/main.go92
-rw-r--r--cmd/getbests/main.go4
-rw-r--r--cmd/getpipelinebook/main.go98
-rw-r--r--cmd/logwholequeue/main.go85
-rw-r--r--cmd/lspipeline/main.go82
-rw-r--r--cmd/postprocess-bythresh/main.go71
-rw-r--r--cmd/rescribe/main.go395
-rw-r--r--cmd/rmbook/main.go87
-rw-r--r--cmd/trimqueue/main.go84
10 files changed, 840 insertions, 923 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 36295a6..909b431 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -11,23 +11,18 @@ import (
"bytes"
"flag"
"fmt"
- "io/ioutil"
"log"
- "net/smtp"
"os"
"os/exec"
- "path/filepath"
"regexp"
- "sort"
- "strings"
"time"
"rescribe.xyz/bookpipeline"
- "rescribe.xyz/preproc"
- "rescribe.xyz/utils/pkg/hocr"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
-const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
+const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]
Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.
When one is found this general process is followed:
@@ -47,10 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with
the contents: {smtpserver} {port} {username} {password} {from} {to}
`
+const QueueTimeoutSecs = 2 * 60
const PauseBetweenChecks = 3 * time.Minute
-const TimeBeforeShutdown = 5 * time.Minute
const LogSaveTime = 1 * time.Minute
-const HeartbeatSeconds = 60
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -81,686 +75,16 @@ type Pipeliner interface {
Log(v ...interface{})
}
-type pageimg struct {
- hocr, img string
-}
-
-type mailSettings struct {
- server, port, user, pass, from, to string
-}
-
-func getMailSettings() (mailSettings, error) {
- p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
- b, err := ioutil.ReadFile(p)
- if err != nil {
- return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err)
- }
- f := strings.Fields(string(b))
- if len(f) != 6 {
- return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f))
- }
- return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
-}
-
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
- for key := range dl {
- fn := filepath.Join(dir, filepath.Base(key))
- logger.Println("Downloading", key)
- err := conn.Download(conn.WIPStorageId(), key, fn)
- if err != nil {
- for range dl {
- } // consume the rest of the receiving channel so it isn't blocked
- close(process)
- errc <- err
- return
- }
- process <- fn
- }
- close(process)
-}
-
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- logger.Println("Adding", key, training, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, key+" "+training)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range pre {
- logger.Println("Preprocessing", path)
- done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
- if err != nil {
- for range pre {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- _ = os.Remove(path)
- for _, p := range done {
- up <- p
- }
- }
- close(up)
-}
-
-func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range towipe {
- logger.Println("Wiping", path)
- s := strings.Split(path, ".")
- base := strings.Join(s[:len(s)-1], "")
- outpath := base + "_bin0.0.png"
- err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
- if err != nil {
- for range towipe {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- up <- outpath
- }
- close(up)
-}
-
-func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
- return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range toocr {
- logger.Println("OCRing", path)
- name := strings.Replace(path, ".png", "", 1)
- cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- if err != nil {
- for range toocr {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
- return
- }
- up <- name + ".hocr"
- }
- close(up)
- }
-}
-
-func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
- return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
- confs := make(map[string][]*bookpipeline.Conf)
- bestconfs := make(map[string]*bookpipeline.Conf)
- savedir := ""
-
- for path := range toanalyse {
- if savedir == "" {
- savedir = filepath.Dir(path)
- }
- logger.Println("Calculating confidence for", path)
- avg, err := hocr.GetAvgConf(path)
- if err != nil && err.Error() == "No words found" {
- continue
- }
- if err != nil {
- for range toanalyse {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
- return
- }
- base := filepath.Base(path)
- codestart := strings.Index(base, "_bin")
- name := base[0:codestart]
- var c bookpipeline.Conf
- c.Path = path
- c.Code = base[codestart:]
- c.Conf = avg
- confs[name] = append(confs[name], &c)
- }
-
- fn := filepath.Join(savedir, "conf")
- logger.Println("Saving confidences in file", fn)
- f, err := os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
-
- logger.Println("Finding best confidence for each page, and saving all confidences")
- for base, conf := range confs {
- var best float64
- for _, c := range conf {
- if c.Conf > best {
- best = c.Conf
- bestconfs[base] = c
- }
- _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
- if err != nil {
- errc <- fmt.Errorf("Error writing confidences file: %s", err)
- return
- }
- }
- }
- up <- fn
-
- logger.Println("Creating best file listing the best file for each page")
- fn = filepath.Join(savedir, "best")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- for _, conf := range bestconfs {
- _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
- }
- up <- fn
-
- var pgs []string
- for _, conf := range bestconfs {
- pgs = append(pgs, conf.Path)
- }
- sort.Strings(pgs)
-
- logger.Println("Downloading binarised and original images to create PDFs")
- bookname, err := filepath.Rel(os.TempDir(), savedir)
- if err != nil {
- errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
- return
- }
- colourpdf := new(bookpipeline.Fpdf)
- err = colourpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binarisedpdf := new(bookpipeline.Fpdf)
- err = binarisedpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binhascontent, colourhascontent := false, false
-
- var colourimgs, binimgs []pageimg
-
- for _, pg := range pgs {
- base := filepath.Base(pg)
- nosuffix := strings.TrimSuffix(base, ".hocr")
- p := strings.SplitN(base, "_bin", 2)
-
- var fn string
- if len(p) > 1 {
- fn = p[0] + ".jpg"
- } else {
- fn = nosuffix + ".jpg"
- }
-
- binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
- colourimgs = append(colourimgs, pageimg{hocr: base, img: fn})
- }
-
- for _, pg := range binimgs {
- logger.Println("Downloading binarised page to add to PDF", pg.img)
- err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- } else {
- err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- binhascontent = true
- err = os.Remove(filepath.Join(savedir, pg.img))
- if err != nil {
- errc <- err
- return
- }
- }
- }
-
- if binhascontent {
- fn = filepath.Join(savedir, bookname+".binarised.pdf")
- err = binarisedpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
- return
- }
- up <- fn
- key := bookname + "/" + bookname + ".binarised.pdf"
- conn.Log("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, fn)
- if err != nil {
- }
- }
-
- for _, pg := range colourimgs {
- logger.Println("Downloading colour page to add to PDF", pg.img)
- colourfn := pg.img
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
- logger.Println("Download failed; trying", colourfn)
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- }
- }
- if err == nil {
- err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- colourhascontent = true
- err = os.Remove(filepath.Join(savedir, colourfn))
- if err != nil {
- errc <- err
- return
- }
- }
- }
- if colourhascontent {
- fn = filepath.Join(savedir, bookname+".colour.pdf")
- err = colourpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
- return
- }
- up <- fn
- }
-
- logger.Println("Creating graph")
- fn = filepath.Join(savedir, "graph.png")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
- if err != nil && err.Error() != "Not enough valid confidences" {
- errc <- fmt.Errorf("Error rendering graph: %s", err)
- return
- }
- up <- fn
-
- close(up)
- }
-}
-
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
- currentmsg := msg
- for range t.C {
- m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
- if err != nil {
- // This is for better debugging of the heartbeat issue
- conn.Log("Error with heartbeat", err)
- os.Exit(1)
- // TODO: would be better to ensure this error stops any running
- // processes, as they will ultimately fail in the case of
- // it. could do this by setting a global variable that
- // processes check each time they loop.
- errc <- err
- t.Stop()
- return
- }
- if m.Id != "" {
- conn.Log("Replaced message handle as visibilitytimeout limit was reached")
- currentmsg = m
- // TODO: maybe handle communicating new msg more gracefully than this
- for range msgc {
- } // throw away any old msgc
- msgc <- m
- }
- }
-}
-
-// allOCRed checks whether all pages of a book have been OCRed.
-// This is determined by whether every _bin0.?.png file has a
-// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- return false
- }
-
- preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
-
- atleastone := false
- for _, png := range objs {
- if preprocessedPattern.MatchString(png) {
- atleastone = true
- found := false
- b := strings.TrimSuffix(filepath.Base(png), ".png")
- hocrname := bookname + "/" + b + ".hocr"
- for _, hocr := range objs {
- if hocr == hocrname {
- found = true
- break
- }
- }
- if found == false {
- return false
- }
- }
- }
- if atleastone == false {
- return false
- }
- return true
-}
-
-// ocrPage OCRs a page based on a message. It may make sense to
-// roll this back into processBook (on which it is based) once
-// working well.
-func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := filepath.Dir(msgparts[0])
- if len(msgparts) > 1 && msgparts[1] != "" {
- process = ocr(msgparts[1])
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
-
- dl <- msgparts[0]
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- return err
- case <-done:
- }
-
- if allOCRed(bookname, conn) && toQueue != "" {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
-func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := msgparts[0]
-
- var training string
- if len(msgparts) > 1 {
- training = msgparts[1]
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- if toQueue == conn.OCRPageQueueId() {
- go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger())
- } else {
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
- }
-
- conn.Log("Getting list of objects to download")
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err)
- }
- var todl []string
- for _, n := range objs {
- if !match.MatchString(n) {
- conn.Log("Skipping item that doesn't match target", n)
- continue
- }
- todl = append(todl, n)
- }
- for _, a := range todl {
- dl <- a
- }
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- // if the error is in preprocessing / wipeonly, chances are that it will never
- // complete, and will fill the ocrpage queue with parts which succeeded
- // on each run, so in that case it's better to delete the message from
- // the queue and notify us.
- if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
- conn.Log("Deleting message from queue due to a bad error", fromQueue)
- err2 := conn.DelFromQueue(fromQueue, msg.Handle)
- if err2 != nil {
- conn.Log("Error deleting message from queue", err2)
- }
- ms, err2 := getMailSettings()
- if err2 != nil {
- conn.Log("Failed to mail settings ", err2)
- }
- if err2 == nil && ms.server != "" {
- logs, err2 := getlogs()
- if err2 != nil {
- conn.Log("Failed to get logs ", err2)
- logs = ""
- }
- msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" +
- "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" +
- " Fail message: %s\r\nFull log:\r\n%s\r\n",
- ms.to, ms.from, bookname, err, logs)
- host := fmt.Sprintf("%s:%s", ms.server, ms.port)
- auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
- err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg))
- if err2 != nil {
- conn.Log("Error sending email ", err2)
- }
- }
- }
- return err
- case <-done:
- }
-
- if toQueue != "" && toQueue != conn.OCRPageQueueId() {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
func stopTimer(t *time.Timer) {
if !t.Stop() {
<-t.C
}
}
-// TODO: rather than relying on journald, would be nicer to save the logs
-// ourselves maybe, so that we weren't relying on a particular systemd
-// setup. this can be done by having the conn.Log also append line
-// to a file (though that would mean everything would have to go through
-// conn.Log, which we're not consistently doing yet). the correct thing
-// to do then would be to implement a new interface that covers the part
-// of log.Logger we use (e.g. Print and Printf), and then have an exported
-// conn struct that implements those, so that we could pass a log.Logger
-// or the new conn struct everywhere (we wouldn't be passing a log.Logger,
-// it's just good to be able to keep the compatibility)
-func getlogs() (string, error) {
- cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- return stdout.String(), err
-}
-
-func savelogs(conn Pipeliner, starttime int64, hostname string) error {
- logs, err := getlogs()
- if err != nil {
- return fmt.Errorf("Error getting logs, error: %v", err)
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
}
- key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
- path := filepath.Join(os.TempDir(), key)
- f, err := os.Create(path)
- if err != nil {
- return fmt.Errorf("Error creating log file", err)
- }
- defer f.Close()
- _, err = f.WriteString(logs)
- if err != nil {
- return fmt.Errorf("Error saving log file", err)
- }
- _ = f.Close()
- err = conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- return fmt.Errorf("Error uploading log", err)
- }
- conn.Log("Log saved to", key)
- return nil
}
func main() {
@@ -770,7 +94,8 @@ func main() {
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
- autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes")
+ autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)")
+ autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
flag.Usage = func() {
@@ -801,17 +126,20 @@ func main() {
log.Fatalln("Unknown connection type")
}
- _, err := getMailSettings()
- if err != nil {
- conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
+ var err error
+ if *conntype != "local" {
+ _, err = pipeline.GetMailSettings()
+ if err != nil {
+ conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
+ }
}
- conn.Log("Setting up AWS session")
+ conn.Log("Setting up session")
err = conn.Init()
if err != nil {
- log.Fatalln("Error setting up cloud connection:", err)
+ log.Fatalln("Error setting up connection:", err)
}
- conn.Log("Finished setting up AWS session")
+ conn.Log("Finished setting up session")
starttime := time.Now().Unix()
hostname, err := os.Hostname()
@@ -820,7 +148,7 @@ func main() {
var checkWipeQueue <-chan time.Time
var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
- var shutdownIfQuiet *time.Timer
+ var stopIfQuiet *time.Timer
var savelognow *time.Ticker
if !*nopreproc {
checkPreQueue = time.After(0)
@@ -834,13 +162,21 @@ func main() {
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
- shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown)
+ var quietTime = time.Duration(*autostop) * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
savelognow = time.NewTicker(LogSaveTime)
+ if *conntype == "local" {
+ savelognow.Stop()
+ }
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking preprocess queue", err)
@@ -851,14 +187,14 @@ func main() {
continue
}
conn.Log("Message received on preprocess queue, processing", msg.Body)
- stopTimer(shutdownIfQuiet)
- err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ stopTimer(stopIfQuiet)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during preprocess", err)
}
case <-checkWipeQueue:
- msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
checkWipeQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking wipeonly queue", err)
@@ -868,15 +204,15 @@ func main() {
conn.Log("No message received on wipeonly queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
- err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during wipe", err)
}
case <-checkOCRPageQueue:
- msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
checkOCRPageQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking OCR Page queue", err)
@@ -888,15 +224,15 @@ func main() {
// Have OCRPageQueue checked immediately after completion, as chances are high that
// there will be more pages that should be done without delay
checkOCRPageQueue = time.After(0)
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
- err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during OCR Page process", err)
}
case <-checkAnalyseQueue:
- msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
checkAnalyseQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking analyse queue", err)
@@ -906,25 +242,30 @@ func main() {
conn.Log("No message received on analyse queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
- err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during analysis", err)
}
case <-savelognow.C:
conn.Log("Saving logs")
- err = savelogs(conn, starttime, hostname)
+ err = pipeline.SaveLogs(conn, starttime, hostname)
if err != nil {
conn.Log("Error saving logs", err)
}
- case <-shutdownIfQuiet.C:
- if !*autoshutdown {
+ case <-stopIfQuiet.C:
+ if quietTime == 0 {
continue
}
+ if !*autoshutdown {
+ conn.Log("Stopping pipeline")
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
+ return
+ }
conn.Log("Shutting down")
- _ = savelogs(conn, starttime, hostname)
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
cmd := exec.Command("sudo", "systemctl", "poweroff")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
index 60d1f81..b4f4d99 100644
--- a/cmd/booktopipeline/main.go
+++ b/cmd/booktopipeline/main.go
@@ -9,14 +9,13 @@ package main
import (
"flag"
"fmt"
- "image"
- _ "image/png"
- _ "image/jpeg"
"log"
"os"
"path/filepath"
"rescribe.xyz/bookpipeline"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname]
@@ -32,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or
If bookname is omitted the last part of the bookdir is used.
`
-type Pipeliner interface {
- Init() error
- PreQueueId() string
- WipeQueueId() string
- WIPStorageId() string
- AddToQueue(url string, msg string) error
- Upload(bucket string, key string, path string) error
-}
-
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -50,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
var verboselog *log.Logger
-type fileWalk chan string
-
-func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if !info.IsDir() {
- f <- path
- }
- return nil
-}
-
func main() {
verbose := flag.Bool("v", false, "Verbose")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
@@ -94,7 +72,7 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
- var conn Pipeliner
+ var conn pipeline.Pipeliner
switch *conntype {
case "aws":
conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
@@ -108,18 +86,7 @@ func main() {
log.Fatalln("Failed to set up cloud connection:", err)
}
- qid := conn.PreQueueId()
-
- // Auto detect type of queue to send to based on file extension
- pngdirs, _ := filepath.Glob(bookdir + "/*.png")
- jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg")
- pngcount := len(pngdirs)
- jpgcount := len(jpgdirs)
- if pngcount > jpgcount {
- qid = conn.WipeQueueId()
- } else {
- qid = conn.PreQueueId()
- }
+ qid := pipeline.DetectQueueType(bookdir, conn)
// Flags set override the queue selection
if *wipeonly {
@@ -130,43 +97,24 @@ func main() {
}
verboselog.Println("Checking that all images are valid in", bookdir)
- checker := make(fileWalk)
- go func() {
- err = filepath.Walk(bookdir, checker.Walk)
- if err != nil {
- log.Fatalln("Filesystem walk failed:", err)
- }
- close(checker)
- }()
-
- for path := range checker {
- f, err := os.Open(path)
- if err != nil {
- log.Fatalln("Opening image %s failed, bailing: %v", path, err)
- }
- _, _, err = image.Decode(f)
- if err != nil {
- log.Fatalf("Decoding image %s failed, bailing: %v", path, err)
- }
+ err = pipeline.CheckImages(bookdir)
+ if err != nil {
+ log.Fatalln(err)
}
- verboselog.Println("Walking", bookdir)
- walker := make(fileWalk)
- go func() {
- err = filepath.Walk(bookdir, walker.Walk)
- if err != nil {
- log.Fatalln("Filesystem walk failed:", err)
- }
- close(walker)
- }()
-
- for path := range walker {
- verboselog.Println("Uploading", path)
- name := filepath.Base(path)
- err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path)
- if err != nil {
- log.Fatalln("Failed to upload", path, err)
- }
+ verboselog.Println("Checking that a book hasn't already been uploaded with that name")
+ list, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ if len(list) > 0 {
+ log.Fatalf("Error: There is already a book in S3 named %s", bookname)
+ }
+
+ verboselog.Println("Uploading all images are valid in", bookdir)
+ err = pipeline.UploadImages(bookdir, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
if *training != "" {
diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go
index 9eca0d8..c1ee50d 100644
--- a/cmd/getbests/main.go
+++ b/cmd/getbests/main.go
@@ -62,8 +62,8 @@ func main() {
log.Println("Downloading all best files found")
for _, i := range objs {
parts := strings.Split(i, "/")
- if parts[len(parts) - 1] == "best" {
- err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best")
+ if parts[len(parts)-1] == "best" {
+ err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best")
if err != nil {
log.Fatalln("Failed to download file", i, err)
}
diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go
index 03e709b..ccedd72 100644
--- a/cmd/getpipelinebook/main.go
+++ b/cmd/getpipelinebook/main.go
@@ -6,15 +6,15 @@
package main
import (
- "bufio"
"flag"
"fmt"
"log"
"os"
"path/filepath"
- "strings"
"rescribe.xyz/bookpipeline"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname
@@ -33,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
-type Pipeliner interface {
- MinimalInit() error
- ListObjects(bucket string, prefix string) ([]string, error)
- Download(bucket string, key string, fn string) error
- Upload(bucket string, key string, path string) error
- CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
- AddToQueue(url string, msg string) error
- DelFromQueue(url string, handle string) error
- WIPStorageId() string
-}
-
-func getpdfs(conn Pipeliner, l *log.Logger, bookname string) {
- for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} {
- fn := filepath.Join(bookname, bookname+suffix)
- l.Println("Downloading PDF", fn)
- err := conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Printf("Failed to download %s: %s\n", fn, err)
- }
- }
-}
-
func main() {
all := flag.Bool("a", false, "Get all files for book")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
@@ -83,7 +61,7 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
- var conn Pipeliner
+ var conn pipeline.MinPipeliner
switch *conntype {
case "aws":
conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
@@ -109,18 +87,10 @@ func main() {
if *all {
verboselog.Println("Downloading all files for", bookname)
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ err = pipeline.DownloadAll(bookname, bookname, conn)
if err != nil {
- log.Fatalln("Failed to get list of files for book", bookname, err)
- }
- for _, i := range objs {
- verboselog.Println("Downloading", i)
- err = conn.Download(conn.WIPStorageId(), i, i)
- if err != nil {
- log.Fatalln("Failed to download file", i, err)
- }
+ log.Fatalln(err)
}
- return
}
if *binarisedpdf {
@@ -151,61 +121,29 @@ func main() {
}
if *pdf {
- getpdfs(conn, verboselog, bookname)
+ verboselog.Println("Downloading PDFs")
+ pipeline.DownloadPdfs(bookname, bookname, conn)
}
if *binarisedpdf || *colourpdf || *graph || *pdf {
return
}
- verboselog.Println("Downloading best file")
- fn := filepath.Join(bookname, "best")
- err = conn.Download(conn.WIPStorageId(), fn, fn)
+ verboselog.Println("Downloading best pages")
+ err = pipeline.DownloadBestPages(bookname, bookname, conn, *png)
if err != nil {
- log.Fatalln("Failed to download 'best' file", err)
- }
- f, err := os.Open(fn)
- if err != nil {
- log.Fatalln("Failed to open best file", err)
- }
- defer f.Close()
-
- if *png {
- verboselog.Println("Downloading png files")
- s := bufio.NewScanner(f)
- for s.Scan() {
- txtfn := filepath.Join(bookname, s.Text())
- fn = strings.Replace(txtfn, ".hocr", ".png", 1)
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
- }
- return
+ log.Fatalln(err)
}
- verboselog.Println("Downloading HOCR files")
- s := bufio.NewScanner(f)
- for s.Scan() {
- fn = filepath.Join(bookname, s.Text())
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
+ verboselog.Println("Downloading PDFs")
+ pipeline.DownloadPdfs(bookname, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
- verboselog.Println("Downloading PDF files")
- getpdfs(conn, verboselog, bookname)
-
- verboselog.Println("Downloading analysis files")
- for _, a := range []string{"conf", "graph.png"} {
- fn = filepath.Join(bookname, a)
- verboselog.Println("Downloading file", fn)
- err = conn.Download(conn.WIPStorageId(), fn, fn)
- if err != nil {
- log.Fatalln("Failed to download file", fn, err)
- }
+ verboselog.Println("Downloading analyses")
+ err = pipeline.DownloadAnalyses(bookname, bookname, conn)
+ if err != nil {
+ log.Fatalln(err)
}
}
diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go
new file mode 100644
index 0000000..71e8927
--- /dev/null
+++ b/cmd/logwholequeue/main.go
@@ -0,0 +1,85 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// logwholequeue gets all messages in a queue. This can be useful
+// for debugging queue issues.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: logwholequeue qname
+
+logwholequeue gets all messages in a queue.
+
+This can be useful for debugging queue issues.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
+`
+
+type QueuePipeliner interface {
+ Init() error
+ LogQueue(url string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 1 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for i, n := range qdetails {
+ if n.name == qname {
+ qid = qdetails[i].id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.LogQueue(qid)
+ if err != nil {
+ log.Fatalln("Error getting queue", qname, ":", err)
+ }
+}
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index b649778..131ff12 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -12,6 +12,7 @@ import (
"os/exec"
"sort"
"strings"
+ "time"
"rescribe.xyz/bookpipeline"
)
@@ -35,7 +36,7 @@ type LsPipeliner interface {
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
- ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error)
+ ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error)
ListObjectPrefixes(bucket string) ([]string, error)
WIPStorageId() string
}
@@ -100,43 +101,88 @@ func (o ObjMetas) Less(i, j int) bool {
return o[i].Date.Before(o[j].Date)
}
+// getBookDetails determines whether a book is done and what date
+// it was completed, or if it has not finished, the date of any
+// book file.
+func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) {
+ // First try to get the graph.png file from the book, which marks
+ // it as done
+ obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png")
+ if err == nil {
+ return obj.Date, true, nil
+ }
+
+ // Otherwise get any file from the book to get a date to sort by
+ obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key)
+ if err != nil {
+ return time.Time{}, false, err
+ }
+ return obj.Date, false, nil
+}
+
+// getBookDetailsChan gets the details for a book putting it into either the
+// done or inprogress channels as appropriate, or sending an error to errc
+// on failure.
+func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) {
+ date, isdone, err := getBookDetails(conn, key)
+ if err != nil {
+ errc <- err
+ return
+ }
+ meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date}
+ if isdone {
+ done <- meta
+ } else {
+ inprogress <- meta
+ }
+}
+
// getBookStatus returns a list of in progress and done books.
// It determines this by finding all prefixes, and splitting them
// into two lists, those which have a 'graph.png' file (the done
// list), and those which do not (the inprogress list). They are
// sorted according to the date of the graph.png file, or the date
// of a random file with the prefix if no graph.png was found.
+// It spins up many goroutines to do query the book status and
+// dates, as it is far faster to do concurrently.
func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {
prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId())
- var inprogressmeta, donemeta ObjMetas
if err != nil {
log.Println("Error getting object prefixes:", err)
return
}
- // Search for graph.png to determine done books (and save the date of it to sort with)
+
+ donec := make(chan bookpipeline.ObjMeta, 100)
+ inprogressc := make(chan bookpipeline.ObjMeta, 100)
+ errc := make(chan error)
+
for _, p := range prefixes {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png")
- if err != nil || len(objs) == 0 {
- inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p})
- } else {
- donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date})
- }
+ go getBookDetailsChan(conn, p, donec, inprogressc, errc)
}
- // Get a random file from the inprogress list to get a date to sort by
- for _, i := range inprogressmeta {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name)
- if err != nil || len(objs) == 0 {
- continue
+
+ var inprogressmeta, donemeta ObjMetas
+
+ // there will be exactly as many sends to donec or inprogressc
+ // as there are prefixes
+ for range prefixes {
+ select {
+ case i := <-donec:
+ donemeta = append(donemeta, i)
+ case i := <-inprogressc:
+ inprogressmeta = append(inprogressmeta, i)
+ case err = <-errc:
+ return inprogress, done, err
}
- i.Date = objs[0].Date
}
+
sort.Sort(donemeta)
+ sort.Sort(inprogressmeta)
+
for _, i := range donemeta {
- done = append(done, strings.TrimSuffix(i.Name, "/"))
+ done = append(done, i.Name)
}
- sort.Sort(inprogressmeta)
for _, i := range inprogressmeta {
- inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/"))
+ inprogress = append(inprogress, i.Name)
}
return
diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go
index 37b77e7..5bdb839 100644
--- a/cmd/postprocess-bythresh/main.go
+++ b/cmd/postprocess-bythresh/main.go
@@ -19,7 +19,6 @@ import (
//TO DO: make writetofile return an error and handle that accordingly
// potential TO DO: add text versions where footer is cropped on odd/even pages only
-
// the trimblanks function trims the blank lines from a text input
func trimblanks(hocrfile string) string {
@@ -50,7 +49,7 @@ func dehyphenateString(in string) string {
words := strings.Split(line, " ")
last := words[len(words)-1]
// the - 2 here is to account for a trailing newline and counting from zero
- if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 {
+ if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 {
nextwords := strings.Split(lines[i+1], " ")
if len(nextwords) > 0 {
line = line[0:len(line)-1] + nextwords[0]
@@ -66,17 +65,15 @@ func dehyphenateString(in string) string {
return strings.Join(newlines, " ")
}
-
// the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long)
func fullcrop(noblanks string) string {
-
alllines := strings.Split(noblanks, "\n")
-
+
if len(alllines) <= 2 {
- return ""
- } else {
- return strings.Join(alllines[1:len(alllines)-2], "\n")
+ return ""
+ } else {
+ return strings.Join(alllines[1:len(alllines)-2], "\n")
}
}
@@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
var killheadtxt string
var footkilltxt string
-
hocrfilepath := filepath.Join(bookdirectory, hocrfilename)
confpath := filepath.Join(bookdirectory, "conf")
@@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
if err != nil {
log.Fatal(err)
}
-
-
+
trimbest := trimblanks(hocrfiletext)
-
+
alltxt = dehyphenateString(trimbest)
-
+
croptxt = dehyphenateString(fullcrop(trimbest))
-
+
killheadtxt = dehyphenateString(headcrop(trimbest))
-
+
footkilltxt = dehyphenateString(footcrop(trimbest))
-
}
return alltxt, croptxt, killheadtxt, footkilltxt
@@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,
// the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them.
func writetofile(bookdirectory, textfilebase, txt string) error {
alltxtfile := filepath.Join(bookdirectory, textfilebase)
-
+
file, err := os.Create(alltxtfile)
if err != nil {
return fmt.Errorf("Error opening file %s: %v", alltxtfile, err)
@@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error {
if _, err := file.WriteString(txt); err != nil {
log.Println(err)
}
-return err
+ return err
}
@@ -215,7 +209,7 @@ func main() {
bookdirectory := flag.Arg(0)
confthreshstring := strconv.Itoa(*confthresh)
-
+
fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh)
bestpath := filepath.Join(bookdirectory, "best")
@@ -239,32 +233,31 @@ func main() {
crop = crop + " " + croptxt
killhead = killhead + " " + killheadtxt
killfoot = killfoot + " " + footkilltxt
-
+
}
}
-
-
- bookname:= filepath.Base(bookdirectory)
- b := bookname + "_" + confthreshstring
- err1 := writetofile(bookdirectory, b + "_all.txt", all)
- if err1 != nil {
+ bookname := filepath.Base(bookdirectory)
+ b := bookname + "_" + confthreshstring
+
+ err1 := writetofile(bookdirectory, b+"_all.txt", all)
+ if err1 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1)
- }
-
- err2 := writetofile(bookdirectory, b + "_crop.txt", crop)
- if err2 != nil {
+ }
+
+ err2 := writetofile(bookdirectory, b+"_crop.txt", crop)
+ if err2 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2)
- }
-
- err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead)
- if err3 != nil {
+ }
+
+ err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead)
+ if err3 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3)
- }
-
- err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot)
- if err4 != nil {
+ }
+
+ err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot)
+ if err4 != nil {
log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4)
- }
+ }
}
diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go
new file mode 100644
index 0000000..07eeaf0
--- /dev/null
+++ b/cmd/rescribe/main.go
@@ -0,0 +1,395 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// rescribe is a modification of bookpipeline designed for local-only
+// operation, which rolls uploading, processing, and downloading of
+// a single book by the pipeline into one command.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "runtime"
+ "strings"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/utils/pkg/hocr"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
+)
+
+const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir]
+
+Process and OCR a book using the Rescribe pipeline on a local machine.
+
+OCR results are saved into the bookdir directory unless savedir is
+specified.
+`
+
+const QueueTimeoutSecs = 2 * 60
+const PauseBetweenChecks = 1 * time.Second
+const LogSaveTime = 1 * time.Minute
+var thresholds = []float64{0.1, 0.2, 0.3}
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type Clouder interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
+type Pipeliner interface {
+ Clouder
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
+func stopTimer(t *time.Timer) {
+ if !t.Stop() {
+ <-t.C
+ }
+}
+
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
+ }
+}
+
+func main() {
+ deftesscmd := "tesseract"
+ if runtime.GOOS == "windows" {
+ deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe"
+ }
+
+ verbose := flag.Bool("v", false, "verbose")
+ training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use")
+ tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.")
+
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 || flag.NArg() > 2 {
+ flag.Usage()
+ return
+ }
+
+ bookdir := flag.Arg(0)
+ bookname := filepath.Base(bookdir)
+ savedir := bookdir
+ if flag.NArg() > 1 {
+ savedir = flag.Arg(1)
+ }
+
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", 0)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+ }
+
+ f, err := os.Open(*training)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training)
+ fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n")
+ os.Exit(1)
+ }
+ f.Close()
+
+ abstraining, err := filepath.Abs(*training)
+ if err != nil {
+ log.Fatalf("Error getting absolute path of training %s: %v", err)
+ }
+ tessPrefix, trainingName := filepath.Split(abstraining)
+ trainingName = strings.TrimSuffix(trainingName, ".traineddata")
+ err = os.Setenv("TESSDATA_PREFIX", tessPrefix)
+ if err != nil {
+ log.Fatalln("Error setting TESSDATA_PREFIX:", err)
+ }
+
+ _, err = exec.Command(*tesscmd, "--help").Output()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n")
+ fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n")
+ fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n")
+ fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n")
+ os.Exit(1)
+ }
+
+ tempdir, err := ioutil.TempDir("", "bookpipeline")
+ if err != nil {
+ log.Fatalln("Error setting up temporary directory:", err)
+ }
+
+ var conn Pipeliner
+ conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir}
+
+ conn.Log("Setting up session")
+ err = conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up connection:", err)
+ }
+ conn.Log("Finished setting up session")
+
+ fmt.Printf("Copying book to pipeline\n")
+
+ err = uploadbook(bookdir, bookname, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ fmt.Printf("Processing book\n")
+ err = processbook(trainingName, *tesscmd, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ fmt.Printf("Saving finished book to %s\n", savedir)
+ err = os.MkdirAll(savedir, 0755)
+ if err != nil {
+ log.Fatalf("Error creating save directory %s: %v", savedir, err)
+ }
+ err = downloadbook(savedir, bookname, conn)
+ if err != nil {
+ _ = os.RemoveAll(tempdir)
+ log.Fatalln(err)
+ }
+
+ err = os.RemoveAll(tempdir)
+ if err != nil {
+ log.Fatalf("Error removing temporary directory %s: %v", tempdir, err)
+ }
+
+ hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*.hocr", savedir, string(filepath.Separator)))
+ if err != nil {
+ log.Fatalf("Error looking for .hocr files: %v", err)
+ }
+
+ for _, v := range hocrs {
+ err = addTxtVersion(v)
+ if err != nil {
+ log.Fatalf("Error creating txt version of %s: %v", v, err)
+ }
+
+ err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755)
+ if err != nil {
+ log.Fatalf("Error creating hocr directory: %v", err)
+ }
+
+ err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v)))
+ if err != nil {
+ log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err)
+ }
+ }
+
+ // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf
+ _ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf"))
+ _ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf"))
+}
+
+func addTxtVersion(hocrfn string) error {
+ dir := filepath.Dir(hocrfn)
+ err := os.MkdirAll(filepath.Join(dir, "text"), 0755)
+ if err != nil {
+ log.Fatalf("Error creating text directory: %v", err)
+ }
+
+ t, err := hocr.GetText(hocrfn)
+ if err != nil {
+ return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err)
+ }
+
+ basefn := filepath.Base(hocrfn)
+ for _, v := range thresholds {
+ basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v))
+ }
+ fn := filepath.Join(dir, "text", basefn + ".txt")
+
+ err = ioutil.WriteFile(fn, []byte(t), 0644)
+ if err != nil {
+ return fmt.Errorf("Error creating text file %s: %v", fn, err)
+ }
+
+ return nil
+}
+
+func uploadbook(dir string, name string, conn Pipeliner) error {
+ err := pipeline.CheckImages(dir)
+ if err != nil {
+ return fmt.Errorf("Error with images in %s: %v", dir, err)
+ }
+ err = pipeline.UploadImages(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error saving images to process from %s: %v", dir, err)
+ }
+
+ qid := pipeline.DetectQueueType(dir, conn)
+
+ err = conn.AddToQueue(qid, name)
+ if err != nil {
+ return fmt.Errorf("Error adding book job to queue %s: %v", qid, err)
+ }
+
+ return nil
+}
+
+func downloadbook(dir string, name string, conn Pipeliner) error {
+ err := os.MkdirAll(name, 0755)
+ if err != nil {
+ log.Fatalln("Failed to create directory", name, err)
+ }
+
+ err = pipeline.DownloadBestPages(dir, name, conn, false)
+ if err != nil {
+ return fmt.Errorf("Error downloading best pages: %v", err)
+ }
+
+ err = pipeline.DownloadPdfs(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error downloading PDFs: %v", err)
+ }
+
+ err = pipeline.DownloadAnalyses(dir, name, conn)
+ if err != nil {
+ return fmt.Errorf("Error downloading analyses: %v", err)
+ }
+
+ return nil
+}
+
+func processbook(training string, tesscmd string, conn Pipeliner) error {
+ origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`)
+ wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
+ ocredPattern := regexp.MustCompile(`.hocr$`)
+
+ var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
+ var checkOCRPageQueue <-chan time.Time
+ var checkAnalyseQueue <-chan time.Time
+ var stopIfQuiet *time.Timer
+ checkPreQueue = time.After(0)
+ checkWipeQueue = time.After(0)
+ checkOCRPageQueue = time.After(0)
+ checkAnalyseQueue = time.After(0)
+ var quietTime = 1 * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
+ for {
+ select {
+ case <-checkPreQueue:
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
+ checkPreQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking preprocess queue: %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on preprocess queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on preprocess queue, processing", msg.Body)
+ fmt.Printf(" Preprocessing book (binarising and wiping)\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during preprocess: %v", err)
+ }
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking wipeonly queue, %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on wipeonly queue, processing", msg.Body)
+ fmt.Printf(" Preprocessing book (wiping only)\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during wipe: %v", err)
+ }
+ case <-checkOCRPageQueue:
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
+ checkOCRPageQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking OCR Page queue: %v", err)
+ }
+ if msg.Handle == "" {
+ continue
+ }
+ // Have OCRPageQueue checked immediately after completion, as chances are high that
+ // there will be more pages that should be done without delay
+ checkOCRPageQueue = time.After(0)
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on OCR Page queue, processing", msg.Body)
+ fmt.Printf(".")
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("\nError during OCR Page process: %v", err)
+ }
+ case <-checkAnalyseQueue:
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
+ checkAnalyseQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ return fmt.Errorf("Error checking analyse queue: %v", err)
+ }
+ if msg.Handle == "" {
+ conn.Log("No message received on analyse queue, sleeping")
+ continue
+ }
+ stopTimer(stopIfQuiet)
+ conn.Log("Message received on analyse queue, processing", msg.Body)
+ fmt.Printf("\n Analysing OCR and compiling PDFs\n")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ resetTimer(stopIfQuiet, quietTime)
+ if err != nil {
+ return fmt.Errorf("Error during analysis: %v", err)
+ }
+ case <-stopIfQuiet.C:
+ conn.Log("Processing finished")
+ return nil
+ }
+ }
+
+ return fmt.Errorf("Ended unexpectedly") // should never be reached
+}
diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go
new file mode 100644
index 0000000..fcacc2e
--- /dev/null
+++ b/cmd/rmbook/main.go
@@ -0,0 +1,87 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// rmbook removes a book from cloud storage.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: rmbook [-dryrun] bookname
+
+Removes a book from cloud storage.
+`
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type RmPipeliner interface {
+ MinimalInit() error
+ WIPStorageId() string
+ DeleteObjects(bucket string, keys []string) error
+ ListObjects(bucket string, prefix string) ([]string, error)
+}
+
+func main() {
+ dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete")
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 {
+ flag.Usage()
+ return
+ }
+
+ var n NullWriter
+ verboselog := log.New(n, "", log.LstdFlags)
+
+ var conn RmPipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+
+ fmt.Println("Setting up cloud connection")
+ err := conn.MinimalInit()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ bookname := flag.Arg(0) + "/"
+
+ fmt.Println("Getting list of files for book")
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ log.Fatalln("Error in listing book items:", err)
+ }
+
+ if len(objs) == 0 {
+ log.Fatalln("No files found for book:", bookname)
+ }
+
+ if *dryrun {
+ fmt.Printf("I would delete these files:\n")
+ for _, v := range objs {
+ fmt.Println(v)
+ }
+ return
+ }
+
+ fmt.Println("Deleting all files for book")
+ err = conn.DeleteObjects(conn.WIPStorageId(), objs)
+ if err != nil {
+ log.Fatalln("Error deleting book files:", err)
+ }
+
+ fmt.Println("Finished deleting files")
+}
diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go
new file mode 100644
index 0000000..cf65c4d
--- /dev/null
+++ b/cmd/trimqueue/main.go
@@ -0,0 +1,84 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// trimqueue deletes any messages in a queue that match a specified
+// prefix.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: trimprefix qname prefix
+
+trimqueue deletes any messages in a queue that match a specified
+prefix.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
+`
+
+type QueuePipeliner interface {
+ Init() error
+ RemovePrefixesFromQueue(url string, prefix string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 2 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for i, n := range qdetails {
+ if n.name == qname {
+ qid = qdetails[i].id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1))
+ if err != nil {
+ log.Fatalln("Error removing prefixes from queue", qname, ":", err)
+ }
+}