diff options
author | Nick White <git@njw.name> | 2020-11-09 16:46:43 +0000 |
---|---|---|
committer | Nick White <git@njw.name> | 2020-11-09 16:46:43 +0000 |
commit | 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 (patch) | |
tree | a0a186d5ac1fbdf491a57f5ef25758deb5e51c8d /internal/pipeline | |
parent | fa1593d4b9b5f5e2b9f46137739557e29f65c765 (diff) |
[bookpipeline] Split most functionality out to package internal/pipeline
No functionality changes, but this should make it easier to make custom
builds using the pipeline in slightly different ways.
Diffstat (limited to 'internal/pipeline')
-rw-r--r-- | internal/pipeline/main.go | 725 |
1 files changed, 725 insertions, 0 deletions
diff --git a/internal/pipeline/main.go b/internal/pipeline/main.go new file mode 100644 index 0000000..6e03c50 --- /dev/null +++ b/internal/pipeline/main.go @@ -0,0 +1,725 @@ +// pipeline is a package used by the bookpipeline command, which +// handles the core functionality, using channels heavily to +// coordinate jobs. Note that it is considered an "internal" package, +// not intended for external use, and no guarantee is made of the +// stability of any interfaces provided. +package pipeline + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/smtp" + "os" + "os/exec" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/preproc" + "rescribe.xyz/utils/pkg/hocr" +) + +const HeartbeatSeconds = 60 + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type pageimg struct { + hocr, img string +} + +type mailSettings struct { + server, port, user, pass, from, to string +} + +func GetMailSettings() (mailSettings, error) { + p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") + b, err := ioutil.ReadFile(p) + if err != nil { + return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) + } + f := strings.Fields(string(b)) + if len(f) != 6 { + return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) + } + return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + logger.Println("Downloading", key) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + close(process) + errc <- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + logger.Println("Adding", key, training, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key+" "+training) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } + } + close(up) +} + +func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + up <- outpath + } + close(up) +} + +func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { + return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil && err.Error() == "No words found" { + continue + } + if err != nil { + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + errc <- fmt.Errorf("Error writing confidences file: %s", err) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + var pgs []string + for _, conf := range bestconfs { + pgs = append(pgs, conf.Path) + } + sort.Strings(pgs) + + logger.Println("Downloading binarised and original images to create PDFs") + bookname, err := filepath.Rel(os.TempDir(), savedir) + if err != nil { + errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) + return + } + colourpdf := new(bookpipeline.Fpdf) + err = colourpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binarisedpdf := new(bookpipeline.Fpdf) + err = binarisedpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binhascontent, colourhascontent := false, false + + var colourimgs, binimgs []pageimg + + for _, pg := range pgs { + base := filepath.Base(pg) + nosuffix := strings.TrimSuffix(base, ".hocr") + p := strings.SplitN(base, "_bin", 2) + + var fn string + if len(p) > 1 { + fn = p[0] + ".jpg" + } else { + fn = nosuffix + ".jpg" + } + + binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) + colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) + } + + for _, pg := range binimgs { + logger.Println("Downloading binarised page to add to PDF", pg.img) + err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } else { + err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + binhascontent = true + err = os.Remove(filepath.Join(savedir, pg.img)) + if err != nil { + errc <- err + return + } + } + } + + if binhascontent { + fn = filepath.Join(savedir, bookname+".binarised.pdf") + err = binarisedpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) + return + } + up <- fn + key := bookname + "/" + bookname + ".binarised.pdf" + conn.Log("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, fn) + if err != nil { + } + } + + for _, pg := range colourimgs { + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + colourhascontent = true + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + if colourhascontent { + fn = filepath.Join(savedir, bookname+".colour.pdf") + err = colourpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save colour pdf: %s", err) + return + } + up <- fn + } + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil && err.Error() != "Not enough valid confidences" { + errc <- fmt.Errorf("Error rendering graph: %s", err) + return + } + up <- fn + + close(up) + } +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg + for range t.C { + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) + if err != nil { + // This is for better debugging of the heartbeat issue + conn.Log("Error with heartbeat", err) + os.Exit(1) + // TODO: would be better to ensure this error stops any running + // processes, as they will ultimately fail in the case of + // it. could do this by setting a global variable that + // processes check each time they loop. + errc <- err + t.Stop() + return + } + if m.Id != "" { + conn.Log("Replaced message handle as visibilitytimeout limit was reached") + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } + } +} + +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// OcrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := filepath.Dir(msgparts[0]) + if len(msgparts) > 1 && msgparts[1] != "" { + process = Ocr(msgparts[1]) + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msgparts[0] + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := msgparts[0] + + var training string + if len(msgparts) > 1 { + training = msgparts[1] + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + if toQueue == conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } + + conn.Log("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Log("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + // if the error is in preprocessing / wipeonly, chances are that it will never + // complete, and will fill the ocrpage queue with parts which succeeded + // on each run, so in that case it's better to delete the message from + // the queue and notify us. + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + conn.Log("Deleting message from queue due to a bad error", fromQueue) + err2 := conn.DelFromQueue(fromQueue, msg.Handle) + if err2 != nil { + conn.Log("Error deleting message from queue", err2) + } + ms, err2 := GetMailSettings() + if err2 != nil { + conn.Log("Failed to mail settings ", err2) + } + if err2 == nil && ms.server != "" { + logs, err2 := getLogs() + if err2 != nil { + conn.Log("Failed to get logs ", err2) + logs = "" + } + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + " Fail message: %s\r\nFull log:\r\n%s\r\n", + ms.to, ms.from, bookname, err, logs) + host := fmt.Sprintf("%s:%s", ms.server, ms.port) + auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) + err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) + if err2 != nil { + conn.Log("Error sending email ", err2) + } + } + } + return err + case <-done: + } + + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +// TODO: rather than relying on journald, would be nicer to save the logs +// ourselves maybe, so that we weren't relying on a particular systemd +// setup. this can be done by having the conn.Log also append line +// to a file (though that would mean everything would have to go through +// conn.Log, which we're not consistently doing yet). the correct thing +// to do then would be to implement a new interface that covers the part +// of log.Logger we use (e.g. Print and Printf), and then have an exported +// conn struct that implements those, so that we could pass a log.Logger +// or the new conn struct everywhere (we wouldn't be passing a log.Logger, +// it's just good to be able to keep the compatibility) +func getLogs() (string, error) { + cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return stdout.String(), err +} + +func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { + logs, err := getLogs() + if err != nil { + return fmt.Errorf("Error getting logs, error: %v", err) + } + key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) + path := filepath.Join(os.TempDir(), key) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("Error creating log file", err) + } + defer f.Close() + _, err = f.WriteString(logs) + if err != nil { + return fmt.Errorf("Error saving log file", err) + } + _ = f.Close() + err = conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + return fmt.Errorf("Error uploading log", err) + } + conn.Log("Log saved to", key) + return nil +} |