From fc6becf5ed98e9c0815532fd76639c15eb481ed1 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 17:33:52 +0000 Subject: [rescribe] work in progress at a self-contained local pipeline processor, called rescribe --- internal/pipeline/pipeline.go | 729 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 729 insertions(+) create mode 100644 internal/pipeline/pipeline.go (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go new file mode 100644 index 0000000..b1c3cb9 --- /dev/null +++ b/internal/pipeline/pipeline.go @@ -0,0 +1,729 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// pipeline is a package used by the bookpipeline command, which +// handles the core functionality, using channels heavily to +// coordinate jobs. Note that it is considered an "internal" package, +// not intended for external use, and no guarantee is made of the +// stability of any interfaces provided. 
+package pipeline + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/smtp" + "os" + "os/exec" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/preproc" + "rescribe.xyz/utils/pkg/hocr" +) + +const HeartbeatSeconds = 60 + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type pageimg struct { + hocr, img string +} + +type mailSettings struct { + server, port, user, pass, from, to string +} + +func GetMailSettings() (mailSettings, error) { + p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") + b, err := ioutil.ReadFile(p) + if err != nil { + return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) + } + f := strings.Fields(string(b)) + if len(f) != 6 { + return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) + } + return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + logger.Println("Downloading", key) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + close(process) + errc 
<- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + logger.Println("Adding", key, training, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key+" "+training) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } + } 
+ close(up) +} + +func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + up <- outpath + } + close(up) +} + +func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { + return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil && err.Error() == "No words found" { + continue + } + if err != nil { + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- 
fmt.Errorf("Error retreiving confidence for %s: %s", path, err) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + errc <- fmt.Errorf("Error writing confidences file: %s", err) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + var pgs []string + for _, conf := range bestconfs { + pgs = append(pgs, conf.Path) + } + sort.Strings(pgs) + + logger.Println("Downloading binarised and original images to create PDFs") + bookname, err := filepath.Rel(os.TempDir(), savedir) + if err != nil { + errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) + return + } + colourpdf := new(bookpipeline.Fpdf) + err = colourpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binarisedpdf := new(bookpipeline.Fpdf) + err = binarisedpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binhascontent, 
colourhascontent := false, false + + var colourimgs, binimgs []pageimg + + for _, pg := range pgs { + base := filepath.Base(pg) + nosuffix := strings.TrimSuffix(base, ".hocr") + p := strings.SplitN(base, "_bin", 2) + + var fn string + if len(p) > 1 { + fn = p[0] + ".jpg" + } else { + fn = nosuffix + ".jpg" + } + + binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) + colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) + } + + for _, pg := range binimgs { + logger.Println("Downloading binarised page to add to PDF", pg.img) + err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } else { + err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + binhascontent = true + err = os.Remove(filepath.Join(savedir, pg.img)) + if err != nil { + errc <- err + return + } + } + } + + if binhascontent { + fn = filepath.Join(savedir, bookname+".binarised.pdf") + err = binarisedpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) + return + } + up <- fn + key := bookname + "/" + bookname + ".binarised.pdf" + conn.Log("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, fn) + if err != nil { + } + } + + for _, pg := range colourimgs { + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + 
err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + colourhascontent = true + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + if colourhascontent { + fn = filepath.Join(savedir, bookname+".colour.pdf") + err = colourpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save colour pdf: %s", err) + return + } + up <- fn + } + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil && err.Error() != "Not enough valid confidences" { + errc <- fmt.Errorf("Error rendering graph: %s", err) + return + } + up <- fn + + close(up) + } +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg + for range t.C { + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) + if err != nil { + // This is for better debugging of the heartbeat issue + conn.Log("Error with heartbeat", err) + os.Exit(1) + // TODO: would be better to ensure this error stops any running + // processes, as they will ultimately fail in the case of + // it. could do this by setting a global variable that + // processes check each time they loop. + errc <- err + t.Stop() + return + } + if m.Id != "" { + conn.Log("Replaced message handle as visibilitytimeout limit was reached") + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } + } +} + +// allOCRed checks whether all pages of a book have been OCRed. 
+// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// OcrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := filepath.Dir(msgparts[0]) + if len(msgparts) > 1 && msgparts[1] != "" { + process = Ocr(msgparts[1]) + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msgparts[0] + close(dl) + + // wait for either the done or errc channel to be 
sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := msgparts[0] + + var training string + if len(msgparts) > 1 { + training = msgparts[1] + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + if toQueue == 
conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } + + conn.Log("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Log("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + // if the error is in preprocessing / wipeonly, chances are that it will never + // complete, and will fill the ocrpage queue with parts which succeeded + // on each run, so in that case it's better to delete the message from + // the queue and notify us. 
+ if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + conn.Log("Deleting message from queue due to a bad error", fromQueue) + err2 := conn.DelFromQueue(fromQueue, msg.Handle) + if err2 != nil { + conn.Log("Error deleting message from queue", err2) + } + ms, err2 := GetMailSettings() + if err2 != nil { + conn.Log("Failed to mail settings ", err2) + } + if err2 == nil && ms.server != "" { + logs, err2 := getLogs() + if err2 != nil { + conn.Log("Failed to get logs ", err2) + logs = "" + } + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + " Fail message: %s\r\nFull log:\r\n%s\r\n", + ms.to, ms.from, bookname, err, logs) + host := fmt.Sprintf("%s:%s", ms.server, ms.port) + auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) + err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) + if err2 != nil { + conn.Log("Error sending email ", err2) + } + } + } + return err + case <-done: + } + + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +// TODO: rather than relying on journald, would be nicer to 
save the logs +// ourselves maybe, so that we weren't relying on a particular systemd +// setup. this can be done by having the conn.Log also append line +// to a file (though that would mean everything would have to go through +// conn.Log, which we're not consistently doing yet). the correct thing +// to do then would be to implement a new interface that covers the part +// of log.Logger we use (e.g. Print and Printf), and then have an exported +// conn struct that implements those, so that we could pass a log.Logger +// or the new conn struct everywhere (we wouldn't be passing a log.Logger, +// it's just good to be able to keep the compatibility) +func getLogs() (string, error) { + cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return stdout.String(), err +} + +func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { + logs, err := getLogs() + if err != nil { + return fmt.Errorf("Error getting logs, error: %v", err) + } + key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) + path := filepath.Join(os.TempDir(), key) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("Error creating log file", err) + } + defer f.Close() + _, err = f.WriteString(logs) + if err != nil { + return fmt.Errorf("Error saving log file", err) + } + _ = f.Close() + err = conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + return fmt.Errorf("Error uploading log", err) + } + conn.Log("Log saved to", key) + return nil +} -- cgit v1.2.1-24-ge1ad From f19df9e8c1213a49c426caefd2fadc711f5faf11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:55:36 +0000 Subject: Switch Preprocess() to take the thresholds to use, and have rescribe tool only use 0.1,0.2,0.3 --- internal/pipeline/pipeline.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 
'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index b1c3cb9..cce5c19 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -146,22 +146,24 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, b done <- true } -func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p +func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) { + return func(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } } + close(up) } - close(up) } func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { -- cgit v1.2.1-24-ge1ad From 198f8215f8dd0460608abcd03fa49451462c9d11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 10:41:15 +0000 Subject: [getpipelinebook] Rewrite to use internal package functions --- internal/pipeline/pipeline.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index cce5c19..c0accdb 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -52,6 +52,11 @@ type Pipeliner interface { Log(v 
...interface{}) } +type MinPipeliner interface { + Pipeliner + MinimalInit() error +} + type pageimg struct { hocr, img string } -- cgit v1.2.1-24-ge1ad From ad7aaf490e78e969bb5495dfda06a33d2a176aec Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 12:28:50 +0000 Subject: [rescribe] Enable custom paths to tesseract command to be set (also improve some error output) --- internal/pipeline/pipeline.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index c0accdb..f6598fd 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -189,12 +189,15 @@ func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logge close(up) } -func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { +func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) { return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + if tesscmd == "" { + tesscmd = "tesseract" + } for path := range toocr { logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr @@ -491,7 +494,7 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch msgparts := strings.Split(msg.Body, " ") bookname := filepath.Dir(msgparts[0]) if len(msgparts) > 1 && msgparts[1] != "" { - process = Ocr(msgparts[1]) + process = Ocr(msgparts[1], "") } d := filepath.Join(os.TempDir(), bookname) -- cgit v1.2.1-24-ge1ad From 33f1726a4c9f8013dcde39e644281059d9766bc4 Mon Sep 17 00:00:00 2001 From: Nick White Date: 
Tue, 10 Nov 2020 12:30:15 +0000 Subject: gofmt --- internal/pipeline/pipeline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index f6598fd..280e4d2 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -640,8 +640,8 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string conn.Log("Failed to get logs ", err2) logs = "" } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n"+ + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n"+ " Fail message: %s\r\nFull log:\r\n%s\r\n", ms.to, ms.from, bookname, err, logs) host := fmt.Sprintf("%s:%s", ms.server, ms.port) -- cgit v1.2.1-24-ge1ad From cfbb3481368714adcd734906d8a460b873551c90 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 17:43:27 +0000 Subject: Some changes to ensure the pipeline works correctly on Windows There were a couple of places where a file was uploaded while still open, which resulted in an attempt to remove it, which causes an error from Windows. The allOCRed function also included an assumption that the path separator would be a /, which is always correct for AWS, and correct for local on Linux and OSX, but not for local Windows. Fixed by leaving the separator well alone. Also, the local connection was not stripping leading \, like it did /, which caused an issue with Windows local. Windows local is now tested and working, at least through wine. 
--- internal/pipeline/pipeline.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 280e4d2..20400ad 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -269,6 +269,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log } } } + f.Close() up <- fn logger.Println("Creating best file listing the best file for each page") @@ -282,6 +283,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log for _, conf := range bestconfs { _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) } + f.Close() up <- fn var pgs []string @@ -461,8 +463,7 @@ func allOCRed(bookname string, conn Pipeliner) bool { if preprocessedPattern.MatchString(png) { atleastone = true found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" + hocrname := strings.TrimSuffix(png, ".png") + ".hocr" for _, hocr := range objs { if hocr == hocrname { found = true -- cgit v1.2.1-24-ge1ad From 38dbdd0b21fb363e3f63fd3ea50272975e98eb77 Mon Sep 17 00:00:00 2001 From: Nick White Date: Thu, 3 Dec 2020 15:13:22 +0000 Subject: Don't upload binarised pdf twice needlessly This can also result in the file being uploaded twice simultaneously, as up() is running in a separate goroutine. This can cause failures on Windows as the file is attempted to be removed by one upload process while being open to upload by the other process. Probably it could also fail if the upload was completed by one process (so the file was deleted) before being started by the other. 
--- internal/pipeline/pipeline.go | 5 ----- 1 file changed, 5 deletions(-) (limited to 'internal/pipeline/pipeline.go') diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 20400ad..13339d7 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -358,11 +358,6 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log return } up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } } for _, pg := range colourimgs { -- cgit v1.2.1-24-ge1ad