summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Nick White <git@njw.name>, 2020-11-09 16:46:43 +0000
committer: Nick White <git@njw.name>, 2020-11-09 16:46:43 +0000
commit: 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 (patch)
tree: a0a186d5ac1fbdf491a57f5ef25758deb5e51c8d
parent: fa1593d4b9b5f5e2b9f46137739557e29f65c765 (diff)
[bookpipeline] Split most functionality out to package internal/pipeline
No functionality changes, but this should make it easier to make custom builds using the pipeline in slightly different ways.
-rw-r--r-- cmd/bookpipeline/main.go 711
-rw-r--r-- internal/pipeline/main.go 725
2 files changed, 740 insertions, 696 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 5a6606d..aff7b87 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -11,20 +11,15 @@ import (
"bytes"
"flag"
"fmt"
- "io/ioutil"
"log"
- "net/smtp"
"os"
"os/exec"
- "path/filepath"
"regexp"
- "sort"
- "strings"
"time"
"rescribe.xyz/bookpipeline"
- "rescribe.xyz/preproc"
- "rescribe.xyz/utils/pkg/hocr"
+
+ "rescribe.xyz/bookpipeline/internal/pipeline"
)
const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]
@@ -47,9 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with
the contents: {smtpserver} {port} {username} {password} {from} {to}
`
+const QueueTimeoutSecs = 2 * 60
const PauseBetweenChecks = 3 * time.Minute
const LogSaveTime = 1 * time.Minute
-const HeartbeatSeconds = 60
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -80,638 +75,6 @@ type Pipeliner interface {
Log(v ...interface{})
}
-type pageimg struct {
- hocr, img string
-}
-
-type mailSettings struct {
- server, port, user, pass, from, to string
-}
-
-func getMailSettings() (mailSettings, error) {
- p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
- b, err := ioutil.ReadFile(p)
- if err != nil {
- return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err)
- }
- f := strings.Fields(string(b))
- if len(f) != 6 {
- return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f))
- }
- return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
-}
-
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
- for key := range dl {
- fn := filepath.Join(dir, filepath.Base(key))
- logger.Println("Downloading", key)
- err := conn.Download(conn.WIPStorageId(), key, fn)
- if err != nil {
- for range dl {
- } // consume the rest of the receiving channel so it isn't blocked
- close(process)
- errc <- err
- return
- }
- process <- fn
- }
- close(process)
-}
-
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
- for path := range c {
- name := filepath.Base(path)
- key := bookname + "/" + name
- logger.Println("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- err = os.Remove(path)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- logger.Println("Adding", key, training, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, key+" "+training)
- if err != nil {
- for range c {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- }
-
- done <- true
-}
-
-func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range pre {
- logger.Println("Preprocessing", path)
- done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
- if err != nil {
- for range pre {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- _ = os.Remove(path)
- for _, p := range done {
- up <- p
- }
- }
- close(up)
-}
-
-func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range towipe {
- logger.Println("Wiping", path)
- s := strings.Split(path, ".")
- base := strings.Join(s[:len(s)-1], "")
- outpath := base + "_bin0.0.png"
- err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
- if err != nil {
- for range towipe {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- up <- outpath
- }
- close(up)
-}
-
-func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
- return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range toocr {
- logger.Println("OCRing", path)
- name := strings.Replace(path, ".png", "", 1)
- cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- if err != nil {
- for range toocr {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
- return
- }
- up <- name + ".hocr"
- }
- close(up)
- }
-}
-
-func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
- return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
- confs := make(map[string][]*bookpipeline.Conf)
- bestconfs := make(map[string]*bookpipeline.Conf)
- savedir := ""
-
- for path := range toanalyse {
- if savedir == "" {
- savedir = filepath.Dir(path)
- }
- logger.Println("Calculating confidence for", path)
- avg, err := hocr.GetAvgConf(path)
- if err != nil && err.Error() == "No words found" {
- continue
- }
- if err != nil {
- for range toanalyse {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
- return
- }
- base := filepath.Base(path)
- codestart := strings.Index(base, "_bin")
- name := base[0:codestart]
- var c bookpipeline.Conf
- c.Path = path
- c.Code = base[codestart:]
- c.Conf = avg
- confs[name] = append(confs[name], &c)
- }
-
- fn := filepath.Join(savedir, "conf")
- logger.Println("Saving confidences in file", fn)
- f, err := os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
-
- logger.Println("Finding best confidence for each page, and saving all confidences")
- for base, conf := range confs {
- var best float64
- for _, c := range conf {
- if c.Conf > best {
- best = c.Conf
- bestconfs[base] = c
- }
- _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
- if err != nil {
- errc <- fmt.Errorf("Error writing confidences file: %s", err)
- return
- }
- }
- }
- up <- fn
-
- logger.Println("Creating best file listing the best file for each page")
- fn = filepath.Join(savedir, "best")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- for _, conf := range bestconfs {
- _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
- }
- up <- fn
-
- var pgs []string
- for _, conf := range bestconfs {
- pgs = append(pgs, conf.Path)
- }
- sort.Strings(pgs)
-
- logger.Println("Downloading binarised and original images to create PDFs")
- bookname, err := filepath.Rel(os.TempDir(), savedir)
- if err != nil {
- errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
- return
- }
- colourpdf := new(bookpipeline.Fpdf)
- err = colourpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binarisedpdf := new(bookpipeline.Fpdf)
- err = binarisedpdf.Setup()
- if err != nil {
- errc <- fmt.Errorf("Failed to set up PDF: %s", err)
- return
- }
- binhascontent, colourhascontent := false, false
-
- var colourimgs, binimgs []pageimg
-
- for _, pg := range pgs {
- base := filepath.Base(pg)
- nosuffix := strings.TrimSuffix(base, ".hocr")
- p := strings.SplitN(base, "_bin", 2)
-
- var fn string
- if len(p) > 1 {
- fn = p[0] + ".jpg"
- } else {
- fn = nosuffix + ".jpg"
- }
-
- binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
- colourimgs = append(colourimgs, pageimg{hocr: base, img: fn})
- }
-
- for _, pg := range binimgs {
- logger.Println("Downloading binarised page to add to PDF", pg.img)
- err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- } else {
- err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- binhascontent = true
- err = os.Remove(filepath.Join(savedir, pg.img))
- if err != nil {
- errc <- err
- return
- }
- }
- }
-
- if binhascontent {
- fn = filepath.Join(savedir, bookname+".binarised.pdf")
- err = binarisedpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
- return
- }
- up <- fn
- key := bookname + "/" + bookname + ".binarised.pdf"
- conn.Log("Uploading", key)
- err := conn.Upload(conn.WIPStorageId(), key, fn)
- if err != nil {
- }
- }
-
- for _, pg := range colourimgs {
- logger.Println("Downloading colour page to add to PDF", pg.img)
- colourfn := pg.img
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
- logger.Println("Download failed; trying", colourfn)
- err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
- if err != nil {
- logger.Println("Download failed; skipping page", pg.img)
- }
- }
- if err == nil {
- err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
- if err != nil {
- errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
- return
- }
- colourhascontent = true
- err = os.Remove(filepath.Join(savedir, colourfn))
- if err != nil {
- errc <- err
- return
- }
- }
- }
- if colourhascontent {
- fn = filepath.Join(savedir, bookname+".colour.pdf")
- err = colourpdf.Save(fn)
- if err != nil {
- errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
- return
- }
- up <- fn
- }
-
- logger.Println("Creating graph")
- fn = filepath.Join(savedir, "graph.png")
- f, err = os.Create(fn)
- if err != nil {
- errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
- return
- }
- defer f.Close()
- err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
- if err != nil && err.Error() != "Not enough valid confidences" {
- errc <- fmt.Errorf("Error rendering graph: %s", err)
- return
- }
- up <- fn
-
- close(up)
- }
-}
-
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
- currentmsg := msg
- for range t.C {
- m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
- if err != nil {
- // This is for better debugging of the heartbeat issue
- conn.Log("Error with heartbeat", err)
- os.Exit(1)
- // TODO: would be better to ensure this error stops any running
- // processes, as they will ultimately fail in the case of
- // it. could do this by setting a global variable that
- // processes check each time they loop.
- errc <- err
- t.Stop()
- return
- }
- if m.Id != "" {
- conn.Log("Replaced message handle as visibilitytimeout limit was reached")
- currentmsg = m
- // TODO: maybe handle communicating new msg more gracefully than this
- for range msgc {
- } // throw away any old msgc
- msgc <- m
- }
- }
-}
-
-// allOCRed checks whether all pages of a book have been OCRed.
-// This is determined by whether every _bin0.?.png file has a
-// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- return false
- }
-
- preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
-
- atleastone := false
- for _, png := range objs {
- if preprocessedPattern.MatchString(png) {
- atleastone = true
- found := false
- b := strings.TrimSuffix(filepath.Base(png), ".png")
- hocrname := bookname + "/" + b + ".hocr"
- for _, hocr := range objs {
- if hocr == hocrname {
- found = true
- break
- }
- }
- if found == false {
- return false
- }
- }
- }
- if atleastone == false {
- return false
- }
- return true
-}
-
-// ocrPage OCRs a page based on a message. It may make sense to
-// roll this back into processBook (on which it is based) once
-// working well.
-func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := filepath.Dir(msgparts[0])
- if len(msgparts) > 1 && msgparts[1] != "" {
- process = ocr(msgparts[1])
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
-
- dl <- msgparts[0]
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- return err
- case <-done:
- }
-
- if allOCRed(bookname, conn) && toQueue != "" {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
-func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
- dl := make(chan string)
- msgc := make(chan bookpipeline.Qmsg)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
-
- msgparts := strings.Split(msg.Body, " ")
- bookname := msgparts[0]
-
- var training string
- if len(msgparts) > 1 {
- training = msgparts[1]
- }
-
- d := filepath.Join(os.TempDir(), bookname)
- err := os.MkdirAll(d, 0755)
- if err != nil {
- return fmt.Errorf("Failed to create directory %s: %s", d, err)
- }
-
- t := time.NewTicker(HeartbeatSeconds * time.Second)
- go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
- // these functions will do their jobs when their channels have data
- go download(dl, processc, conn, d, errc, conn.GetLogger())
- go process(processc, upc, errc, conn.GetLogger())
- if toQueue == conn.OCRPageQueueId() {
- go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger())
- } else {
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
- }
-
- conn.Log("Getting list of objects to download")
- objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err)
- }
- var todl []string
- for _, n := range objs {
- if !match.MatchString(n) {
- conn.Log("Skipping item that doesn't match target", n)
- continue
- }
- todl = append(todl, n)
- }
- for _, a := range todl {
- dl <- a
- }
- close(dl)
-
- // wait for either the done or errc channel to be sent to
- select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- // if the error is in preprocessing / wipeonly, chances are that it will never
- // complete, and will fill the ocrpage queue with parts which succeeded
- // on each run, so in that case it's better to delete the message from
- // the queue and notify us.
- if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
- conn.Log("Deleting message from queue due to a bad error", fromQueue)
- err2 := conn.DelFromQueue(fromQueue, msg.Handle)
- if err2 != nil {
- conn.Log("Error deleting message from queue", err2)
- }
- ms, err2 := getMailSettings()
- if err2 != nil {
- conn.Log("Failed to mail settings ", err2)
- }
- if err2 == nil && ms.server != "" {
- logs, err2 := getlogs()
- if err2 != nil {
- conn.Log("Failed to get logs ", err2)
- logs = ""
- }
- msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" +
- "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" +
- " Fail message: %s\r\nFull log:\r\n%s\r\n",
- ms.to, ms.from, bookname, err, logs)
- host := fmt.Sprintf("%s:%s", ms.server, ms.port)
- auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
- err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg))
- if err2 != nil {
- conn.Log("Error sending email ", err2)
- }
- }
- }
- return err
- case <-done:
- }
-
- if toQueue != "" && toQueue != conn.OCRPageQueueId() {
- conn.Log("Sending", bookname, "to queue", toQueue)
- err = conn.AddToQueue(toQueue, bookname)
- if err != nil {
- t.Stop()
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
- }
- }
-
- t.Stop()
-
- // check whether we're using a newer msg handle
- select {
- case m, ok := <-msgc:
- if ok {
- msg = m
- conn.Log("Using new message handle to delete message from queue")
- }
- default:
- conn.Log("Using original message handle to delete message from queue")
- }
-
- conn.Log("Deleting original message from queue", fromQueue)
- err = conn.DelFromQueue(fromQueue, msg.Handle)
- if err != nil {
- _ = os.RemoveAll(d)
- return fmt.Errorf("Error deleting message from queue: %s", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- return fmt.Errorf("Failed to remove directory %s: %s", d, err)
- }
-
- return nil
-}
-
func stopTimer(t *time.Timer) {
if !t.Stop() {
<-t.C
@@ -724,50 +87,6 @@ func resetTimer(t *time.Timer, d time.Duration) {
}
}
-// TODO: rather than relying on journald, would be nicer to save the logs
-// ourselves maybe, so that we weren't relying on a particular systemd
-// setup. this can be done by having the conn.Log also append line
-// to a file (though that would mean everything would have to go through
-// conn.Log, which we're not consistently doing yet). the correct thing
-// to do then would be to implement a new interface that covers the part
-// of log.Logger we use (e.g. Print and Printf), and then have an exported
-// conn struct that implements those, so that we could pass a log.Logger
-// or the new conn struct everywhere (we wouldn't be passing a log.Logger,
-// it's just good to be able to keep the compatibility)
-func getlogs() (string, error) {
- cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- err := cmd.Run()
- return stdout.String(), err
-}
-
-func savelogs(conn Pipeliner, starttime int64, hostname string) error {
- logs, err := getlogs()
- if err != nil {
- return fmt.Errorf("Error getting logs, error: %v", err)
- }
- key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
- path := filepath.Join(os.TempDir(), key)
- f, err := os.Create(path)
- if err != nil {
- return fmt.Errorf("Error creating log file", err)
- }
- defer f.Close()
- _, err = f.WriteString(logs)
- if err != nil {
- return fmt.Errorf("Error saving log file", err)
- }
- _ = f.Close()
- err = conn.Upload(conn.WIPStorageId(), key, path)
- if err != nil {
- return fmt.Errorf("Error uploading log", err)
- }
- conn.Log("Log saved to", key)
- return nil
-}
-
func main() {
verbose := flag.Bool("v", false, "verbose")
training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)")
@@ -809,7 +128,7 @@ func main() {
var err error
if *conntype != "local" {
- _, err = getMailSettings()
+ _, err = pipeline.GetMailSettings()
if err != nil {
conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
}
@@ -857,7 +176,7 @@ func main() {
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking preprocess queue", err)
@@ -869,13 +188,13 @@ func main() {
}
conn.Log("Message received on preprocess queue, processing", msg.Body)
stopTimer(stopIfQuiet)
- err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+ err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during preprocess", err)
}
case <-checkWipeQueue:
- msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
checkWipeQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking wipeonly queue", err)
@@ -887,13 +206,13 @@ func main() {
}
stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
- err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+ err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during wipe", err)
}
case <-checkOCRPageQueue:
- msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
checkOCRPageQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking OCR Page queue", err)
@@ -907,13 +226,13 @@ func main() {
checkOCRPageQueue = time.After(0)
stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
- err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during OCR Page process", err)
}
case <-checkAnalyseQueue:
- msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2)
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
checkAnalyseQueue = time.After(PauseBetweenChecks)
if err != nil {
conn.Log("Error checking analyse queue", err)
@@ -925,14 +244,14 @@ func main() {
}
stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
- err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+ err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during analysis", err)
}
case <-savelognow.C:
conn.Log("Saving logs")
- err = savelogs(conn, starttime, hostname)
+ err = pipeline.SaveLogs(conn, starttime, hostname)
if err != nil {
conn.Log("Error saving logs", err)
}
@@ -942,11 +261,11 @@ func main() {
}
if !*autoshutdown {
conn.Log("Stopping pipeline")
- _ = savelogs(conn, starttime, hostname)
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
return
}
conn.Log("Shutting down")
- _ = savelogs(conn, starttime, hostname)
+ _ = pipeline.SaveLogs(conn, starttime, hostname)
cmd := exec.Command("sudo", "systemctl", "poweroff")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
diff --git a/internal/pipeline/main.go b/internal/pipeline/main.go
new file mode 100644
index 0000000..6e03c50
--- /dev/null
+++ b/internal/pipeline/main.go
@@ -0,0 +1,725 @@
+// Package pipeline is used by the bookpipeline command and handles
+// its core functionality, using channels heavily to coordinate
+// jobs. Note that it is considered an "internal" package, not
+// intended for external use, and no guarantee is made of the
+// stability of any interfaces provided.
+package pipeline
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/smtp"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "sort"
+ "strings"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/preproc"
+ "rescribe.xyz/utils/pkg/hocr"
+)
+
+// HeartbeatSeconds is a number of seconds used for queue message
+// heartbeats.
+// NOTE(review): the code that consumes this constant is not visible
+// in this part of the patch. The code removed from
+// cmd/bookpipeline/main.go used it as the heartbeat ticker interval
+// and passed HeartbeatSeconds*2 as the QueueHeartbeat duration --
+// confirm the moved code does the same.
+const HeartbeatSeconds = 60
+
+// Clouder is the set of object-storage and message-queue operations
+// that the pipeline requires from its backing service.
+type Clouder interface {
+	// Init prepares the implementation for use.
+	// NOTE(review): presumably must be called before any other
+	// method -- confirm against the implementations.
+	Init() error
+	// ListObjects returns the names of the objects in bucket; callers
+	// in this package pass a book name as prefix.
+	ListObjects(bucket string, prefix string) ([]string, error)
+	// Download fetches the object key from bucket into the local
+	// file fn.
+	Download(bucket string, key string, fn string) error
+	// Upload stores the local file at path in bucket under key.
+	Upload(bucket string, key string, path string) error
+	// CheckQueue returns a message from the queue identified by url.
+	// NOTE(review): timeout looks like a visibility timeout in
+	// seconds -- confirm.
+	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+	// AddToQueue adds msg to the queue identified by url.
+	AddToQueue(url string, msg string) error
+	// DelFromQueue deletes the message identified by handle from the
+	// queue identified by url.
+	DelFromQueue(url string, handle string) error
+	// QueueHeartbeat signals that msg, taken from the queue qurl, is
+	// still being worked on.
+	// NOTE(review): duration is presumably in seconds, and a
+	// returned Qmsg with a non-empty Id appears to replace the
+	// original message handle -- confirm.
+	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
+// Pipeliner extends Clouder with the identifiers of the queues and
+// storage the pipeline uses, plus logging. The *Id methods return the
+// values that are passed as the url / bucket arguments of the Clouder
+// methods.
+type Pipeliner interface {
+	Clouder
+	// PreQueueId identifies the preprocess queue.
+	PreQueueId() string
+	// WipeQueueId identifies the wipeonly queue.
+	WipeQueueId() string
+	// OCRPageQueueId identifies the OCR page queue.
+	OCRPageQueueId() string
+	// AnalyseQueueId identifies the analyse queue.
+	AnalyseQueueId() string
+	// WIPStorageId identifies the storage that holds work-in-progress
+	// files; it is used as the bucket for uploads and downloads.
+	WIPStorageId() string
+	// GetLogger returns the logger handed to the pipeline stages.
+	GetLogger() *log.Logger
+	// Log logs v.
+	Log(v ...interface{})
+}
+
+// pageimg pairs the file name of a page's hOCR output (hocr) with the
+// file name of an image of that page (img).
+type pageimg struct {
+	hocr, img string
+}
+
+// mailSettings holds the six values read from the mailsettings file.
+// GetMailSettings fills the fields positionally, in declaration
+// order, from the file's whitespace-separated fields.
+type mailSettings struct {
+	server, port, user, pass, from, to string
+}
+
+// GetMailSettings loads the mail configuration from
+// $HOME/.config/bookpipeline/mailsettings. The file must contain
+// exactly six whitespace-separated fields, which are assigned in
+// order to server, port, user, pass, from and to. An error is
+// returned if the file cannot be read or has a different number of
+// fields.
+func GetMailSettings() (mailSettings, error) {
+	const wantFields = 6
+	var none mailSettings
+
+	settingsPath := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
+	raw, err := ioutil.ReadFile(settingsPath)
+	if err != nil {
+		return none, fmt.Errorf("Error reading mailsettings from %s: %v", settingsPath, err)
+	}
+
+	fields := strings.Fields(string(raw))
+	if got := len(fields); got != wantFields {
+		return none, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", wantFields, got)
+	}
+
+	return mailSettings{
+		server: fields[0],
+		port:   fields[1],
+		user:   fields[2],
+		pass:   fields[3],
+		from:   fields[4],
+		to:     fields[5],
+	}, nil
+}
+
+// download is a pipeline stage that, for each key received on dl,
+// downloads that object from the WIP storage into dir (using the
+// key's base name as the file name) and sends the resulting local
+// path on process. Once dl is closed and drained it closes process.
+//
+// If a download fails it discards the remaining keys on dl, closes
+// process, sends the error on errc and returns.
+func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+	for key := range dl {
+		fn := filepath.Join(dir, filepath.Base(key))
+		logger.Println("Downloading", key)
+		err := conn.Download(conn.WIPStorageId(), key, fn)
+		if err != nil {
+			for range dl {
+			} // consume the rest of the receiving channel so it isn't blocked
+			// process is closed before the error is reported, so the
+			// next stage sees end-of-input while the send on errc may
+			// still be waiting for a receiver.
+			close(process)
+			errc <- err
+			return
+		}
+		process <- fn
+	}
+	close(process)
+}
+
+// up is the final pipeline stage. Every local path received on c is
+// uploaded to the WIP storage under the key bookname/<base name> and
+// the local file is then removed. When c is closed and everything has
+// been handled, true is sent on done.
+//
+// If an upload or removal fails, the rest of c is discarded, the
+// error is sent on errc and nothing is sent on done.
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+	// abort empties c so the sending stage is never left blocked, then
+	// reports the failure.
+	abort := func(failure error) {
+		for range c {
+		}
+		errc <- failure
+	}
+
+	for local := range c {
+		key := bookname + "/" + filepath.Base(local)
+		logger.Println("Uploading", key)
+		if err := conn.Upload(conn.WIPStorageId(), key, local); err != nil {
+			abort(err)
+			return
+		}
+		if err := os.Remove(local); err != nil {
+			abort(err)
+			return
+		}
+	}
+
+	done <- true
+}
+
+// upAndQueue behaves like up, and additionally adds one message per
+// uploaded file to the queue toQueue. The message body is the
+// uploaded key followed by a space and training. When c is closed and
+// everything has been handled, true is sent on done.
+//
+// If an upload, removal or queue addition fails, the rest of c is
+// discarded, the error is sent on errc and nothing is sent on done.
+func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
+	// abort empties c so the sending stage is never left blocked, then
+	// reports the failure.
+	abort := func(failure error) {
+		for range c {
+		}
+		errc <- failure
+	}
+
+	for local := range c {
+		key := bookname + "/" + filepath.Base(local)
+		logger.Println("Uploading", key)
+		if err := conn.Upload(conn.WIPStorageId(), key, local); err != nil {
+			abort(err)
+			return
+		}
+		if err := os.Remove(local); err != nil {
+			abort(err)
+			return
+		}
+		logger.Println("Adding", key, training, "to queue", toQueue)
+		if err := conn.AddToQueue(toQueue, key+" "+training); err != nil {
+			abort(err)
+			return
+		}
+	}
+
+	done <- true
+}
+
+// Preprocess is a pipeline stage that runs preproc.PreProcMulti on
+// each path received on pre, removes the input file (ignoring any
+// removal error), and sends every path returned by PreProcMulti on
+// up. When pre is closed and drained it closes up.
+//
+// If preprocessing fails, the rest of pre is discarded, the error is
+// sent on errc and the function returns without closing up.
+func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
+	for src := range pre {
+		logger.Println("Preprocessing", src)
+		outputs, err := preproc.PreProcMulti(src, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
+		if err != nil {
+			// consume the rest of the receiving channel so it isn't blocked
+			for range pre {
+			}
+			errc <- err
+			return
+		}
+		// the source image is no longer needed once it has been processed
+		_ = os.Remove(src)
+		for i := range outputs {
+			up <- outputs[i]
+		}
+	}
+	close(up)
+}
+
+// Wipe is a pipeline stage that runs preproc.WipeFile on each path
+// received on towipe, writing the result next to the input as
+// <path without extension>_bin0.0.png, and sends that output path on
+// up. When towipe is closed and drained it closes up.
+//
+// If wiping fails, the rest of towipe is discarded, the error is sent
+// on errc and the function returns without closing up.
+func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+	for path := range towipe {
+		logger.Println("Wiping", path)
+		// Strip only the final extension. Splitting on "." and joining
+		// the parts back together would also delete dots elsewhere in
+		// the path (for example in a directory name), and would yield
+		// an empty base for a path with no dot at all.
+		base := strings.TrimSuffix(path, filepath.Ext(path))
+		outpath := base + "_bin0.0.png"
+		err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
+		if err != nil {
+			for range towipe {
+			} // consume the rest of the receiving channel so it isn't blocked
+			errc <- err
+			return
+		}
+		up <- outpath
+	}
+	close(up)
+}
+
+// Ocr returns a pipeline stage that OCRs each image path received on
+// toocr by running the tesseract command with the given training
+// (passed as -l) and hOCR output enabled. The output base name is the
+// image path with its trailing ".png" removed, and <base>.hocr is
+// sent on up for each page. When toocr is closed and drained the
+// stage closes up.
+//
+// If tesseract fails, the rest of toocr is discarded, an error
+// containing the command's stdout and stderr is sent on errc, and the
+// stage returns without closing up.
+func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
+	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
+		for path := range toocr {
+			logger.Println("OCRing", path)
+			// Only strip the extension; replacing the first ".png"
+			// anywhere in the path would corrupt paths whose
+			// directories contain ".png".
+			name := strings.TrimSuffix(path, ".png")
+			cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0")
+			var stdout, stderr bytes.Buffer
+			cmd.Stdout = &stdout
+			cmd.Stderr = &stderr
+			err := cmd.Run()
+			if err != nil {
+				for range toocr {
+				} // consume the rest of the receiving channel so it isn't blocked
+				errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
+				return
+			}
+			up <- name + ".hocr"
+		}
+		close(up)
+	}
+}
+
+// Analyse returns a pipeline stage that scores OCR output and builds the
+// per-book summary files.
+//
+// The returned function reads hOCR paths from toanalyse until it is
+// closed, computing an average confidence for each with hocr.GetAvgConf
+// (files reporting "No words found" are skipped). Results are grouped by
+// page, where the page name is the file name up to "_bin". Using the
+// directory of the first path as the working directory, it then:
+//   - writes every path and its confidence to a "conf" file,
+//   - writes the base name of the highest-confidence file for each page
+//     to a "best" file,
+//   - downloads the binarised and colour page images from WIP storage
+//     and adds them, with the best hOCR, to a binarised and a colour PDF
+//     (pages whose image cannot be downloaded are skipped),
+//   - renders a confidence graph to "graph.png".
+//
+// Each generated file is sent on up, and up is closed on success. On
+// failure a single error is sent on errc and the stage returns without
+// closing up.
+func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
+	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
+		confs := make(map[string][]*bookpipeline.Conf)
+		bestconfs := make(map[string]*bookpipeline.Conf)
+		savedir := ""
+
+		for path := range toanalyse {
+			if savedir == "" {
+				savedir = filepath.Dir(path)
+			}
+			logger.Println("Calculating confidence for", path)
+			avg, err := hocr.GetAvgConf(path)
+			if err != nil && err.Error() == "No words found" {
+				continue
+			}
+			if err != nil {
+				for range toanalyse {
+				} // consume the rest of the receiving channel so it isn't blocked
+				errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
+				return
+			}
+			base := filepath.Base(path)
+			codestart := strings.Index(base, "_bin")
+			if codestart < 0 {
+				// No binarisation code in the name. Slicing with -1
+				// would panic, so treat everything before ".hocr" as
+				// the page name, as the PDF page loop below does.
+				codestart = len(strings.TrimSuffix(base, ".hocr"))
+			}
+			name := base[0:codestart]
+			var c bookpipeline.Conf
+			c.Path = path
+			c.Code = base[codestart:]
+			c.Conf = avg
+			confs[name] = append(confs[name], &c)
+		}
+
+		fn := filepath.Join(savedir, "conf")
+		logger.Println("Saving confidences in file", fn)
+		f, err := os.Create(fn)
+		if err != nil {
+			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+			return
+		}
+		// Safety net for the early returns below; the explicit Close
+		// before the file is sent on up is the one whose error matters.
+		defer f.Close()
+
+		logger.Println("Finding best confidence for each page, and saving all confidences")
+		for base, conf := range confs {
+			var best float64
+			for _, c := range conf {
+				if c.Conf > best {
+					best = c.Conf
+					bestconfs[base] = c
+				}
+				_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
+				if err != nil {
+					errc <- fmt.Errorf("Error writing confidences file: %s", err)
+					return
+				}
+			}
+		}
+		// Close before handing the file over so it is complete on disk.
+		err = f.Close()
+		if err != nil {
+			errc <- fmt.Errorf("Error closing file %s: %s", fn, err)
+			return
+		}
+		up <- fn
+
+		logger.Println("Creating best file listing the best file for each page")
+		fn = filepath.Join(savedir, "best")
+		f, err = os.Create(fn)
+		if err != nil {
+			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+			return
+		}
+		defer f.Close()
+		for _, conf := range bestconfs {
+			_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
+			if err != nil {
+				errc <- fmt.Errorf("Error writing best file: %s", err)
+				return
+			}
+		}
+		err = f.Close()
+		if err != nil {
+			errc <- fmt.Errorf("Error closing file %s: %s", fn, err)
+			return
+		}
+		up <- fn
+
+		var pgs []string
+		for _, conf := range bestconfs {
+			pgs = append(pgs, conf.Path)
+		}
+		sort.Strings(pgs)
+
+		logger.Println("Downloading binarised and original images to create PDFs")
+		bookname, err := filepath.Rel(os.TempDir(), savedir)
+		if err != nil {
+			errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
+			return
+		}
+		colourpdf := new(bookpipeline.Fpdf)
+		err = colourpdf.Setup()
+		if err != nil {
+			errc <- fmt.Errorf("Failed to set up PDF: %s", err)
+			return
+		}
+		binarisedpdf := new(bookpipeline.Fpdf)
+		err = binarisedpdf.Setup()
+		if err != nil {
+			errc <- fmt.Errorf("Failed to set up PDF: %s", err)
+			return
+		}
+		binhascontent, colourhascontent := false, false
+
+		var colourimgs, binimgs []pageimg
+
+		for _, pg := range pgs {
+			base := filepath.Base(pg)
+			nosuffix := strings.TrimSuffix(base, ".hocr")
+			p := strings.SplitN(base, "_bin", 2)
+
+			// The colour image is named after the page, without the
+			// binarisation code when there is one.
+			var colourname string
+			if len(p) > 1 {
+				colourname = p[0] + ".jpg"
+			} else {
+				colourname = nosuffix + ".jpg"
+			}
+
+			binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
+			colourimgs = append(colourimgs, pageimg{hocr: base, img: colourname})
+		}
+
+		for _, pg := range binimgs {
+			logger.Println("Downloading binarised page to add to PDF", pg.img)
+			err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
+			if err != nil {
+				logger.Println("Download failed; skipping page", pg.img)
+			} else {
+				err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
+				if err != nil {
+					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
+					return
+				}
+				binhascontent = true
+				err = os.Remove(filepath.Join(savedir, pg.img))
+				if err != nil {
+					errc <- err
+					return
+				}
+			}
+		}
+
+		if binhascontent {
+			fn = filepath.Join(savedir, bookname+".binarised.pdf")
+			err = binarisedpdf.Save(fn)
+			if err != nil {
+				errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
+				return
+			}
+			up <- fn
+			key := bookname + "/" + bookname + ".binarised.pdf"
+			conn.Log("Uploading", key)
+			// The file has already been handed to the uploader via up,
+			// so this extra upload stays best-effort: a failure is
+			// logged instead of silently dropped, but does not abort.
+			if err := conn.Upload(conn.WIPStorageId(), key, fn); err != nil {
+				conn.Log("Error uploading", key, err)
+			}
+		}
+
+		for _, pg := range colourimgs {
+			logger.Println("Downloading colour page to add to PDF", pg.img)
+			colourfn := pg.img
+			err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
+			if err != nil {
+				// Fall back to a .png original if there is no .jpg.
+				colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
+				logger.Println("Download failed; trying", colourfn)
+				err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
+				if err != nil {
+					logger.Println("Download failed; skipping page", pg.img)
+				}
+			}
+			if err == nil {
+				err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
+				if err != nil {
+					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
+					return
+				}
+				colourhascontent = true
+				err = os.Remove(filepath.Join(savedir, colourfn))
+				if err != nil {
+					errc <- err
+					return
+				}
+			}
+		}
+		if colourhascontent {
+			fn = filepath.Join(savedir, bookname+".colour.pdf")
+			err = colourpdf.Save(fn)
+			if err != nil {
+				errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
+				return
+			}
+			up <- fn
+		}
+
+		logger.Println("Creating graph")
+		fn = filepath.Join(savedir, "graph.png")
+		f, err = os.Create(fn)
+		if err != nil {
+			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
+			return
+		}
+		defer f.Close()
+		err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
+		// Too few confidences to draw a graph is not treated as a failure.
+		if err != nil && err.Error() != "Not enough valid confidences" {
+			errc <- fmt.Errorf("Error rendering graph: %s", err)
+			return
+		}
+		err = f.Close()
+		if err != nil {
+			errc <- fmt.Errorf("Error closing file %s: %s", fn, err)
+			return
+		}
+		up <- fn
+
+		close(up)
+	}
+}
+
+// heartbeat keeps a queue message alive while it is being processed.
+//
+// On every tick of t it calls conn.QueueHeartbeat for the current
+// message on queue, passing HeartbeatSeconds*2 as the duration. If the
+// heartbeat returns a message with a non-empty Id, the message handle
+// was replaced; the new message becomes the current one and is offered
+// on msgc so the caller can delete the message with the right handle.
+//
+// Only the most recent replacement is ever offered, and offering it does
+// not stop the heartbeats: the send is one case of the same select that
+// waits for ticks. (Draining msgc with "for range" would block forever
+// here, as msgc is never closed.)
+//
+// A heartbeat error is logged and currently terminates the whole
+// program. The function does not return when t is stopped.
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+	currentmsg := msg
+	// handoff is nil, which disables its select case, until there is a
+	// replacement message to pass to the caller.
+	var handoff chan bookpipeline.Qmsg
+	for {
+		select {
+		case handoff <- currentmsg:
+			// The caller now has the latest message; nothing more
+			// to offer until the handle is replaced again.
+			handoff = nil
+		case <-t.C:
+			m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
+			if err != nil {
+				// This is for better debugging of the heartbeat issue
+				conn.Log("Error with heartbeat", err)
+				os.Exit(1)
+				// TODO: would be better to ensure this error stops any running
+				//       processes, as they will ultimately fail in the case of
+				//       it. could do this by setting a global variable that
+				//       processes check each time they loop.
+				errc <- err
+				t.Stop()
+				return
+			}
+			if m.Id != "" {
+				conn.Log("Replaced message handle as visibilitytimeout limit was reached")
+				currentmsg = m
+				handoff = msgc
+			}
+		}
+	}
+}
+
+// allOCRed checks whether all pages of a book have been OCRed.
+// This is determined by whether every _bin0.?.png file has a
+// corresponding .hocr file.
+//
+// It lists the objects stored under bookname in WIP storage and reports
+// true only if at least one preprocessed image exists and each of them
+// has a matching "bookname/<image name without .png>.hocr" object. A
+// listing error is reported as false.
+func allOCRed(bookname string, conn Pipeliner) bool {
+	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+	if err != nil {
+		return false
+	}
+
+	// The dots are escaped so they only match literal dots.
+	preprocessedPattern := regexp.MustCompile(`_bin[0-9]\.[0-9]\.png$`)
+
+	// Index the listing once so each hocr lookup is constant time.
+	present := make(map[string]struct{}, len(objs))
+	for _, obj := range objs {
+		present[obj] = struct{}{}
+	}
+
+	atleastone := false
+	for _, png := range objs {
+		if !preprocessedPattern.MatchString(png) {
+			continue
+		}
+		atleastone = true
+		b := strings.TrimSuffix(filepath.Base(png), ".png")
+		if _, ok := present[bookname+"/"+b+".hocr"]; !ok {
+			return false
+		}
+	}
+	return atleastone
+}
+
+// OcrPage OCRs a single page described by a queue message.
+//
+// The message body is "<page path>[ <training>]"; the book name is the
+// directory part of the page path. When a training is given it replaces
+// process with Ocr(training). The page is downloaded into a temporary
+// directory named after the book, processed and uploaded, while a
+// heartbeat keeps the message alive on fromQueue. Once every page of
+// the book has been OCRed (and toQueue is set) the book is added to
+// toQueue. Finally the message is deleted from fromQueue and the
+// temporary directory is removed.
+//
+// It may make sense to roll this back into processBook (on which it is
+// based) once working well.
+func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
+	var (
+		downloadc = make(chan string)
+		newmsgc   = make(chan bookpipeline.Qmsg)
+		stagec    = make(chan string)
+		uploadc   = make(chan string)
+		finished  = make(chan bool)
+		failc     = make(chan error)
+	)
+
+	fields := strings.Split(msg.Body, " ")
+	pagepath := fields[0]
+	bookname := filepath.Dir(pagepath)
+	if len(fields) > 1 {
+		if training := fields[1]; training != "" {
+			process = Ocr(training)
+		}
+	}
+
+	workdir := filepath.Join(os.TempDir(), bookname)
+	if err := os.MkdirAll(workdir, 0755); err != nil {
+		return fmt.Errorf("Failed to create directory %s: %s", workdir, err)
+	}
+
+	ticker := time.NewTicker(HeartbeatSeconds * time.Second)
+	go heartbeat(conn, ticker, msg, fromQueue, newmsgc, failc)
+
+	// each stage starts working as soon as its input channel has data
+	go download(downloadc, stagec, conn, workdir, failc, conn.GetLogger())
+	go process(stagec, uploadc, failc, conn.GetLogger())
+	go up(uploadc, finished, conn, bookname, failc, conn.GetLogger())
+
+	downloadc <- pagepath
+	close(downloadc)
+
+	// block until the pipeline either fails or finishes
+	select {
+	case failure := <-failc:
+		ticker.Stop()
+		_ = os.RemoveAll(workdir)
+		return failure
+	case <-finished:
+	}
+
+	if allOCRed(bookname, conn) && toQueue != "" {
+		conn.Log("Sending", bookname, "to queue", toQueue)
+		if err := conn.AddToQueue(toQueue, bookname); err != nil {
+			ticker.Stop()
+			_ = os.RemoveAll(workdir)
+			return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
+		}
+	}
+
+	ticker.Stop()
+
+	// pick up a replacement message handle if the heartbeat has one
+	select {
+	case replacement, ok := <-newmsgc:
+		if ok {
+			msg = replacement
+			conn.Log("Using new message handle to delete message from queue")
+		}
+	default:
+		conn.Log("Using original message handle to delete message from queue")
+	}
+
+	conn.Log("Deleting original message from queue", fromQueue)
+	if err := conn.DelFromQueue(fromQueue, msg.Handle); err != nil {
+		_ = os.RemoveAll(workdir)
+		return fmt.Errorf("Error deleting message from queue: %s", err)
+	}
+
+	if err := os.RemoveAll(workdir); err != nil {
+		return fmt.Errorf("Failed to remove directory %s: %s", workdir, err)
+	}
+
+	return nil
+}
+
+// ProcessBook runs one pipeline stage over a whole book described by a
+// queue message.
+//
+// The message body is "<book name>[ <training>]". Every object of the
+// book in WIP storage whose name matches match is downloaded into a
+// temporary directory, passed through process and uploaded, while a
+// heartbeat keeps the message alive on fromQueue. When toQueue is the
+// OCR page queue, results are uploaded and queued individually (with
+// the training); otherwise the book name is added to toQueue, if set,
+// once everything is done. Finally the message is deleted from
+// fromQueue and the temporary directory is removed.
+//
+// If the pipeline fails and fromQueue is the preprocessing or wipe
+// queue, the message is deleted anyway and, when mail settings with a
+// server are available, an email with the error and logs is sent.
+func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
+	var (
+		downloadc = make(chan string)
+		newmsgc   = make(chan bookpipeline.Qmsg)
+		stagec    = make(chan string)
+		uploadc   = make(chan string)
+		finished  = make(chan bool)
+		failc     = make(chan error)
+	)
+
+	fields := strings.Split(msg.Body, " ")
+	bookname := fields[0]
+
+	training := ""
+	if len(fields) > 1 {
+		training = fields[1]
+	}
+
+	workdir := filepath.Join(os.TempDir(), bookname)
+	if err := os.MkdirAll(workdir, 0755); err != nil {
+		return fmt.Errorf("Failed to create directory %s: %s", workdir, err)
+	}
+
+	ticker := time.NewTicker(HeartbeatSeconds * time.Second)
+	go heartbeat(conn, ticker, msg, fromQueue, newmsgc, failc)
+
+	// each stage starts working as soon as its input channel has data
+	go download(downloadc, stagec, conn, workdir, failc, conn.GetLogger())
+	go process(stagec, uploadc, failc, conn.GetLogger())
+	if toQueue == conn.OCRPageQueueId() {
+		go upAndQueue(uploadc, finished, toQueue, conn, bookname, training, failc, conn.GetLogger())
+	} else {
+		go up(uploadc, finished, conn, bookname, failc, conn.GetLogger())
+	}
+
+	conn.Log("Getting list of objects to download")
+	objs, listerr := conn.ListObjects(conn.WIPStorageId(), bookname)
+	if listerr != nil {
+		ticker.Stop()
+		_ = os.RemoveAll(workdir)
+		return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, listerr)
+	}
+	var wanted []string
+	for _, obj := range objs {
+		if match.MatchString(obj) {
+			wanted = append(wanted, obj)
+		} else {
+			conn.Log("Skipping item that doesn't match target", obj)
+		}
+	}
+	for _, obj := range wanted {
+		downloadc <- obj
+	}
+	close(downloadc)
+
+	// block until the pipeline either fails or finishes
+	select {
+	case failure := <-failc:
+		ticker.Stop()
+		_ = os.RemoveAll(workdir)
+		// if the error is in preprocessing / wipeonly, chances are that it will never
+		// complete, and will fill the ocrpage queue with parts which succeeded
+		// on each run, so in that case it's better to delete the message from
+		// the queue and notify us.
+		if fromQueue != conn.PreQueueId() && fromQueue != conn.WipeQueueId() {
+			return failure
+		}
+		conn.Log("Deleting message from queue due to a bad error", fromQueue)
+		if delerr := conn.DelFromQueue(fromQueue, msg.Handle); delerr != nil {
+			conn.Log("Error deleting message from queue", delerr)
+		}
+		ms, mailerr := GetMailSettings()
+		if mailerr != nil {
+			conn.Log("Failed to mail settings ", mailerr)
+			return failure
+		}
+		if ms.server == "" {
+			return failure
+		}
+		logs, logerr := getLogs()
+		if logerr != nil {
+			conn.Log("Failed to get logs ", logerr)
+			logs = ""
+		}
+		mail := fmt.Sprintf("To: %s\r\nFrom: %s\r\n"+
+			"Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n"+
+			" Fail message: %s\r\nFull log:\r\n%s\r\n",
+			ms.to, ms.from, bookname, failure, logs)
+		host := fmt.Sprintf("%s:%s", ms.server, ms.port)
+		auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
+		if senderr := smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(mail)); senderr != nil {
+			conn.Log("Error sending email ", senderr)
+		}
+		return failure
+	case <-finished:
+	}
+
+	if toQueue != "" && toQueue != conn.OCRPageQueueId() {
+		conn.Log("Sending", bookname, "to queue", toQueue)
+		if err := conn.AddToQueue(toQueue, bookname); err != nil {
+			ticker.Stop()
+			_ = os.RemoveAll(workdir)
+			return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
+		}
+	}
+
+	ticker.Stop()
+
+	// pick up a replacement message handle if the heartbeat has one
+	select {
+	case replacement, ok := <-newmsgc:
+		if ok {
+			msg = replacement
+			conn.Log("Using new message handle to delete message from queue")
+		}
+	default:
+		conn.Log("Using original message handle to delete message from queue")
+	}
+
+	conn.Log("Deleting original message from queue", fromQueue)
+	if err := conn.DelFromQueue(fromQueue, msg.Handle); err != nil {
+		_ = os.RemoveAll(workdir)
+		return fmt.Errorf("Error deleting message from queue: %s", err)
+	}
+
+	if err := os.RemoveAll(workdir); err != nil {
+		return fmt.Errorf("Failed to remove directory %s: %s", workdir, err)
+	}
+
+	return nil
+}
+
+// getLogs returns the output of
+// "journalctl -u bookpipeline -n all". If the command fails, whatever
+// was written to stdout is still returned, together with an error that
+// wraps the failure and includes the command's stderr.
+//
+// TODO: rather than relying on journald, would be nicer to save the logs
+//       ourselves maybe, so that we weren't relying on a particular systemd
+//       setup. this can be done by having the conn.Log also append line
+//       to a file (though that would mean everything would have to go through
+//       conn.Log, which we're not consistently doing yet). the correct thing
+//       to do then would be to implement a new interface that covers the part
+//       of log.Logger we use (e.g. Print and Printf), and then have an exported
+//       conn struct that implements those, so that we could pass a log.Logger
+//       or the new conn struct everywhere (we wouldn't be passing a log.Logger,
+//       it's just good to be able to keep the compatibility)
+func getLogs() (string, error) {
+	cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+	if err := cmd.Run(); err != nil {
+		// Include stderr: it is captured anyway and usually says why
+		// journalctl failed.
+		return stdout.String(), fmt.Errorf("Error running journalctl: %w\nStderr: %s", err, stderr.String())
+	}
+	return stdout.String(), nil
+}
+
+// SaveLogs fetches the current logs with getLogs, writes them to a file
+// named "bookpipeline.log.<starttime>.<hostname>" in the temporary
+// directory, and uploads that file to WIP storage under the same name.
+// It returns an error if the logs cannot be fetched, or if the file
+// cannot be created, written, closed or uploaded.
+func SaveLogs(conn Pipeliner, starttime int64, hostname string) error {
+	logs, err := getLogs()
+	if err != nil {
+		return fmt.Errorf("Error getting logs, error: %v", err)
+	}
+	key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
+	path := filepath.Join(os.TempDir(), key)
+	f, err := os.Create(path)
+	if err != nil {
+		return fmt.Errorf("Error creating log file: %v", err)
+	}
+	// Safety net for the early return below; the explicit Close before
+	// the upload is the one whose error is checked.
+	defer f.Close()
+	_, err = f.WriteString(logs)
+	if err != nil {
+		return fmt.Errorf("Error saving log file: %v", err)
+	}
+	// Close before uploading so the file is complete on disk.
+	err = f.Close()
+	if err != nil {
+		return fmt.Errorf("Error closing log file: %v", err)
+	}
+	err = conn.Upload(conn.WIPStorageId(), key, path)
+	if err != nil {
+		return fmt.Errorf("Error uploading log: %v", err)
+	}
+	conn.Log("Log saved to", key)
+	return nil
+}