From 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 16:46:43 +0000 Subject: [bookpipeline] Split most functionality out to package internal/pipeline No functionality changes, but this should make it easier to make custom builds using the pipeline in slightly different ways. --- cmd/bookpipeline/main.go | 711 +---------------------------------------------- 1 file changed, 15 insertions(+), 696 deletions(-) (limited to 'cmd/bookpipeline') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 5a6606d..aff7b87 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,20 +11,15 @@ import ( "bytes" "flag" "fmt" - "io/ioutil" "log" - "net/smtp" "os" "os/exec" - "path/filepath" "regexp" - "sort" - "strings" "time" "rescribe.xyz/bookpipeline" - "rescribe.xyz/preproc" - "rescribe.xyz/utils/pkg/hocr" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] @@ -47,9 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with the contents: {smtpserver} {port} {username} {password} {from} {to} ` +const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 3 * time.Minute const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60 // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -80,638 +75,6 @@ type Pipeliner interface { Log(v ...interface{}) } -type pageimg struct { - hocr, img string -} - -type mailSettings struct { - server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { - p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") - b, err := ioutil.ReadFile(p) - if err != nil { - return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) - } - f := strings.Fields(string(b)) - if len(f) != 6 { - return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) - } - return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - logger.Println("Downloading", key) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - for range dl { - } // consume the rest of the receiving channel so it isn't blocked - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - logger.Println("Adding", key, training, "to queue", toQueue) - err = conn.AddToQueue(toQueue, key+" "+training) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p - } - } - close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range towipe { - logger.Println("Wiping", path) - s := strings.Split(path, ".") - base := strings.Join(s[:len(s)-1], "") - outpath := base + "_bin0.0.png" - err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) - if err != nil { - for range towipe { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - up <- outpath - } - close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - for range toocr { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*bookpipeline.Conf) - bestconfs := make(map[string]*bookpipeline.Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil && err.Error() == "No words found" { - continue - } - if err != nil { - for range toanalyse { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c bookpipeline.Conf - c.Path = path - c.Code = base[codestart:] - c.Conf = avg - confs[name] = append(confs[name], &c) - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.Conf > best { - best = c.Conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) - if err != nil { - errc <- fmt.Errorf("Error writing confidences file: %s", err) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) - } - up <- fn - - var pgs []string - for _, conf := range bestconfs { - pgs = append(pgs, conf.Path) - } - sort.Strings(pgs) - - logger.Println("Downloading binarised and original images to create PDFs") - bookname, err := filepath.Rel(os.TempDir(), savedir) - if err != nil { - errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) - return - } - colourpdf := new(bookpipeline.Fpdf) - err = colourpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binarisedpdf := new(bookpipeline.Fpdf) - err = binarisedpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binhascontent, colourhascontent := false, false - - var colourimgs, binimgs []pageimg - - for _, pg := range pgs { - base := filepath.Base(pg) - nosuffix := strings.TrimSuffix(base, ".hocr") - p := strings.SplitN(base, "_bin", 2) - - var fn string - if len(p) > 1 { - fn = p[0] + ".jpg" - } else { - fn = nosuffix + ".jpg" - } - - binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) - colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) - } - - for _, pg := range binimgs { - logger.Println("Downloading binarised page to add to PDF", pg.img) - err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } else { - err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - binhascontent = true - err = os.Remove(filepath.Join(savedir, pg.img)) - if err != nil { - errc <- err - return - } - } - } - - if binhascontent { - fn = filepath.Join(savedir, bookname+".binarised.pdf") - err = binarisedpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) - return - } - up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } - } - - for _, pg := range colourimgs { - logger.Println("Downloading colour page to add to PDF", pg.img) - colourfn := pg.img - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) - logger.Println("Download failed; trying", colourfn) - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } - } - if err == nil { - err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - colourhascontent = true - err = os.Remove(filepath.Join(savedir, colourfn)) - if err != nil { - errc <- err - return - } - } - } - if colourhascontent { - fn = filepath.Join(savedir, bookname+".colour.pdf") - err = colourpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save colour pdf: %s", err) - return - } - up <- fn - } - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) - if err != nil && err.Error() != "Not enough valid confidences" { - errc <- fmt.Errorf("Error rendering graph: %s", err) - return - } - up <- fn - - close(up) - } -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { - currentmsg := msg - for range t.C { - m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) - if err != nil { - // This is for better debugging of the heartbeat issue - conn.Log("Error with heartbeat", err) - os.Exit(1) - // TODO: would be better to ensure this error stops any running - // processes, as they will ultimately fail in the case of - // it. could do this by setting a global variable that - // processes check each time they loop. - errc <- err - t.Stop() - return - } - if m.Id != "" { - conn.Log("Replaced message handle as visibilitytimeout limit was reached") - currentmsg = m - // TODO: maybe handle communicating new msg more gracefully than this - for range msgc { - } // throw away any old msgc - msgc <- m - } - } -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - return false - } - - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - - atleastone := false - for _, png := range objs { - if preprocessedPattern.MatchString(png) { - atleastone = true - found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" - for _, hocr := range objs { - if hocr == hocrname { - found = true - break - } - } - if found == false { - return false - } - } - } - if atleastone == false { - return false - } - return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := filepath.Dir(msgparts[0]) - if len(msgparts) > 1 && msgparts[1] != "" { - process = ocr(msgparts[1]) - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - - dl <- msgparts[0] - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if allOCRed(bookname, conn) && toQueue != "" { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := msgparts[0] - - var training string - if len(msgparts) > 1 { - training = msgparts[1] - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) - } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - } - - conn.Log("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Log("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - // if the error is in preprocessing / wipeonly, chances are that it will never - // complete, and will fill the ocrpage queue with parts which succeeded - // on each run, so in that case it's better to delete the message from - // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { - conn.Log("Deleting message from queue due to a bad error", fromQueue) - err2 := conn.DelFromQueue(fromQueue, msg.Handle) - if err2 != nil { - conn.Log("Error deleting message from queue", err2) - } - ms, err2 := getMailSettings() - if err2 != nil { - conn.Log("Failed to mail settings ", err2) - } - if err2 == nil && ms.server != "" { - logs, err2 := getlogs() - if err2 != nil { - conn.Log("Failed to get logs ", err2) - logs = "" - } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + - " Fail message: %s\r\nFull log:\r\n%s\r\n", - ms.to, ms.from, bookname, err, logs) - host := fmt.Sprintf("%s:%s", ms.server, ms.port) - auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) - err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) - if err2 != nil { - conn.Log("Error sending email ", err2) - } - } - } - return err - case <-done: - } - - if toQueue != "" && toQueue != conn.OCRPageQueueId() { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - func stopTimer(t *time.Timer) { if !t.Stop() { <-t.C @@ -724,50 +87,6 @@ func resetTimer(t *time.Timer, d time.Duration) { } } -// TODO: rather than relying on journald, would be nicer to save the logs -// ourselves maybe, so that we weren't relying on a particular systemd -// setup. this can be done by having the conn.Log also append line -// to a file (though that would mean everything would have to go through -// conn.Log, which we're not consistently doing yet). the correct thing -// to do then would be to implement a new interface that covers the part -// of log.Logger we use (e.g. Print and Printf), and then have an exported -// conn struct that implements those, so that we could pass a log.Logger -// or the new conn struct everywhere (we wouldn't be passing a log.Logger, -// it's just good to be able to keep the compatibility) -func getlogs() (string, error) { - cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { - logs, err := getlogs() - if err != nil { - return fmt.Errorf("Error getting logs, error: %v", err) - } - key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) - path := filepath.Join(os.TempDir(), key) - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("Error creating log file", err) - } - defer f.Close() - _, err = f.WriteString(logs) - if err != nil { - return fmt.Errorf("Error saving log file", err) - } - _ = f.Close() - err = conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - return fmt.Errorf("Error uploading log", err) - } - conn.Log("Log saved to", key) - return nil -} - func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") @@ -809,7 +128,7 @@ func main() { var err error if *conntype != "local" { - _, err = getMailSettings() + _, err = pipeline.GetMailSettings() if err != nil { conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) } @@ -857,7 +176,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking preprocess queue", err) @@ -869,13 +188,13 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } case <-checkWipeQueue: - msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking wipeonly queue", err) @@ -887,13 +206,13 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } case <-checkOCRPageQueue: - msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking OCR Page queue", err) @@ -907,13 +226,13 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking analyse queue", err) @@ -925,14 +244,14 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) - err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } case <-savelognow.C: conn.Log("Saving logs") - err = savelogs(conn, starttime, hostname) + err = pipeline.SaveLogs(conn, starttime, hostname) if err != nil { conn.Log("Error saving logs", err) } @@ -942,11 +261,11 @@ func main() { } if !*autoshutdown { conn.Log("Stopping pipeline") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) return } conn.Log("Shutting down") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout -- cgit v1.2.1-24-ge1ad