From e9f6e6546ba6f8039897fe8e4396858f300af217 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 6 Nov 2020 16:44:42 +0000 Subject: Add git clone advice to readme --- README | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README b/README index 6829c1c..2f1a95a 100644 --- a/README +++ b/README @@ -9,6 +9,11 @@ by running `go get rescribe.xyz/bookpipeline/...` and documentation can be read with the `go doc` command or online at . +If you just want to install and use the commands, you can get the +package with `git clone https://git.rescribe.xyz/bookpipeline`, and +then install them with `go install ./...` from within the +`bookpipeline` directory. + ## Commands The commands in the cmd/ directory are at the heart of this -- cgit v1.2.1-24-ge1ad From ad4416a7d53e3fb783d15a67095994a7623bf109 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 6 Nov 2020 16:57:20 +0000 Subject: Document the local mode --- README | 14 ++++++++++++++ doc.go | 17 +++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/README b/README index 2f1a95a..22c4bbc 100644 --- a/README +++ b/README @@ -46,6 +46,20 @@ setting: - pdfbook : creates a searchable PDF from a directory of hOCR and image files +## Local operation + +While bookpipeline was built with cloud based operation in mind, there is also +a local mode that can be used to run OCR jobs from a single computer, with all +the benefits of preprocessing, choosing the best threshold for each image, +graph creation, PDF creation, and so on that the pipeline provides. + +You can use this by passing the '-c local' flag to the core bookpipeline +commands. Here is a simple example run: + + booktopipeline -c local MyBook + bookpipeline -v -c local # run until MyBook has finished processing + getpipelinebook -c local MyBook + ## Contributions Any and all comments, bug reports, patches or pull requests would diff --git a/doc.go b/doc.go index 823ac2f..179a4f3 100644 --- a/doc.go +++ b/doc.go @@ -168,5 +168,22 @@ At present the bookpipeline has some silly limitations of file names for book pages to be recognised. This is something which will be fixed in due course. Pages that are to be fully processed: *[0-9]{4}.jpg$ Pages that are to be wiped only: *[0-9]{6}(.bin)?.png$ + +Local operation + +While bookpipeline was built with cloud based operation in mind, there is also +a local mode that can be used to run OCR jobs from a single computer, with all +the benefits of preprocessing, choosing the best threshold for each image, +graph creation and so on that the pipeline provides. + +You can use this by passing the '-c local' flag to the core bookpipeline +commands. Here is a simple example run: + + booktopipeline -c local MyBook + bookpipeline -v -c local # run until MyBook has finished processing + getpipelinebook -c local MyBook + +Note that the local mode is not as well tested as the core cloud modes; please +report any bugs you find with it. */ package bookpipeline -- cgit v1.2.1-24-ge1ad From 48f817f0dfd3e89c372ac358418fe69b43eefa1b Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 6 Nov 2020 17:00:12 +0000 Subject: Fix the README to be valid markdown in the local example --- README | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README b/README index 22c4bbc..681d731 100644 --- a/README +++ b/README @@ -56,9 +56,9 @@ graph creation, PDF creation, and so on that the pipeline provides. You can use this by passing the '-c local' flag to the core bookpipeline commands. Here is a simple example run: - booktopipeline -c local MyBook - bookpipeline -v -c local # run until MyBook has finished processing - getpipelinebook -c local MyBook + booktopipeline -c local MyBook + bookpipeline -v -c local # run until MyBook has finished processing + getpipelinebook -c local MyBook ## Contributions -- cgit v1.2.1-24-ge1ad From 34b5735503edb9c5ab635c84cd356f19df7d7381 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 12:40:57 +0000 Subject: Set hocr config options directly rather than relying on 'hocr' config file This ensures that bookpipeline will still work even if TESSDATA_PREFIX has been set to a directory without configs in it. --- cmd/bookpipeline/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 36295a6..b3ffc53 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -216,7 +216,7 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger for path := range toocr { logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr -- cgit v1.2.1-24-ge1ad From c36597e65956383ec830f04e56f54158ae839a72 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 15:19:55 +0000 Subject: [bookpipeline] Improve interface, particularly for local use, by disabling (failing) log saving, mail sending, and removing erroneous references to AWS --- cmd/bookpipeline/main.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index b3ffc53..7a7a277 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -801,17 +801,20 @@ func main() { log.Fatalln("Unknown connection type") } - _, err := getMailSettings() - if err != nil { - conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) + var err error + if *conntype != "local" { + _, err = getMailSettings() + if err != nil { + conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) + } } - conn.Log("Setting up AWS session") + conn.Log("Setting up session") err = conn.Init() if err != nil { - log.Fatalln("Error setting up cloud connection:", err) + log.Fatalln("Error setting up connection:", err) } - conn.Log("Finished setting up AWS session") + conn.Log("Finished setting up session") starttime := time.Now().Unix() hostname, err := os.Hostname() @@ -836,6 +839,9 @@ func main() { } shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) savelognow = time.NewTicker(LogSaveTime) + if *conntype == "local" { + savelognow.Stop() + } for { select { -- cgit v1.2.1-24-ge1ad From fa1593d4b9b5f5e2b9f46137739557e29f65c765 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 15:20:09 +0000 Subject: Add -autostop, so time to shutdown can be specified, and so the process can just be stopped after a period, rather than the whole computer shut down --- cmd/bookpipeline/main.go | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 7a7a277..5a6606d 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -27,7 +27,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. When one is found this general process is followed: @@ -48,7 +48,6 @@ the contents: {smtpserver} {port} {username} {password} {from} {to} ` const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute const LogSaveTime = 1 * time.Minute const HeartbeatSeconds = 60 @@ -719,6 +718,12 @@ func stopTimer(t *time.Timer) { } } +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + // TODO: rather than relying on journald, would be nicer to save the logs // ourselves maybe, so that we weren't relying on a particular systemd // setup. this can be done by having the conn.Log also append line @@ -770,7 +775,8 @@ func main() { nowipe := flag.Bool("nw", false, "disable wipeonly") noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") - autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") + autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { @@ -823,7 +829,7 @@ func main() { var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time - var shutdownIfQuiet *time.Timer + var stopIfQuiet *time.Timer var savelognow *time.Ticker if !*nopreproc { checkPreQueue = time.After(0) @@ -837,7 +843,12 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) + var quietTime = time.Duration(*autostop) * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + savelognow = time.NewTicker(LogSaveTime) if *conntype == "local" { savelognow.Stop() @@ -857,9 +868,9 @@ func main() { continue } conn.Log("Message received on preprocess queue, processing", msg.Body) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } @@ -874,10 +885,10 @@ func main() { conn.Log("No message received on wipeonly queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } @@ -894,10 +905,10 @@ func main() { // Have OCRPageQueue checked immediately after completion, as chances are high that // there will be more pages that should be done without delay checkOCRPageQueue = time.After(0) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } @@ -912,10 +923,10 @@ func main() { conn.Log("No message received on analyse queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } @@ -925,10 +936,15 @@ func main() { if err != nil { conn.Log("Error saving logs", err) } - case <-shutdownIfQuiet.C: - if !*autoshutdown { + case <-stopIfQuiet.C: + if quietTime == 0 { continue } + if !*autoshutdown { + conn.Log("Stopping pipeline") + _ = savelogs(conn, starttime, hostname) + return + } conn.Log("Shutting down") _ = savelogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") -- cgit v1.2.1-24-ge1ad From 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 16:46:43 +0000 Subject: [bookpipeline] Split most functionality out to package internal/pipeline No functionality changes, but this should make it easier to make custom builds using the pipeline in slightly different ways. --- cmd/bookpipeline/main.go | 711 +-------------------------------------------- internal/pipeline/main.go | 725 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 740 insertions(+), 696 deletions(-) create mode 100644 internal/pipeline/main.go diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 5a6606d..aff7b87 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,20 +11,15 @@ import ( "bytes" "flag" "fmt" - "io/ioutil" "log" - "net/smtp" "os" "os/exec" - "path/filepath" "regexp" - "sort" - "strings" "time" "rescribe.xyz/bookpipeline" - "rescribe.xyz/preproc" - "rescribe.xyz/utils/pkg/hocr" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] @@ -47,9 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with the contents: {smtpserver} {port} {username} {password} {from} {to} ` +const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 3 * time.Minute const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60 // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -80,638 +75,6 @@ type Pipeliner interface { Log(v ...interface{}) } -type pageimg struct { - hocr, img string -} - -type mailSettings struct { - server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { - p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") - b, err := ioutil.ReadFile(p) - if err != nil { - return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) - } - f := strings.Fields(string(b)) - if len(f) != 6 { - return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) - } - return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - logger.Println("Downloading", key) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - for range dl { - } // consume the rest of the receiving channel so it isn't blocked - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - logger.Println("Adding", key, training, "to queue", toQueue) - err = conn.AddToQueue(toQueue, key+" "+training) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p - } - } - close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range towipe { - logger.Println("Wiping", path) - s := strings.Split(path, ".") - base := strings.Join(s[:len(s)-1], "") - outpath := base + "_bin0.0.png" - err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) - if err != nil { - for range towipe { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - up <- outpath - } - close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - for range toocr { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*bookpipeline.Conf) - bestconfs := make(map[string]*bookpipeline.Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil && err.Error() == "No words found" { - continue - } - if err != nil { - for range toanalyse { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c bookpipeline.Conf - c.Path = path - c.Code = base[codestart:] - c.Conf = avg - confs[name] = append(confs[name], &c) - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.Conf > best { - best = c.Conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) - if err != nil { - errc <- fmt.Errorf("Error writing confidences file: %s", err) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) - } - up <- fn - - var pgs []string - for _, conf := range bestconfs { - pgs = append(pgs, conf.Path) - } - sort.Strings(pgs) - - logger.Println("Downloading binarised and original images to create PDFs") - bookname, err := filepath.Rel(os.TempDir(), savedir) - if err != nil { - errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) - return - } - colourpdf := new(bookpipeline.Fpdf) - err = colourpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binarisedpdf := new(bookpipeline.Fpdf) - err = binarisedpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binhascontent, colourhascontent := false, false - - var colourimgs, binimgs []pageimg - - for _, pg := range pgs { - base := filepath.Base(pg) - nosuffix := strings.TrimSuffix(base, ".hocr") - p := strings.SplitN(base, "_bin", 2) - - var fn string - if len(p) > 1 { - fn = p[0] + ".jpg" - } else { - fn = nosuffix + ".jpg" - } - - binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) - colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) - } - - for _, pg := range binimgs { - logger.Println("Downloading binarised page to add to PDF", pg.img) - err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } else { - err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - binhascontent = true - err = os.Remove(filepath.Join(savedir, pg.img)) - if err != nil { - errc <- err - return - } - } - } - - if binhascontent { - fn = filepath.Join(savedir, bookname+".binarised.pdf") - err = binarisedpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) - return - } - up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } - } - - for _, pg := range colourimgs { - logger.Println("Downloading colour page to add to PDF", pg.img) - colourfn := pg.img - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) - logger.Println("Download failed; trying", colourfn) - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } - } - if err == nil { - err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - colourhascontent = true - err = os.Remove(filepath.Join(savedir, colourfn)) - if err != nil { - errc <- err - return - } - } - } - if colourhascontent { - fn = filepath.Join(savedir, bookname+".colour.pdf") - err = colourpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save colour pdf: %s", err) - return - } - up <- fn - } - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) - if err != nil && err.Error() != "Not enough valid confidences" { - errc <- fmt.Errorf("Error rendering graph: %s", err) - return - } - up <- fn - - close(up) - } -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { - currentmsg := msg - for range t.C { - m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) - if err != nil { - // This is for better debugging of the heartbeat issue - conn.Log("Error with heartbeat", err) - os.Exit(1) - // TODO: would be better to ensure this error stops any running - // processes, as they will ultimately fail in the case of - // it. could do this by setting a global variable that - // processes check each time they loop. - errc <- err - t.Stop() - return - } - if m.Id != "" { - conn.Log("Replaced message handle as visibilitytimeout limit was reached") - currentmsg = m - // TODO: maybe handle communicating new msg more gracefully than this - for range msgc { - } // throw away any old msgc - msgc <- m - } - } -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - return false - } - - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - - atleastone := false - for _, png := range objs { - if preprocessedPattern.MatchString(png) { - atleastone = true - found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" - for _, hocr := range objs { - if hocr == hocrname { - found = true - break - } - } - if found == false { - return false - } - } - } - if atleastone == false { - return false - } - return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := filepath.Dir(msgparts[0]) - if len(msgparts) > 1 && msgparts[1] != "" { - process = ocr(msgparts[1]) - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - - dl <- msgparts[0] - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if allOCRed(bookname, conn) && toQueue != "" { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := msgparts[0] - - var training string - if len(msgparts) > 1 { - training = msgparts[1] - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) - } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - } - - conn.Log("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Log("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - // if the error is in preprocessing / wipeonly, chances are that it will never - // complete, and will fill the ocrpage queue with parts which succeeded - // on each run, so in that case it's better to delete the message from - // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { - conn.Log("Deleting message from queue due to a bad error", fromQueue) - err2 := conn.DelFromQueue(fromQueue, msg.Handle) - if err2 != nil { - conn.Log("Error deleting message from queue", err2) - } - ms, err2 := getMailSettings() - if err2 != nil { - conn.Log("Failed to mail settings ", err2) - } - if err2 == nil && ms.server != "" { - logs, err2 := getlogs() - if err2 != nil { - conn.Log("Failed to get logs ", err2) - logs = "" - } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + - " Fail message: %s\r\nFull log:\r\n%s\r\n", - ms.to, ms.from, bookname, err, logs) - host := fmt.Sprintf("%s:%s", ms.server, ms.port) - auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) - err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) - if err2 != nil { - conn.Log("Error sending email ", err2) - } - } - } - return err - case <-done: - } - - if toQueue != "" && toQueue != conn.OCRPageQueueId() { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - func stopTimer(t *time.Timer) { if !t.Stop() { <-t.C @@ -724,50 +87,6 @@ func resetTimer(t *time.Timer, d time.Duration) { } } -// TODO: rather than relying on journald, would be nicer to save the logs -// ourselves maybe, so that we weren't relying on a particular systemd -// setup. this can be done by having the conn.Log also append line -// to a file (though that would mean everything would have to go through -// conn.Log, which we're not consistently doing yet). the correct thing -// to do then would be to implement a new interface that covers the part -// of log.Logger we use (e.g. Print and Printf), and then have an exported -// conn struct that implements those, so that we could pass a log.Logger -// or the new conn struct everywhere (we wouldn't be passing a log.Logger, -// it's just good to be able to keep the compatibility) -func getlogs() (string, error) { - cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { - logs, err := getlogs() - if err != nil { - return fmt.Errorf("Error getting logs, error: %v", err) - } - key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) - path := filepath.Join(os.TempDir(), key) - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("Error creating log file", err) - } - defer f.Close() - _, err = f.WriteString(logs) - if err != nil { - return fmt.Errorf("Error saving log file", err) - } - _ = f.Close() - err = conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - return fmt.Errorf("Error uploading log", err) - } - conn.Log("Log saved to", key) - return nil -} - func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") @@ -809,7 +128,7 @@ func main() { var err error if *conntype != "local" { - _, err = getMailSettings() + _, err = pipeline.GetMailSettings() if err != nil { conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) } @@ -857,7 +176,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking preprocess queue", err) @@ -869,13 +188,13 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } case <-checkWipeQueue: - msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking wipeonly queue", err) @@ -887,13 +206,13 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } case <-checkOCRPageQueue: - msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking OCR Page queue", err) @@ -907,13 +226,13 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking analyse queue", err) @@ -925,14 +244,14 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) - err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } case <-savelognow.C: conn.Log("Saving logs") - err = savelogs(conn, starttime, hostname) + err = pipeline.SaveLogs(conn, starttime, hostname) if err != nil { conn.Log("Error saving logs", err) } @@ -942,11 +261,11 @@ func main() { } if !*autoshutdown { conn.Log("Stopping pipeline") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) return } conn.Log("Shutting down") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout diff --git a/internal/pipeline/main.go b/internal/pipeline/main.go new file mode 100644 index 0000000..6e03c50 --- /dev/null +++ b/internal/pipeline/main.go @@ -0,0 +1,725 @@ +// pipeline is a package used by the bookpipeline command, which +// handles the core functionality, using channels heavily to +// coordinate jobs. Note that it is considered an "internal" package, +// not intended for external use, and no guarantee is made of the +// stability of any interfaces provided. +package pipeline + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/smtp" + "os" + "os/exec" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/preproc" + "rescribe.xyz/utils/pkg/hocr" +) + +const HeartbeatSeconds = 60 + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type pageimg struct { + hocr, img string +} + +type mailSettings struct { + server, port, user, pass, from, to string +} + +func GetMailSettings() (mailSettings, error) { + p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") + b, err := ioutil.ReadFile(p) + if err != nil { + return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) + } + f := strings.Fields(string(b)) + if len(f) != 6 { + return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) + } + return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + logger.Println("Downloading", key) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + close(process) + errc <- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + logger.Println("Adding", key, training, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key+" "+training) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } + } + close(up) +} + +func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + up <- outpath + } + close(up) +} + +func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { + return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil && err.Error() == "No words found" { + continue + } + if err != nil { + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + errc <- fmt.Errorf("Error writing confidences file: %s", err) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + var pgs []string + for _, conf := range bestconfs { + pgs = append(pgs, conf.Path) + } + sort.Strings(pgs) + + logger.Println("Downloading binarised and original images to create PDFs") + bookname, err := filepath.Rel(os.TempDir(), savedir) + if err != nil { + errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) + return + } + colourpdf := new(bookpipeline.Fpdf) + err = colourpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binarisedpdf := new(bookpipeline.Fpdf) + err = binarisedpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binhascontent, colourhascontent := false, false + + var colourimgs, binimgs []pageimg + + for _, pg := range pgs { + base := filepath.Base(pg) + nosuffix := strings.TrimSuffix(base, ".hocr") + p := strings.SplitN(base, "_bin", 2) + + var fn string + if len(p) > 1 { + fn = p[0] + ".jpg" + } else { + fn = nosuffix + ".jpg" + } + + binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) + colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) + } + + for _, pg := range binimgs { + logger.Println("Downloading binarised page to add to PDF", pg.img) + err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } else { + err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + binhascontent = true + err = os.Remove(filepath.Join(savedir, pg.img)) + if err != nil { + errc <- err + return + } + } + } + + if binhascontent { + fn = filepath.Join(savedir, bookname+".binarised.pdf") + err = binarisedpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) + return + } + up <- fn + key := bookname + "/" + bookname + ".binarised.pdf" + conn.Log("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, fn) + if err != nil { + } + } + + for _, pg := range colourimgs { + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + colourhascontent = true + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + if colourhascontent { + fn = filepath.Join(savedir, bookname+".colour.pdf") + err = colourpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save colour pdf: %s", err) + return + } + up <- fn + } + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil && err.Error() != "Not enough valid confidences" { + errc <- fmt.Errorf("Error rendering graph: %s", err) + return + } + up <- fn + + close(up) + } +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg + for range t.C { + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) + if err != nil { + // This is for better debugging of the heartbeat issue + conn.Log("Error with heartbeat", err) + os.Exit(1) + // TODO: would be better to ensure this error stops any running + // processes, as they will ultimately fail in the case of + // it. could do this by setting a global variable that + // processes check each time they loop. + errc <- err + t.Stop() + return + } + if m.Id != "" { + conn.Log("Replaced message handle as visibilitytimeout limit was reached") + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } + } +} + +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// OcrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := filepath.Dir(msgparts[0]) + if len(msgparts) > 1 && msgparts[1] != "" { + process = Ocr(msgparts[1]) + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msgparts[0] + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := msgparts[0] + + var training string + if len(msgparts) > 1 { + training = msgparts[1] + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + if toQueue == conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } + + conn.Log("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Log("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + // if the error is in preprocessing / wipeonly, chances are that it will never + // complete, and will fill the ocrpage queue with parts which succeeded + // on each run, so in that case it's better to delete the message from + // the queue and notify us. + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + conn.Log("Deleting message from queue due to a bad error", fromQueue) + err2 := conn.DelFromQueue(fromQueue, msg.Handle) + if err2 != nil { + conn.Log("Error deleting message from queue", err2) + } + ms, err2 := GetMailSettings() + if err2 != nil { + conn.Log("Failed to mail settings ", err2) + } + if err2 == nil && ms.server != "" { + logs, err2 := getLogs() + if err2 != nil { + conn.Log("Failed to get logs ", err2) + logs = "" + } + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + " Fail message: %s\r\nFull log:\r\n%s\r\n", + ms.to, ms.from, bookname, err, logs) + host := fmt.Sprintf("%s:%s", ms.server, ms.port) + auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) + err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) + if err2 != nil { + conn.Log("Error sending email ", err2) + } + } + } + return err + case <-done: + } + + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +// TODO: rather than relying on journald, would be nicer to save the logs +// ourselves maybe, so that we weren't relying on a particular systemd +// setup. this can be done by having the conn.Log also append line +// to a file (though that would mean everything would have to go through +// conn.Log, which we're not consistently doing yet). the correct thing +// to do then would be to implement a new interface that covers the part +// of log.Logger we use (e.g. Print and Printf), and then have an exported +// conn struct that implements those, so that we could pass a log.Logger +// or the new conn struct everywhere (we wouldn't be passing a log.Logger, +// it's just good to be able to keep the compatibility) +func getLogs() (string, error) { + cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return stdout.String(), err +} + +func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { + logs, err := getLogs() + if err != nil { + return fmt.Errorf("Error getting logs, error: %v", err) + } + key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) + path := filepath.Join(os.TempDir(), key) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("Error creating log file", err) + } + defer f.Close() + _, err = f.WriteString(logs) + if err != nil { + return fmt.Errorf("Error saving log file", err) + } + _ = f.Close() + err = conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + return fmt.Errorf("Error uploading log", err) + } + conn.Log("Log saved to", key) + return nil +} -- cgit v1.2.1-24-ge1ad From fc6becf5ed98e9c0815532fd76639c15eb481ed1 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 17:33:52 +0000 Subject: [rescribe] work in progress at a self-contained local pipeline processor, called rescribe --- cmd/rescribe/main.go | 247 ++++++++++++++ internal/pipeline/main.go | 725 ----------------------------------------- internal/pipeline/pipeline.go | 729 ++++++++++++++++++++++++++++++++++++++++++ internal/pipeline/put.go | 85 +++++ 4 files changed, 1061 insertions(+), 725 deletions(-) create mode 100644 cmd/rescribe/main.go delete mode 100644 internal/pipeline/main.go create mode 100644 internal/pipeline/pipeline.go create mode 100644 internal/pipeline/put.go diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go new file mode 100644 index 0000000..e3781cb --- /dev/null +++ b/cmd/rescribe/main.go @@ -0,0 +1,247 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rescribe is a modification of bookpipeline designed for local-only +// operation, which rolls uploading, processing, and downloading of +// a single book by the pipeline into one command. +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path/filepath" + "regexp" + "time" + + "rescribe.xyz/bookpipeline" + + "rescribe.xyz/bookpipeline/internal/pipeline" +) + +const usage = `Usage: rescribe [-v] [-t training] bookdir + +Process and OCR a book using the Rescribe pipeline on a local machine. +` + +const QueueTimeoutSecs = 2 * 60 +const PauseBetweenChecks = 1 * time.Second +const LogSaveTime = 1 * time.Minute + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +func stopTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + +func main() { + verbose := flag.Bool("v", false, "verbose") + training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") + + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 || flag.NArg() > 3 { + flag.Usage() + return + } + + bookdir := flag.Arg(0) + var bookname string + if flag.NArg() > 2 { + bookname = flag.Arg(1) + } else { + bookname = filepath.Base(bookdir) + } + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", 0) + } else { + var n NullWriter + verboselog = log.New(n, "", 0) + } + + var conn Pipeliner + // TODO: set tmpdir to a specific random thing for this run only + conn = &bookpipeline.LocalConn{Logger: verboselog} + + conn.Log("Setting up session") + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up connection:", err) + } + conn.Log("Finished setting up session") + + uploadbook(bookdir, bookname, *training, conn) + + processbook(*training, conn) + + // TODO: save book +} + +func uploadbook(dir string, name string, training string, conn Pipeliner) error { + err := pipeline.CheckImages(dir) + if err != nil { + return fmt.Errorf("Error with images in %s: %v", dir, err) + } + err = pipeline.UploadImages(dir, name, conn) + if err != nil { + return fmt.Errorf("Error saving images to process from %s: %v", dir, err) + } + + qid := pipeline.DetectQueueType(dir, conn) + if training != "" { + name = name + " " + training + } + err = conn.AddToQueue(qid, name) + if err != nil { + return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) + } + + return nil +} + + +func processbook(training string, conn Pipeliner) { + origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) + wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) + ocredPattern := regexp.MustCompile(`.hocr$`) + + var checkPreQueue <-chan time.Time + var checkWipeQueue <-chan time.Time + var checkOCRPageQueue <-chan time.Time + var checkAnalyseQueue <-chan time.Time + var stopIfQuiet *time.Timer + checkPreQueue = time.After(0) + checkWipeQueue = time.After(0) + checkOCRPageQueue = time.After(0) + checkAnalyseQueue = time.After(0) + var quietTime = 1 * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + + for { + select { + case <-checkPreQueue: + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking preprocess queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on preprocess queue, sleeping") + continue + } + conn.Log("Message received on preprocess queue, processing", msg.Body) + stopTimer(stopIfQuiet) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during preprocess", err) + } + case <-checkWipeQueue: + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) + checkWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking wipeonly queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on wipeonly queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on wipeonly queue, processing", msg.Body) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during wipe", err) + } + case <-checkOCRPageQueue: + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) + checkOCRPageQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking OCR Page queue", err) + continue + } + if msg.Handle == "" { + continue + } + // Have OCRPageQueue checked immediately after completion, as chances are high that + // there will be more pages that should be done without delay + checkOCRPageQueue = time.After(0) + stopTimer(stopIfQuiet) + conn.Log("Message received on OCR Page queue, processing", msg.Body) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during OCR Page process", err) + } + case <-checkAnalyseQueue: + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) + checkAnalyseQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking analyse queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on analyse queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on analyse queue, processing", msg.Body) + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during analysis", err) + } + case <-stopIfQuiet.C: + conn.Log("Processing finished") + return + } + } +} diff --git a/internal/pipeline/main.go b/internal/pipeline/main.go deleted file mode 100644 index 6e03c50..0000000 --- a/internal/pipeline/main.go +++ /dev/null @@ -1,725 +0,0 @@ -// pipeline is a package used by the bookpipeline command, which -// handles the core functionality, using channels heavily to -// coordinate jobs. Note that it is considered an "internal" package, -// not intended for external use, and no guarantee is made of the -// stability of any interfaces provided. -package pipeline - -import ( - "bytes" - "fmt" - "io/ioutil" - "log" - "net/smtp" - "os" - "os/exec" - "path/filepath" - "regexp" - "sort" - "strings" - "time" - - "rescribe.xyz/bookpipeline" - "rescribe.xyz/preproc" - "rescribe.xyz/utils/pkg/hocr" -) - -const HeartbeatSeconds = 60 - -type Clouder interface { - Init() error - ListObjects(bucket string, prefix string) ([]string, error) - Download(bucket string, key string, fn string) error - Upload(bucket string, key string, path string) error - CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) -} - -type Pipeliner interface { - Clouder - PreQueueId() string - WipeQueueId() string - OCRPageQueueId() string - AnalyseQueueId() string - WIPStorageId() string - GetLogger() *log.Logger - Log(v ...interface{}) -} - -type pageimg struct { - hocr, img string -} - -type mailSettings struct { - server, port, user, pass, from, to string -} - -func GetMailSettings() (mailSettings, error) { - p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") - b, err := ioutil.ReadFile(p) - if err != nil { - return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) - } - f := strings.Fields(string(b)) - if len(f) != 6 { - return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) - } - return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - logger.Println("Downloading", key) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - for range dl { - } // consume the rest of the receiving channel so it isn't blocked - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - logger.Println("Adding", key, training, "to queue", toQueue) - err = conn.AddToQueue(toQueue, key+" "+training) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p - } - } - close(up) -} - -func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range towipe { - logger.Println("Wiping", path) - s := strings.Split(path, ".") - base := strings.Join(s[:len(s)-1], "") - outpath := base + "_bin0.0.png" - err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) - if err != nil { - for range towipe { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - up <- outpath - } - close(up) -} - -func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - for range toocr { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*bookpipeline.Conf) - bestconfs := make(map[string]*bookpipeline.Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil && err.Error() == "No words found" { - continue - } - if err != nil { - for range toanalyse { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c bookpipeline.Conf - c.Path = path - c.Code = base[codestart:] - c.Conf = avg - confs[name] = append(confs[name], &c) - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.Conf > best { - best = c.Conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) - if err != nil { - errc <- fmt.Errorf("Error writing confidences file: %s", err) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) - } - up <- fn - - var pgs []string - for _, conf := range bestconfs { - pgs = append(pgs, conf.Path) - } - sort.Strings(pgs) - - logger.Println("Downloading binarised and original images to create PDFs") - bookname, err := filepath.Rel(os.TempDir(), savedir) - if err != nil { - errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) - return - } - colourpdf := new(bookpipeline.Fpdf) - err = colourpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binarisedpdf := new(bookpipeline.Fpdf) - err = binarisedpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binhascontent, colourhascontent := false, false - - var colourimgs, binimgs []pageimg - - for _, pg := range pgs { - base := filepath.Base(pg) - nosuffix := strings.TrimSuffix(base, ".hocr") - p := strings.SplitN(base, "_bin", 2) - - var fn string - if len(p) > 1 { - fn = p[0] + ".jpg" - } else { - fn = nosuffix + ".jpg" - } - - binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) - colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) - } - - for _, pg := range binimgs { - logger.Println("Downloading binarised page to add to PDF", pg.img) - err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } else { - err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - binhascontent = true - err = os.Remove(filepath.Join(savedir, pg.img)) - if err != nil { - errc <- err - return - } - } - } - - if binhascontent { - fn = filepath.Join(savedir, bookname+".binarised.pdf") - err = binarisedpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) - return - } - up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } - } - - for _, pg := range colourimgs { - logger.Println("Downloading colour page to add to PDF", pg.img) - colourfn := pg.img - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) - logger.Println("Download failed; trying", colourfn) - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } - } - if err == nil { - err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - colourhascontent = true - err = os.Remove(filepath.Join(savedir, colourfn)) - if err != nil { - errc <- err - return - } - } - } - if colourhascontent { - fn = filepath.Join(savedir, bookname+".colour.pdf") - err = colourpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save colour pdf: %s", err) - return - } - up <- fn - } - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) - if err != nil && err.Error() != "Not enough valid confidences" { - errc <- fmt.Errorf("Error rendering graph: %s", err) - return - } - up <- fn - - close(up) - } -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { - currentmsg := msg - for range t.C { - m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) - if err != nil { - // This is for better debugging of the heartbeat issue - conn.Log("Error with heartbeat", err) - os.Exit(1) - // TODO: would be better to ensure this error stops any running - // processes, as they will ultimately fail in the case of - // it. could do this by setting a global variable that - // processes check each time they loop. - errc <- err - t.Stop() - return - } - if m.Id != "" { - conn.Log("Replaced message handle as visibilitytimeout limit was reached") - currentmsg = m - // TODO: maybe handle communicating new msg more gracefully than this - for range msgc { - } // throw away any old msgc - msgc <- m - } - } -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - return false - } - - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - - atleastone := false - for _, png := range objs { - if preprocessedPattern.MatchString(png) { - atleastone = true - found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" - for _, hocr := range objs { - if hocr == hocrname { - found = true - break - } - } - if found == false { - return false - } - } - } - if atleastone == false { - return false - } - return true -} - -// OcrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := filepath.Dir(msgparts[0]) - if len(msgparts) > 1 && msgparts[1] != "" { - process = Ocr(msgparts[1]) - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - - dl <- msgparts[0] - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if allOCRed(bookname, conn) && toQueue != "" { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := msgparts[0] - - var training string - if len(msgparts) > 1 { - training = msgparts[1] - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) - } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - } - - conn.Log("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Log("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - // if the error is in preprocessing / wipeonly, chances are that it will never - // complete, and will fill the ocrpage queue with parts which succeeded - // on each run, so in that case it's better to delete the message from - // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { - conn.Log("Deleting message from queue due to a bad error", fromQueue) - err2 := conn.DelFromQueue(fromQueue, msg.Handle) - if err2 != nil { - conn.Log("Error deleting message from queue", err2) - } - ms, err2 := GetMailSettings() - if err2 != nil { - conn.Log("Failed to mail settings ", err2) - } - if err2 == nil && ms.server != "" { - logs, err2 := getLogs() - if err2 != nil { - conn.Log("Failed to get logs ", err2) - logs = "" - } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + - " Fail message: %s\r\nFull log:\r\n%s\r\n", - ms.to, ms.from, bookname, err, logs) - host := fmt.Sprintf("%s:%s", ms.server, ms.port) - auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) - err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) - if err2 != nil { - conn.Log("Error sending email ", err2) - } - } - } - return err - case <-done: - } - - if toQueue != "" && toQueue != conn.OCRPageQueueId() { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -// TODO: rather than relying on journald, would be nicer to save the logs -// ourselves maybe, so that we weren't relying on a particular systemd -// setup. this can be done by having the conn.Log also append line -// to a file (though that would mean everything would have to go through -// conn.Log, which we're not consistently doing yet). the correct thing -// to do then would be to implement a new interface that covers the part -// of log.Logger we use (e.g. Print and Printf), and then have an exported -// conn struct that implements those, so that we could pass a log.Logger -// or the new conn struct everywhere (we wouldn't be passing a log.Logger, -// it's just good to be able to keep the compatibility) -func getLogs() (string, error) { - cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return stdout.String(), err -} - -func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { - logs, err := getLogs() - if err != nil { - return fmt.Errorf("Error getting logs, error: %v", err) - } - key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) - path := filepath.Join(os.TempDir(), key) - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("Error creating log file", err) - } - defer f.Close() - _, err = f.WriteString(logs) - if err != nil { - return fmt.Errorf("Error saving log file", err) - } - _ = f.Close() - err = conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - return fmt.Errorf("Error uploading log", err) - } - conn.Log("Log saved to", key) - return nil -} diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go new file mode 100644 index 0000000..b1c3cb9 --- /dev/null +++ b/internal/pipeline/pipeline.go @@ -0,0 +1,729 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// pipeline is a package used by the bookpipeline command, which +// handles the core functionality, using channels heavily to +// coordinate jobs. Note that it is considered an "internal" package, +// not intended for external use, and no guarantee is made of the +// stability of any interfaces provided. +package pipeline + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/smtp" + "os" + "os/exec" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/preproc" + "rescribe.xyz/utils/pkg/hocr" +) + +const HeartbeatSeconds = 60 + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type pageimg struct { + hocr, img string +} + +type mailSettings struct { + server, port, user, pass, from, to string +} + +func GetMailSettings() (mailSettings, error) { + p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") + b, err := ioutil.ReadFile(p) + if err != nil { + return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) + } + f := strings.Fields(string(b)) + if len(f) != 6 { + return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) + } + return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + logger.Println("Downloading", key) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + close(process) + errc <- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := bookname + "/" + name + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + err = os.Remove(path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + logger.Println("Adding", key, training, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key+" "+training) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } + } + close(up) +} + +func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + up <- outpath + } + close(up) +} + +func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { + return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil && err.Error() == "No words found" { + continue + } + if err != nil { + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + errc <- fmt.Errorf("Error writing confidences file: %s", err) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + var pgs []string + for _, conf := range bestconfs { + pgs = append(pgs, conf.Path) + } + sort.Strings(pgs) + + logger.Println("Downloading binarised and original images to create PDFs") + bookname, err := filepath.Rel(os.TempDir(), savedir) + if err != nil { + errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) + return + } + colourpdf := new(bookpipeline.Fpdf) + err = colourpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binarisedpdf := new(bookpipeline.Fpdf) + err = binarisedpdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + binhascontent, colourhascontent := false, false + + var colourimgs, binimgs []pageimg + + for _, pg := range pgs { + base := filepath.Base(pg) + nosuffix := strings.TrimSuffix(base, ".hocr") + p := strings.SplitN(base, "_bin", 2) + + var fn string + if len(p) > 1 { + fn = p[0] + ".jpg" + } else { + fn = nosuffix + ".jpg" + } + + binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) + colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) + } + + for _, pg := range binimgs { + logger.Println("Downloading binarised page to add to PDF", pg.img) + err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } else { + err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + binhascontent = true + err = os.Remove(filepath.Join(savedir, pg.img)) + if err != nil { + errc <- err + return + } + } + } + + if binhascontent { + fn = filepath.Join(savedir, bookname+".binarised.pdf") + err = binarisedpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) + return + } + up <- fn + key := bookname + "/" + bookname + ".binarised.pdf" + conn.Log("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, fn) + if err != nil { + } + } + + for _, pg := range colourimgs { + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + colourhascontent = true + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + if colourhascontent { + fn = filepath.Join(savedir, bookname+".colour.pdf") + err = colourpdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save colour pdf: %s", err) + return + } + up <- fn + } + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + errc <- fmt.Errorf("Error creating file %s: %s", fn, err) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil && err.Error() != "Not enough valid confidences" { + errc <- fmt.Errorf("Error rendering graph: %s", err) + return + } + up <- fn + + close(up) + } +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg + for range t.C { + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) + if err != nil { + // This is for better debugging of the heartbeat issue + conn.Log("Error with heartbeat", err) + os.Exit(1) + // TODO: would be better to ensure this error stops any running + // processes, as they will ultimately fail in the case of + // it. could do this by setting a global variable that + // processes check each time they loop. + errc <- err + t.Stop() + return + } + if m.Id != "" { + conn.Log("Replaced message handle as visibilitytimeout limit was reached") + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } + } +} + +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// OcrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := filepath.Dir(msgparts[0]) + if len(msgparts) > 1 && msgparts[1] != "" { + process = Ocr(msgparts[1]) + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msgparts[0] + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + msgparts := strings.Split(msg.Body, " ") + bookname := msgparts[0] + + var training string + if len(msgparts) > 1 { + training = msgparts[1] + } + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return fmt.Errorf("Failed to create directory %s: %s", d, err) + } + + t := time.NewTicker(HeartbeatSeconds * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + if toQueue == conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } + + conn.Log("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Log("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + // if the error is in preprocessing / wipeonly, chances are that it will never + // complete, and will fill the ocrpage queue with parts which succeeded + // on each run, so in that case it's better to delete the message from + // the queue and notify us. + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + conn.Log("Deleting message from queue due to a bad error", fromQueue) + err2 := conn.DelFromQueue(fromQueue, msg.Handle) + if err2 != nil { + conn.Log("Error deleting message from queue", err2) + } + ms, err2 := GetMailSettings() + if err2 != nil { + conn.Log("Failed to mail settings ", err2) + } + if err2 == nil && ms.server != "" { + logs, err2 := getLogs() + if err2 != nil { + conn.Log("Failed to get logs ", err2) + logs = "" + } + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + " Fail message: %s\r\nFull log:\r\n%s\r\n", + ms.to, ms.from, bookname, err, logs) + host := fmt.Sprintf("%s:%s", ms.server, ms.port) + auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) + err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) + if err2 != nil { + conn.Log("Error sending email ", err2) + } + } + } + return err + case <-done: + } + + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + conn.Log("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return fmt.Errorf("Error adding to queue %s: %s", bookname, err) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.Log("Using new message handle to delete message from queue") + } + default: + conn.Log("Using original message handle to delete message from queue") + } + + conn.Log("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return fmt.Errorf("Error deleting message from queue: %s", err) + } + + err = os.RemoveAll(d) + if err != nil { + return fmt.Errorf("Failed to remove directory %s: %s", d, err) + } + + return nil +} + +// TODO: rather than relying on journald, would be nicer to save the logs +// ourselves maybe, so that we weren't relying on a particular systemd +// setup. this can be done by having the conn.Log also append line +// to a file (though that would mean everything would have to go through +// conn.Log, which we're not consistently doing yet). the correct thing +// to do then would be to implement a new interface that covers the part +// of log.Logger we use (e.g. Print and Printf), and then have an exported +// conn struct that implements those, so that we could pass a log.Logger +// or the new conn struct everywhere (we wouldn't be passing a log.Logger, +// it's just good to be able to keep the compatibility) +func getLogs() (string, error) { + cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + return stdout.String(), err +} + +func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { + logs, err := getLogs() + if err != nil { + return fmt.Errorf("Error getting logs, error: %v", err) + } + key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) + path := filepath.Join(os.TempDir(), key) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("Error creating log file", err) + } + defer f.Close() + _, err = f.WriteString(logs) + if err != nil { + return fmt.Errorf("Error saving log file", err) + } + _ = f.Close() + err = conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + return fmt.Errorf("Error uploading log", err) + } + conn.Log("Log saved to", key) + return nil +} diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go new file mode 100644 index 0000000..8ada41f --- /dev/null +++ b/internal/pipeline/put.go @@ -0,0 +1,85 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( + "fmt" + "image" + _ "image/png" + _ "image/jpeg" + "os" + "path/filepath" +) + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type fileWalk chan string + +func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + f <- path + } + return nil +} + +func CheckImages(dir string) error { + checker := make(fileWalk) + go func() { + _ = filepath.Walk(dir, checker.Walk) + close(checker) + }() + + for path := range checker { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("Opening image %s failed: %v", path, err) + } + _, _, err = image.Decode(f) + if err != nil { + return fmt.Errorf("Decoding image %s failed: %v", path, err) + } + } + + return nil +} + +func DetectQueueType(dir string, conn Pipeliner) string { + // Auto detect type of queue to send to based on file extension + pngdirs, _ := filepath.Glob(dir + "/*.png") + jpgdirs, _ := filepath.Glob(dir + "/*.jpg") + pngcount := len(pngdirs) + jpgcount := len(jpgdirs) + if pngcount > jpgcount { + return conn.WipeQueueId() + } else { + return conn.PreQueueId() + } +} + +func UploadImages(dir string, bookname string, conn Pipeliner) error { + walker := make(fileWalk) + go func() { + _ = filepath.Walk(dir, walker.Walk) + close(walker) + }() + + for path := range walker { + name := filepath.Base(path) + err := conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) + if err != nil { + return fmt.Errorf("Failed to upload %s: %v", path, err) + } + } + + return nil +} -- cgit v1.2.1-24-ge1ad From a1de8862a091f9584220db40671a0d43346c4519 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:29:56 +0000 Subject: [rescribe] Local only combo tool basically now working. Testing is still minimal. --- cmd/rescribe/main.go | 55 ++++++++++++++++++++++++++++++++++++++++----- internal/pipeline/get.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++ local.go | 2 +- 3 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 internal/pipeline/get.go diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index e3781cb..c309367 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -10,6 +10,7 @@ package main import ( "flag" "fmt" + "io/ioutil" "log" "os" "path/filepath" @@ -88,7 +89,7 @@ func main() { bookdir := flag.Arg(0) var bookname string - if flag.NArg() > 2 { + if flag.NArg() > 1 { bookname = flag.Arg(1) } else { bookname = filepath.Base(bookdir) @@ -102,22 +103,41 @@ func main() { verboselog = log.New(n, "", 0) } + tempdir, err := ioutil.TempDir("", "bookpipeline") + if err != nil { + log.Fatalln("Error setting up temporary directory:", err) + } + var conn Pipeliner - // TODO: set tmpdir to a specific random thing for this run only - conn = &bookpipeline.LocalConn{Logger: verboselog} + conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir} conn.Log("Setting up session") - err := conn.Init() + err = conn.Init() if err != nil { log.Fatalln("Error setting up connection:", err) } conn.Log("Finished setting up session") - uploadbook(bookdir, bookname, *training, conn) + fmt.Printf("Copying book to pipeline\n") + err = uploadbook(bookdir, bookname, *training, conn) + if err != nil { + log.Fatalln(err) + } + + fmt.Printf("Processing book (this may take some time)\n") processbook(*training, conn) - // TODO: save book + fmt.Printf("Saving finished book to %s\n", bookname) + err = downloadbook(bookname, conn) + if err != nil { + log.Fatalln(err) + } + + err = os.RemoveAll(tempdir) + if err != nil { + log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) + } } func uploadbook(dir string, name string, training string, conn Pipeliner) error { @@ -142,6 +162,29 @@ func uploadbook(dir string, name string, training string, conn Pipeliner) error return nil } +func downloadbook(name string, conn Pipeliner) error { + err := os.MkdirAll(name, 0755) + if err != nil { + log.Fatalln("Failed to create directory", name, err) + } + + err = pipeline.DownloadBestPages(name, conn) + if err != nil { + return fmt.Errorf("Error downloading best pages: %v", err) + } + + err = pipeline.DownloadPdfs(name, conn) + if err != nil { + return fmt.Errorf("Error downloading PDFs: %v", err) + } + + err = pipeline.DownloadAnalyses(name, conn) + if err != nil { + return fmt.Errorf("Error downloading analyses: %v", err) + } + + return nil +} func processbook(training string, conn Pipeliner) { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go new file mode 100644 index 0000000..8492d99 --- /dev/null +++ b/internal/pipeline/get.go @@ -0,0 +1,58 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( + "bufio" + "fmt" + "os" + "path/filepath" +) + +func DownloadBestPages(name string, conn Pipeliner) error { + fn := filepath.Join(name, "best") + err := conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + return fmt.Errorf("Failed to download 'best' file: %v", err) + } + f, err := os.Open(fn) + if err != nil { + return fmt.Errorf("Failed to open best file: %v", err) + } + defer f.Close() + + s := bufio.NewScanner(f) + for s.Scan() { + fn = filepath.Join(name, s.Text()) + err = conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + return fmt.Errorf("Failed to download file %s: %v", fn, err) + } + } + + return nil +} + +func DownloadPdfs(name string, conn Pipeliner) error { + for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { + fn := filepath.Join(name, name+suffix) + err := conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + return fmt.Errorf("Failed to download PDF %s: %v", fn, err) + } + } + return nil +} + +func DownloadAnalyses(name string, conn Pipeliner) error { + for _, a := range []string{"conf", "graph.png"} { + fn := filepath.Join(name, a) + err := conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + return fmt.Errorf("Failed to download analysis file %s: %v", fn, err) + } + } + return nil +} diff --git a/local.go b/local.go index ebc3611..0ccc761 100644 --- a/local.go +++ b/local.go @@ -36,7 +36,7 @@ func (a *LocalConn) MinimalInit() error { if a.TempDir == "" { a.TempDir = filepath.Join(os.TempDir(), "bookpipeline") } - err = os.Mkdir(a.TempDir, 0700) + err = os.MkdirAll(a.TempDir, 0700) if err != nil && !os.IsExist(err) { return fmt.Errorf("Error creating temporary directory: %v", err) } -- cgit v1.2.1-24-ge1ad From f19df9e8c1213a49c426caefd2fadc711f5faf11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:55:36 +0000 Subject: Switch Preprocess() to take the thresholds to use, and have rescribe tool only use 0.1,0.2,0.3 --- cmd/bookpipeline/main.go | 2 +- cmd/rescribe/main.go | 2 +- internal/pipeline/pipeline.go | 30 ++++++++++++++++-------------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index aff7b87..12d5eec 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -188,7 +188,7 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index c309367..1a3dcff 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -221,7 +221,7 @@ func processbook(training string, conn Pipeliner) { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index b1c3cb9..cce5c19 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -146,22 +146,24 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, b done <- true } -func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p +func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) { + return func(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + _ = os.Remove(path) + for _, p := range done { + up <- p + } } + close(up) } - close(up) } func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { -- cgit v1.2.1-24-ge1ad From 2c040f73ce7bbba480c441a0433fc8b4d6449254 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:57:21 +0000 Subject: Add a couple of things that should not be forgotten --- cmd/booktopipeline/main.go | 2 ++ cmd/getpipelinebook/main.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 60d1f81..96a6f6c 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -19,6 +19,8 @@ import ( "rescribe.xyz/bookpipeline" ) +// TODO: use internal/pipeline/get.go functions + const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] Uploads the book in bookdir to the S3 'inprogress' bucket and adds it diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 03e709b..ef13db5 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -17,6 +17,8 @@ import ( "rescribe.xyz/bookpipeline" ) +// TODO: use internal/pipeline/get.go functions + const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname Downloads the pipeline results for a book. -- cgit v1.2.1-24-ge1ad From c48630f590fe2c877e899948f1bf88458d3fd813 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 10:05:16 +0000 Subject: Switch booktopipeline to use internal pipeline functions --- cmd/booktopipeline/main.go | 85 ++++++---------------------------------------- 1 file changed, 11 insertions(+), 74 deletions(-) diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 96a6f6c..7254d78 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -9,17 +9,14 @@ package main import ( "flag" "fmt" - "image" - _ "image/png" - _ "image/jpeg" "log" "os" "path/filepath" "rescribe.xyz/bookpipeline" -) -// TODO: use internal/pipeline/get.go functions + "rescribe.xyz/bookpipeline/internal/pipeline" +) const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] @@ -34,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or If bookname is omitted the last part of the bookdir is used. ` -type Pipeliner interface { - Init() error - PreQueueId() string - WipeQueueId() string - WIPStorageId() string - AddToQueue(url string, msg string) error - Upload(bucket string, key string, path string) error -} - // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -52,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { var verboselog *log.Logger -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - f <- path - } - return nil -} - func main() { verbose := flag.Bool("v", false, "Verbose") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -96,7 +72,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.Pipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -110,18 +86,7 @@ func main() { log.Fatalln("Failed to set up cloud connection:", err) } - qid := conn.PreQueueId() - - // Auto detect type of queue to send to based on file extension - pngdirs, _ := filepath.Glob(bookdir + "/*.png") - jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg") - pngcount := len(pngdirs) - jpgcount := len(jpgdirs) - if pngcount > jpgcount { - qid = conn.WipeQueueId() - } else { - qid = conn.PreQueueId() - } + qid := pipeline.DetectQueueType(bookdir, conn) // Flags set override the queue selection if *wipeonly { @@ -132,43 +97,15 @@ func main() { } verboselog.Println("Checking that all images are valid in", bookdir) - checker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, checker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(checker) - }() - - for path := range checker { - f, err := os.Open(path) - if err != nil { - log.Fatalln("Opening image %s failed, bailing: %v", path, err) - } - _, _, err = image.Decode(f) - if err != nil { - log.Fatalf("Decoding image %s failed, bailing: %v", path, err) - } + err = pipeline.CheckImages(bookdir) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Walking", bookdir) - walker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, walker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(walker) - }() - - for path := range walker { - verboselog.Println("Uploading", path) - name := filepath.Base(path) - err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) - if err != nil { - log.Fatalln("Failed to upload", path, err) - } + verboselog.Println("Uploading all images are valid in", bookdir) + err = pipeline.UploadImages(bookdir, bookname, conn) + if err != nil { + log.Fatalln(err) } if *training != "" { -- cgit v1.2.1-24-ge1ad From 198f8215f8dd0460608abcd03fa49451462c9d11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 10:41:15 +0000 Subject: [getpipelinebook] Rewrite to use internal package functions --- cmd/getpipelinebook/main.go | 100 ++++++++---------------------------------- cmd/rescribe/main.go | 2 +- internal/pipeline/get.go | 33 +++++++++++++- internal/pipeline/pipeline.go | 5 +++ 4 files changed, 56 insertions(+), 84 deletions(-) diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index ef13db5..5116414 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -6,18 +6,16 @@ package main import ( - "bufio" "flag" "fmt" "log" "os" "path/filepath" - "strings" "rescribe.xyz/bookpipeline" -) -// TODO: use internal/pipeline/get.go functions + "rescribe.xyz/bookpipeline/internal/pipeline" +) const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname @@ -35,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } -type Pipeliner interface { - MinimalInit() error - ListObjects(bucket string, prefix string) ([]string, error) - Download(bucket string, key string, fn string) error - Upload(bucket string, key string, path string) error - CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - WIPStorageId() string -} - -func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { - for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { - fn := filepath.Join(bookname, bookname+suffix) - l.Println("Downloading PDF", fn) - err := conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Printf("Failed to download %s: %s\n", fn, err) - } - } -} - func main() { all := flag.Bool("a", false, "Get all files for book") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -85,7 +61,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.MinPipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -111,18 +87,10 @@ func main() { if *all { verboselog.Println("Downloading all files for", bookname) - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + err = pipeline.DownloadAll(bookname, conn) if err != nil { - log.Fatalln("Failed to get list of files for book", bookname, err) - } - for _, i := range objs { - verboselog.Println("Downloading", i) - err = conn.Download(conn.WIPStorageId(), i, i) - if err != nil { - log.Fatalln("Failed to download file", i, err) - } + log.Fatalln(err) } - return } if *binarisedpdf { @@ -153,61 +121,29 @@ func main() { } if *pdf { - getpdfs(conn, verboselog, bookname) + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, conn) } if *binarisedpdf || *colourpdf || *graph || *pdf { return } - verboselog.Println("Downloading best file") - fn := filepath.Join(bookname, "best") - err = conn.Download(conn.WIPStorageId(), fn, fn) + verboselog.Println("Downloading best pages") + err = pipeline.DownloadBestPages(bookname, conn, *png) if err != nil { - log.Fatalln("Failed to download 'best' file", err) - } - f, err := os.Open(fn) - if err != nil { - log.Fatalln("Failed to open best file", err) - } - defer f.Close() - - if *png { - verboselog.Println("Downloading png files") - s := bufio.NewScanner(f) - for s.Scan() { - txtfn := filepath.Join(bookname, s.Text()) - fn = strings.Replace(txtfn, ".hocr", ".png", 1) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } - } - return + log.Fatalln(err) } - verboselog.Println("Downloading HOCR files") - s := bufio.NewScanner(f) - for s.Scan() { - fn = filepath.Join(bookname, s.Text()) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, conn) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Downloading PDF files") - getpdfs(conn, verboselog, bookname) - - verboselog.Println("Downloading analysis files") - for _, a := range []string{"conf", "graph.png"} { - fn = filepath.Join(bookname, a) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading analyses") + err = pipeline.DownloadAnalyses(bookname, conn) + if err != nil { + log.Fatalln(err) } } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 1a3dcff..8e2fe69 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -168,7 +168,7 @@ func downloadbook(name string, conn Pipeliner) error { log.Fatalln("Failed to create directory", name, err) } - err = pipeline.DownloadBestPages(name, conn) + err = pipeline.DownloadBestPages(name, conn, false) if err != nil { return fmt.Errorf("Error downloading best pages: %v", err) } diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go index 8492d99..6949062 100644 --- a/internal/pipeline/get.go +++ b/internal/pipeline/get.go @@ -9,9 +9,10 @@ import ( "fmt" "os" "path/filepath" + "strings" ) -func DownloadBestPages(name string, conn Pipeliner) error { +func DownloadBestPages(name string, conn Pipeliner, pluspngs bool) error { fn := filepath.Join(name, "best") err := conn.Download(conn.WIPStorageId(), fn, fn) if err != nil { @@ -26,12 +27,27 @@ func DownloadBestPages(name string, conn Pipeliner) error { s := bufio.NewScanner(f) for s.Scan() { fn = filepath.Join(name, s.Text()) + conn.Log("Downloading file", fn) err = conn.Download(conn.WIPStorageId(), fn, fn) if err != nil { return fmt.Errorf("Failed to download file %s: %v", fn, err) } } + if !pluspngs { + return nil + } + + s = bufio.NewScanner(f) + for s.Scan() { + txtfn := filepath.Join(name, s.Text()) + fn = strings.Replace(txtfn, ".hocr", ".png", 1) + conn.Log("Downloading file", fn) + err = conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + return fmt.Errorf("Failed to download file", fn, err) + } + } return nil } @@ -56,3 +72,18 @@ func DownloadAnalyses(name string, conn Pipeliner) error { } return nil } + +func DownloadAll(name string, conn Pipeliner) error { + objs, err := conn.ListObjects(conn.WIPStorageId(), name) + if err != nil { + return fmt.Errorf("Failed to get list of files for book", name, err) + } + for _, i := range objs { + conn.Log("Downloading", i) + err = conn.Download(conn.WIPStorageId(), i, i) + if err != nil { + return fmt.Errorf("Failed to download file", i, err) + } + } + return nil +} diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index cce5c19..c0accdb 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -52,6 +52,11 @@ type Pipeliner interface { Log(v ...interface{}) } +type MinPipeliner interface { + Pipeliner + MinimalInit() error +} + type pageimg struct { hocr, img string } -- cgit v1.2.1-24-ge1ad From 7921b5ca6d6667dda09ae67dcc1ee987aef62ebb Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 11:22:36 +0000 Subject: [rescribe] Handle errors in processbook correctly, and improve console output --- cmd/rescribe/main.go | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8e2fe69..3b69b21 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -122,15 +122,21 @@ func main() { err = uploadbook(bookdir, bookname, *training, conn) if err != nil { + _ = os.RemoveAll(tempdir) log.Fatalln(err) } - fmt.Printf("Processing book (this may take some time)\n") - processbook(*training, conn) + fmt.Printf("Processing book\n") + err = processbook(*training, conn) + if err != nil { + _ = os.RemoveAll(tempdir) + log.Fatalln(err) + } fmt.Printf("Saving finished book to %s\n", bookname) err = downloadbook(bookname, conn) if err != nil { + _ = os.RemoveAll(tempdir) log.Fatalln(err) } @@ -186,7 +192,7 @@ func downloadbook(name string, conn Pipeliner) error { return nil } -func processbook(training string, conn Pipeliner) { +func processbook(training string, conn Pipeliner) error { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) ocredPattern := regexp.MustCompile(`.hocr$`) @@ -212,26 +218,26 @@ func processbook(training string, conn Pipeliner) { msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking preprocess queue", err) - continue + return fmt.Errorf("Error checking preprocess queue", err) } if msg.Handle == "" { conn.Log("No message received on preprocess queue, sleeping") continue } - conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) + conn.Log("Message received on preprocess queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (binarising and wiping)\n") err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during preprocess", err) + return fmt.Errorf("Error during preprocess", err) } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking wipeonly queue", err) - continue + return fmt.Errorf("Error checking wipeonly queue", err) } if msg.Handle == "" { conn.Log("No message received on wipeonly queue, sleeping") @@ -239,17 +245,18 @@ func processbook(training string, conn Pipeliner) { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (wiping only)\n") err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during wipe", err) + return fmt.Errorf("Error during wipe", err) } case <-checkOCRPageQueue: msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking OCR Page queue", err) - continue + return fmt.Errorf("Error checking OCR Page queue", err) } if msg.Handle == "" { continue @@ -259,17 +266,17 @@ func processbook(training string, conn Pipeliner) { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) + fmt.Printf(".") err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during OCR Page process", err) + return fmt.Errorf("\nError during OCR Page process", err) } case <-checkAnalyseQueue: msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking analyse queue", err) - continue + return fmt.Errorf("Error checking analyse queue", err) } if msg.Handle == "" { conn.Log("No message received on analyse queue, sleeping") @@ -277,14 +284,17 @@ func processbook(training string, conn Pipeliner) { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) + fmt.Printf("\n Analysing OCR and compiling PDFs\n") err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during analysis", err) + return fmt.Errorf("Error during analysis", err) } case <-stopIfQuiet.C: conn.Log("Processing finished") - return + return nil } } + + return fmt.Errorf("Ended unexpectedly") // should never be reached } -- cgit v1.2.1-24-ge1ad From dac2f1ad471cd9896c16569fe02c69ff9b9855ba Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 11:59:14 +0000 Subject: [rescribe] Change -t to the path of the traineddata file, and set TESSDATA_PREFIX accordingly --- cmd/rescribe/main.go | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 3b69b21..8d7c07b 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -13,8 +13,10 @@ import ( "io/ioutil" "log" "os" + "os/exec" "path/filepath" "regexp" + "strings" "time" "rescribe.xyz/bookpipeline" @@ -74,7 +76,7 @@ func resetTimer(t *time.Timer, d time.Duration) { func main() { verbose := flag.Bool("v", false, "verbose") - training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") + training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -103,6 +105,33 @@ func main() { verboselog = log.New(n, "", 0) } + f, err := os.Open(*training) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training) + fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n") + os.Exit(1) + } + f.Close() + + abstraining, err := filepath.Abs(*training) + if err != nil { + log.Fatalf("Error getting absolute path of training %s: %v", err) + } + tessPrefix, trainingName := filepath.Split(abstraining) + trainingName = strings.TrimSuffix(trainingName, ".traineddata") + err = os.Setenv("TESSDATA_PREFIX", tessPrefix) + if err != nil { + log.Fatalln("Error setting TESSDATA_PREFIX:", err) + } + + // TODO: would be good to be able to set custom path to tesseract + _, err = exec.Command("tesseract", "--help").Output() + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") + fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") + os.Exit(1) + } + tempdir, err := ioutil.TempDir("", "bookpipeline") if err != nil { log.Fatalln("Error setting up temporary directory:", err) @@ -120,14 +149,14 @@ func main() { fmt.Printf("Copying book to pipeline\n") - err = uploadbook(bookdir, bookname, *training, conn) + err = uploadbook(bookdir, bookname, trainingName, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) } fmt.Printf("Processing book\n") - err = processbook(*training, conn) + err = processbook(trainingName, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) -- cgit v1.2.1-24-ge1ad From ad7aaf490e78e969bb5495dfda06a33d2a176aec Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 12:28:50 +0000 Subject: [rescribe] Enable custom paths to tesseract command to be set (also improve some error output) --- cmd/bookpipeline/main.go | 2 +- cmd/rescribe/main.go | 36 ++++++++++++++++++------------------ internal/pipeline/pipeline.go | 9 ++++++--- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 12d5eec..909b431 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -226,7 +226,7 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8d7c07b..6a2fb9f 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -77,6 +77,7 @@ func resetTimer(t *time.Timer, d time.Duration) { func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") + tesscmd := flag.String("tesscmd", "tesseract", "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -124,11 +125,12 @@ func main() { log.Fatalln("Error setting TESSDATA_PREFIX:", err) } - // TODO: would be good to be able to set custom path to tesseract - _, err = exec.Command("tesseract", "--help").Output() + _, err = exec.Command(*tesscmd, "--help").Output() if err != nil { fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") + fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") + fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR\\tesseract.exe' ...\n") os.Exit(1) } @@ -149,14 +151,14 @@ func main() { fmt.Printf("Copying book to pipeline\n") - err = uploadbook(bookdir, bookname, trainingName, conn) + err = uploadbook(bookdir, bookname, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) } fmt.Printf("Processing book\n") - err = processbook(trainingName, conn) + err = processbook(trainingName, *tesscmd, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) @@ -175,7 +177,7 @@ func main() { } } -func uploadbook(dir string, name string, training string, conn Pipeliner) error { +func uploadbook(dir string, name string, conn Pipeliner) error { err := pipeline.CheckImages(dir) if err != nil { return fmt.Errorf("Error with images in %s: %v", dir, err) @@ -186,9 +188,7 @@ func uploadbook(dir string, name string, training string, conn Pipeliner) error } qid := pipeline.DetectQueueType(dir, conn) - if training != "" { - name = name + " " + training - } + err = conn.AddToQueue(qid, name) if err != nil { return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) @@ -221,7 +221,7 @@ func downloadbook(name string, conn Pipeliner) error { return nil } -func processbook(training string, conn Pipeliner) error { +func processbook(training string, tesscmd string, conn Pipeliner) error { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) ocredPattern := regexp.MustCompile(`.hocr$`) @@ -247,7 +247,7 @@ func processbook(training string, conn Pipeliner) error { msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking preprocess queue", err) + return fmt.Errorf("Error checking preprocess queue: %v", err) } if msg.Handle == "" { conn.Log("No message received on preprocess queue, sleeping") @@ -260,13 +260,13 @@ func processbook(training string, conn Pipeliner) error { fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during preprocess", err) + return fmt.Errorf("Error during preprocess: %v", err) } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking wipeonly queue", err) + return fmt.Errorf("Error checking wipeonly queue, %v", err) } if msg.Handle == "" { conn.Log("No message received on wipeonly queue, sleeping") @@ -279,13 +279,13 @@ func processbook(training string, conn Pipeliner) error { fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during wipe", err) + return fmt.Errorf("Error during wipe: %v", err) } case <-checkOCRPageQueue: msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking OCR Page queue", err) + return fmt.Errorf("Error checking OCR Page queue: %v", err) } if msg.Handle == "" { continue @@ -296,16 +296,16 @@ func processbook(training string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) fmt.Printf(".") - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("\nError during OCR Page process", err) + return fmt.Errorf("\nError during OCR Page process: %v", err) } case <-checkAnalyseQueue: msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking analyse queue", err) + return fmt.Errorf("Error checking analyse queue: %v", err) } if msg.Handle == "" { conn.Log("No message received on analyse queue, sleeping") @@ -317,7 +317,7 @@ func processbook(training string, conn Pipeliner) error { err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during analysis", err) + return fmt.Errorf("Error during analysis: %v", err) } case <-stopIfQuiet.C: conn.Log("Processing finished") diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index c0accdb..f6598fd 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -189,12 +189,15 @@ func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logge close(up) } -func Ocr(training string) func(chan string, chan string, chan error, *log.Logger) { +func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) { return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + if tesscmd == "" { + tesscmd = "tesseract" + } for path := range toocr { logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr @@ -491,7 +494,7 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch msgparts := strings.Split(msg.Body, " ") bookname := filepath.Dir(msgparts[0]) if len(msgparts) > 1 && msgparts[1] != "" { - process = Ocr(msgparts[1]) + process = Ocr(msgparts[1], "") } d := filepath.Join(os.TempDir(), bookname) -- cgit v1.2.1-24-ge1ad From 33f1726a4c9f8013dcde39e644281059d9766bc4 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 12:30:15 +0000 Subject: gofmt --- cmd/getbests/main.go | 4 +-- cmd/postprocess-bythresh/main.go | 71 ++++++++++++++++++---------------------- internal/pipeline/pipeline.go | 4 +-- internal/pipeline/put.go | 2 +- local.go | 6 ++-- 5 files changed, 40 insertions(+), 47 deletions(-) diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go index 9eca0d8..c1ee50d 100644 --- a/cmd/getbests/main.go +++ b/cmd/getbests/main.go @@ -62,8 +62,8 @@ func main() { log.Println("Downloading all best files found") for _, i := range objs { parts := strings.Split(i, "/") - if parts[len(parts) - 1] == "best" { - err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best") + if parts[len(parts)-1] == "best" { + err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best") if err != nil { log.Fatalln("Failed to download file", i, err) } diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go index 37b77e7..5bdb839 100644 --- a/cmd/postprocess-bythresh/main.go +++ b/cmd/postprocess-bythresh/main.go @@ -19,7 +19,6 @@ import ( //TO DO: make writetofile return an error and handle that accordingly // potential TO DO: add text versions where footer is cropped on odd/even pages only - // the trimblanks function trims the blank lines from a text input func trimblanks(hocrfile string) string { @@ -50,7 +49,7 @@ func dehyphenateString(in string) string { words := strings.Split(line, " ") last := words[len(words)-1] // the - 2 here is to account for a trailing newline and counting from zero - if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 { + if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 { nextwords := strings.Split(lines[i+1], " ") if len(nextwords) > 0 { line = line[0:len(line)-1] + nextwords[0] @@ -66,17 +65,15 @@ func dehyphenateString(in string) string { return strings.Join(newlines, " ") } - // the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long) func fullcrop(noblanks string) string { - alllines := strings.Split(noblanks, "\n") - + if len(alllines) <= 2 { - return "" - } else { - return strings.Join(alllines[1:len(alllines)-2], "\n") + return "" + } else { + return strings.Join(alllines[1:len(alllines)-2], "\n") } } @@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, var killheadtxt string var footkilltxt string - hocrfilepath := filepath.Join(bookdirectory, hocrfilename) confpath := filepath.Join(bookdirectory, "conf") @@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, if err != nil { log.Fatal(err) } - - + trimbest := trimblanks(hocrfiletext) - + alltxt = dehyphenateString(trimbest) - + croptxt = dehyphenateString(fullcrop(trimbest)) - + killheadtxt = dehyphenateString(headcrop(trimbest)) - + footkilltxt = dehyphenateString(footcrop(trimbest)) - } return alltxt, croptxt, killheadtxt, footkilltxt @@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, // the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them. func writetofile(bookdirectory, textfilebase, txt string) error { alltxtfile := filepath.Join(bookdirectory, textfilebase) - + file, err := os.Create(alltxtfile) if err != nil { return fmt.Errorf("Error opening file %s: %v", alltxtfile, err) @@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error { if _, err := file.WriteString(txt); err != nil { log.Println(err) } -return err + return err } @@ -215,7 +209,7 @@ func main() { bookdirectory := flag.Arg(0) confthreshstring := strconv.Itoa(*confthresh) - + fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh) bestpath := filepath.Join(bookdirectory, "best") @@ -239,32 +233,31 @@ func main() { crop = crop + " " + croptxt killhead = killhead + " " + killheadtxt killfoot = killfoot + " " + footkilltxt - + } } - - - bookname:= filepath.Base(bookdirectory) - b := bookname + "_" + confthreshstring - err1 := writetofile(bookdirectory, b + "_all.txt", all) - if err1 != nil { + bookname := filepath.Base(bookdirectory) + b := bookname + "_" + confthreshstring + + err1 := writetofile(bookdirectory, b+"_all.txt", all) + if err1 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1) - } - - err2 := writetofile(bookdirectory, b + "_crop.txt", crop) - if err2 != nil { + } + + err2 := writetofile(bookdirectory, b+"_crop.txt", crop) + if err2 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2) - } - - err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead) - if err3 != nil { + } + + err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead) + if err3 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3) - } - - err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot) - if err4 != nil { + } + + err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot) + if err4 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4) - } + } } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index f6598fd..280e4d2 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -640,8 +640,8 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string conn.Log("Failed to get logs ", err2) logs = "" } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + + msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n"+ + "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n"+ " Fail message: %s\r\nFull log:\r\n%s\r\n", ms.to, ms.from, bookname, err, logs) host := fmt.Sprintf("%s:%s", ms.server, ms.port) diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index 8ada41f..4b38ea5 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -7,8 +7,8 @@ package pipeline import ( "fmt" "image" - _ "image/png" _ "image/jpeg" + _ "image/png" "os" "path/filepath" ) diff --git a/local.go b/local.go index 0ccc761..e5d9bef 100644 --- a/local.go +++ b/local.go @@ -27,7 +27,7 @@ const storageId = "storage" type LocalConn struct { // these should be set before running Init(), or left to defaults TempDir string - Logger *log.Logger + Logger *log.Logger } // MinimalInit does the bare minimum initialisation @@ -184,12 +184,12 @@ func (a *LocalConn) DelFromQueue(url string, handle string) error { // store the joining of part before and part after handle var complete string - if len(s) >= len(handle) + 1 { + if len(s) >= len(handle)+1 { if i > 0 { complete = s[:i] } // the '+1' is for the newline character - complete += s[i + len(handle) + 1:] + complete += s[i+len(handle)+1:] } f, err := os.Create(filepath.Join(a.TempDir, url)) -- cgit v1.2.1-24-ge1ad From 514665f407af82e2fe215971f247acba60959977 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 15:27:18 +0000 Subject: Add makefile for generating cross compiled rescribe binaries --- makefile | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 makefile diff --git a/makefile b/makefile new file mode 100644 index 0000000..6ba1af5 --- /dev/null +++ b/makefile @@ -0,0 +1,12 @@ +# See LICENSE file for copyright and license details. + +default: + @echo "To build and install use the basic go tools like so: go install ./..." + @echo "This makefile is just for cross compiling (for which the" + @echo "targets rescribe-osx and rescribe-w32 exist)" + +rescribe-osx: + GOOS=darwin GOARCH=amd64 go build -o $@ ./cmd/rescribe + +rescribe.exe: + GOOS=windows GOARCH=386 go build -o $@ ./cmd/rescribe -- cgit v1.2.1-24-ge1ad From 6b5145f0b75c8d5719bf44d5f654b9a2d1e3b2cd Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 16:43:53 +0000 Subject: [rescribe] Mention in usage that things can be saved in a different directory --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 6a2fb9f..8ca4189 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -24,7 +24,7 @@ import ( "rescribe.xyz/bookpipeline/internal/pipeline" ) -const usage = `Usage: rescribe [-v] [-t training] bookdir +const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] Process and OCR a book using the Rescribe pipeline on a local machine. ` -- cgit v1.2.1-24-ge1ad From 56c1cf041aec9cb2352a3bd4a4b46e65a3cc04c0 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 16:44:42 +0000 Subject: [rescribe] Add txt output, only keep colour pdf, and reorganise files so they're more user-friendly --- cmd/rescribe/main.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8ca4189..fe36aea 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -20,6 +20,7 @@ import ( "time" "rescribe.xyz/bookpipeline" + "rescribe.xyz/utils/pkg/hocr" "rescribe.xyz/bookpipeline/internal/pipeline" ) @@ -175,6 +176,55 @@ func main() { if err != nil { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } + + hocrs, err := filepath.Glob(fmt.Sprintf("%s/*hocr", bookname)) + if err != nil { + log.Fatalf("Error looking for .hocr files: %v", err) + } + + for _, v := range hocrs { + err = addTxtVersion(v) + if err != nil { + log.Fatalf("Error creating txt version of %s: %v", v, err) + } + + err = os.MkdirAll(filepath.Join(bookname, "hocr"), 0755) + if err != nil { + log.Fatalf("Error creating hocr directory: %v", err) + } + + err = os.Rename(v, filepath.Join(bookname, "hocr", filepath.Base(v))) + if err != nil { + log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) + } + } + + // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf + _ = os.Remove(filepath.Join(bookname, bookname + ".binarised.pdf")) + _ = os.Rename(filepath.Join(bookname, bookname + ".colour.pdf"), filepath.Join(bookname, bookname + ".pdf")) +} + +func addTxtVersion(hocrfn string) error { + dir := filepath.Dir(hocrfn) + err := os.MkdirAll(filepath.Join(dir, "text"), 0755) + if err != nil { + log.Fatalf("Error creating text directory: %v", err) + } + + t, err := hocr.GetText(hocrfn) + if err != nil { + return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) + } + + basefn := strings.TrimSuffix(filepath.Base(hocrfn), ".hocr") + ".txt" + fn := filepath.Join(dir, "text", basefn) + + err = ioutil.WriteFile(fn, []byte(t), 0644) + if err != nil { + return fmt.Errorf("Error creating text file %s: %v", fn, err) + } + + return nil } func uploadbook(dir string, name string, conn Pipeliner) error { -- cgit v1.2.1-24-ge1ad From eefa8f50d7ab915ce426c837cf504d26b7d4ccee Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 17:43:13 +0000 Subject: [rescribe] Default to an appropriate tesscmd for Windows --- cmd/rescribe/main.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index fe36aea..2320a2c 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -16,6 +16,7 @@ import ( "os/exec" "path/filepath" "regexp" + "runtime" "strings" "time" @@ -76,9 +77,14 @@ func resetTimer(t *time.Timer, d time.Duration) { } func main() { + deftesscmd := "tesseract" + if runtime.GOOS == "windows" { + deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe" + } + verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") - tesscmd := flag.String("tesscmd", "tesseract", "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -131,7 +137,7 @@ func main() { fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") - fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR\\tesseract.exe' ...\n") + fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n") os.Exit(1) } -- cgit v1.2.1-24-ge1ad From cfbb3481368714adcd734906d8a460b873551c90 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 17:43:27 +0000 Subject: Some changes to ensure the pipeline works correctly on Windows There were a couple of places where a file was uploaded while still open, which resulted in an attempt to remove it, which causes an error from Windows. The allOCRed function also included an assumption that the path separator would be a /, which is always correct for AWS, and correct for local on Linux and OSX, but not for local Windows. Fixed by leaving the separator well alone. Also, the local connection was not stripping leading \, like it did /, which caused an issue with Windows local. Windows local is now tested and working, at least through wine. --- internal/pipeline/pipeline.go | 5 +++-- local.go | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 280e4d2..20400ad 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -269,6 +269,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log } } } + f.Close() up <- fn logger.Println("Creating best file listing the best file for each page") @@ -282,6 +283,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log for _, conf := range bestconfs { _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) } + f.Close() up <- fn var pgs []string @@ -461,8 +463,7 @@ func allOCRed(bookname string, conn Pipeliner) bool { if preprocessedPattern.MatchString(png) { atleastone = true found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" + hocrname := strings.TrimSuffix(png, ".png") + ".hocr" for _, hocr := range objs { if hocr == hocrname { found = true diff --git a/local.go b/local.go index e5d9bef..e66f477 100644 --- a/local.go +++ b/local.go @@ -134,6 +134,7 @@ func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkF } n := strings.TrimPrefix(path, dirpath) n = strings.TrimPrefix(n, "/") + n = strings.TrimPrefix(n, "\\") o := ObjMeta{Name: n, Date: info.ModTime()} *list = append(*list, o) return nil -- cgit v1.2.1-24-ge1ad From f71fd636f151e5cb7eafb2ae6c21c1c188d43fdd Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 17 Nov 2020 12:24:42 +0000 Subject: Remove _bin0.x from txt filenames --- cmd/rescribe/main.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 2320a2c..f4489d8 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -34,6 +34,7 @@ Process and OCR a book using the Rescribe pipeline on a local machine. const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 1 * time.Second const LogSaveTime = 1 * time.Minute +var thresholds = []float64{0.1, 0.2, 0.3} // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -222,8 +223,11 @@ func addTxtVersion(hocrfn string) error { return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) } - basefn := strings.TrimSuffix(filepath.Base(hocrfn), ".hocr") + ".txt" - fn := filepath.Join(dir, "text", basefn) + basefn := filepath.Base(hocrfn) + for _, v := range thresholds { + basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v)) + } + fn := filepath.Join(dir, "text", basefn + ".txt") err = ioutil.WriteFile(fn, []byte(t), 0644) if err != nil { @@ -312,7 +316,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on preprocess queue, processing", msg.Body) fmt.Printf(" Preprocessing book (binarising and wiping)\n") - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { -- cgit v1.2.1-24-ge1ad From 2717c5ed21a082a7f24833f3d57b303fd22bd4e5 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 17 Nov 2020 16:37:54 +0000 Subject: Add trimqueue and logwholequeue utilities which can help deal with weird queue states --- aws.go | 58 ++++++++++++++++++++++++++++++++ cmd/logwholequeue/main.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ cmd/trimqueue/main.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+) create mode 100644 cmd/logwholequeue/main.go create mode 100644 cmd/trimqueue/main.go diff --git a/aws.go b/aws.go index 5ebc79f..6b707fe 100644 --- a/aws.go +++ b/aws.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error { return nil } +// LogQueue prints the body of all messages in a queue to the log +func (a *AwsConn) LogQueue(url string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + a.Logger.Println(*m.Body) + } + } else { + break + } + } + return nil +} + +// RemovePrefixesFromQueue removes any messages in a queue whose +// body starts with the specified prefix. +func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + if !strings.HasPrefix(*m.Body, prefix) { + continue + } + a.Logger.Printf("Removing %s from queue\n", *m.Body) + _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &url, + ReceiptHandle: m.ReceiptHandle, + }) + if err != nil { + return err + } + } + } else { + break + } + } + return nil +} + // QueueHeartbeat updates the visibility timeout of a message. This // ensures that the message remains "in flight", meaning that it // cannot be seen by other processes, but if this process fails the diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + LogQueue(url string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 1 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.LogQueue(qid) + if err != nil { + log.Fatalln("Error getting queue", qname, ":", err) + } +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + RemovePrefixesFromQueue(url string, prefix string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 2 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) + if err != nil { + log.Fatalln("Error removing prefixes from queue", qname, ":", err) + } +} -- cgit v1.2.1-24-ge1ad From 82ee93b53ae4fcef619543f643dc626f6c9353cf Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 18 Nov 2020 13:47:44 +0000 Subject: Describe rescribe tool in documentation --- README | 12 ++++++------ doc.go | 11 ++++------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/README b/README index 681d731..0d2119b 100644 --- a/README +++ b/README @@ -46,19 +46,19 @@ setting: - pdfbook : creates a searchable PDF from a directory of hOCR and image files -## Local operation +## Rescribe tool for local operation While bookpipeline was built with cloud based operation in mind, there is also a local mode that can be used to run OCR jobs from a single computer, with all the benefits of preprocessing, choosing the best threshold for each image, graph creation, PDF creation, and so on that the pipeline provides. -You can use this by passing the '-c local' flag to the core bookpipeline -commands. Here is a simple example run: +Several of the commands accept a `-c local` flag for local operation, but now +there is also a new command, named `rescribe`, that is designed to make things +much simpler for people just wanting to do some OCR on their local computer. - booktopipeline -c local MyBook - bookpipeline -v -c local # run until MyBook has finished processing - getpipelinebook -c local MyBook +More information about this, including links to prebuilt executables, can be +found on our blog at . ## Contributions diff --git a/doc.go b/doc.go index 179a4f3..bd4da11 100644 --- a/doc.go +++ b/doc.go @@ -174,14 +174,11 @@ Local operation While bookpipeline was built with cloud based operation in mind, there is also a local mode that can be used to run OCR jobs from a single computer, with all the benefits of preprocessing, choosing the best threshold for each image, -graph creation and so on that the pipeline provides. +graph creation, PDF creation, and so on that the pipeline provides. -You can use this by passing the '-c local' flag to the core bookpipeline -commands. Here is a simple example run: - - booktopipeline -c local MyBook - bookpipeline -v -c local # run until MyBook has finished processing - getpipelinebook -c local MyBook +Several of the commands accept a `-c local` flag for local operation, but now +there is also a new command, named rescribe, that is designed to make things +much simpler for people just wanting to do some OCR on their local computer. Note that the local mode is not as well tested as the core cloud modes; please report any bugs you find with it. -- cgit v1.2.1-24-ge1ad From 0b9bd466dd2e099bf6c7d3165f1285f4b7a8f38e Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 18 Nov 2020 15:19:28 +0000 Subject: Switch to a maintained version of gofpdf --- go.mod | 2 +- go.sum | 6 +++--- pdf.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 4f8615e..666e3f4 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/aws/aws-sdk-go v1.30.5 github.com/blend/go-sdk v2.0.0+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/jung-kurt/gofpdf v1.16.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/phpdave11/gofpdf v1.4.2 github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible golang.org/x/image v0.0.0-20200618115811-c13761719519 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect diff --git a/go.sum b/go.sum index 6689e31..d77f80b 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,6 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= -github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5PtRc= -github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -26,7 +24,9 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/phpdave11/gofpdi v1.0.7/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= +github.com/phpdave11/gofpdf v1.4.2 h1:KPKiIbfwbvC/wOncwhrpRdXVj2CZTCFlw4wnoyjtHfQ= +github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= +github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pdf.go b/pdf.go index f8217ba..64a8654 100644 --- a/pdf.go +++ b/pdf.go @@ -16,7 +16,7 @@ import ( "io/ioutil" "os" - "github.com/jung-kurt/gofpdf" + "github.com/phpdave11/gofpdf" "golang.org/x/image/draw" "rescribe.xyz/utils/pkg/hocr" ) -- cgit v1.2.1-24-ge1ad From 0d914a5de3f8169d41df4fcff1ee4aea6d01afbe Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 24 Nov 2020 12:40:54 +0000 Subject: [booktopipeline] Add a check to disallow adding a book that already exists This is important as if a book is added which has already been done, then an analyse job will be added every time a page is OCRed, which will clog up the pipeline with unnecessary work. Also if a book was added with the same name but differently named files, or a different number of pages, the results would almost certainly not be as intended. In the case of a book really wanting to be added with a particular name, either the original directory can be removed on S3, or "v2" or similar can be appended to the book name before calling booktopipeline. --- cmd/booktopipeline/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 7254d78..b4f4d99 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -102,6 +102,15 @@ func main() { log.Fatalln(err) } + verboselog.Println("Checking that a book hasn't already been uploaded with that name") + list, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln(err) + } + if len(list) > 0 { + log.Fatalf("Error: There is already a book in S3 named %s", bookname) + } + verboselog.Println("Uploading all images are valid in", bookdir) err = pipeline.UploadImages(bookdir, bookname, conn) if err != nil { -- cgit v1.2.1-24-ge1ad From 38dbdd0b21fb363e3f63fd3ea50272975e98eb77 Mon Sep 17 00:00:00 2001 From: Nick White Date: Thu, 3 Dec 2020 15:13:22 +0000 Subject: Don't upload binarised pdf twice needlessly This can also result in the file being uploaded twice simultaneously, as up() is running in a separate goroutine. This can cause failures on Windows as the file is attempted to be removed by one upload process while being open to upload by the other process. Probably it could also fail if the process completed by one (so the file was deleted) before being started by the other. --- internal/pipeline/pipeline.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 20400ad..13339d7 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -358,11 +358,6 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log return } up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } } for _, pg := range colourimgs { -- cgit v1.2.1-24-ge1ad From cbe02a57377787cd34172453a477f68f200448e8 Mon Sep 17 00:00:00 2001 From: Nick White Date: Thu, 3 Dec 2020 15:20:15 +0000 Subject: [rescribe] Fix portability issue where hocrs may not be correctly moved and txt-ified on windows --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index f4489d8..880bbc2 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -184,7 +184,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s/*hocr", bookname)) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", bookname, string(filepath.Separator))) if err != nil { log.Fatalf("Error looking for .hocr files: %v", err) } -- cgit v1.2.1-24-ge1ad From 4fcbfba65689dc5e8ad46ba467343d3da376d92a Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 4 Dec 2020 17:12:59 +0000 Subject: Ensure mkdir will succeed in upload --- local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/local.go b/local.go index e66f477..0fceca2 100644 --- a/local.go +++ b/local.go @@ -222,7 +222,7 @@ func (a *LocalConn) Download(bucket string, key string, path string) error { // Upload just copies the file from path to TempDir/bucket/key func (a *LocalConn) Upload(bucket string, key string, path string) error { d := filepath.Join(a.TempDir, bucket, filepath.Dir(key)) - err := os.Mkdir(d, 0700) + err := os.MkdirAll(d, 0700) if err != nil && !os.IsExist(err) { return fmt.Errorf("Error creating temporary directory: %v", err) } -- cgit v1.2.1-24-ge1ad From 068ad0b666705a49ab22d7b48cd6a7d67b37f234 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 7 Dec 2020 16:53:58 +0000 Subject: [rescribe] Allow saving of results to somewhere other than a directory named after the book being processed --- cmd/getpipelinebook/main.go | 10 ++++----- cmd/rescribe/main.go | 38 ++++++++++++++++++-------------- internal/pipeline/get.go | 53 +++++++++++++++++++++++++-------------------- 3 files changed, 57 insertions(+), 44 deletions(-) diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 5116414..ccedd72 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -87,7 +87,7 @@ func main() { if *all { verboselog.Println("Downloading all files for", bookname) - err = pipeline.DownloadAll(bookname, conn) + err = pipeline.DownloadAll(bookname, bookname, conn) if err != nil { log.Fatalln(err) } @@ -122,7 +122,7 @@ func main() { if *pdf { verboselog.Println("Downloading PDFs") - pipeline.DownloadPdfs(bookname, conn) + pipeline.DownloadPdfs(bookname, bookname, conn) } if *binarisedpdf || *colourpdf || *graph || *pdf { @@ -130,19 +130,19 @@ func main() { } verboselog.Println("Downloading best pages") - err = pipeline.DownloadBestPages(bookname, conn, *png) + err = pipeline.DownloadBestPages(bookname, bookname, conn, *png) if err != nil { log.Fatalln(err) } verboselog.Println("Downloading PDFs") - pipeline.DownloadPdfs(bookname, conn) + pipeline.DownloadPdfs(bookname, bookname, conn) if err != nil { log.Fatalln(err) } verboselog.Println("Downloading analyses") - err = pipeline.DownloadAnalyses(bookname, conn) + err = pipeline.DownloadAnalyses(bookname, bookname, conn) if err != nil { log.Fatalln(err) } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 880bbc2..8414c53 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -29,6 +29,9 @@ import ( const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] Process and OCR a book using the Rescribe pipeline on a local machine. + +OCR results are saved into the bookdir directory unless savedir is +specified. ` const QueueTimeoutSecs = 2 * 60 @@ -93,17 +96,16 @@ func main() { } flag.Parse() - if flag.NArg() < 1 || flag.NArg() > 3 { + if flag.NArg() < 1 || flag.NArg() > 2 { flag.Usage() return } bookdir := flag.Arg(0) - var bookname string + bookname := filepath.Base(bookdir) + savedir := bookdir if flag.NArg() > 1 { - bookname = flag.Arg(1) - } else { - bookname = filepath.Base(bookdir) + savedir = flag.Arg(1) } var verboselog *log.Logger @@ -172,8 +174,12 @@ func main() { log.Fatalln(err) } - fmt.Printf("Saving finished book to %s\n", bookname) - err = downloadbook(bookname, conn) + fmt.Printf("Saving finished book to %s\n", savedir) + err = os.MkdirAll(savedir, 0755) + if err != nil { + log.Fatalf("Error creating save directory %s: %v", savedir, err) + } + err = downloadbook(savedir, bookname, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) @@ -184,7 +190,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", bookname, string(filepath.Separator))) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", savedir, string(filepath.Separator))) if err != nil { log.Fatalf("Error looking for .hocr files: %v", err) } @@ -195,20 +201,20 @@ func main() { log.Fatalf("Error creating txt version of %s: %v", v, err) } - err = os.MkdirAll(filepath.Join(bookname, "hocr"), 0755) + err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755) if err != nil { log.Fatalf("Error creating hocr directory: %v", err) } - err = os.Rename(v, filepath.Join(bookname, "hocr", filepath.Base(v))) + err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v))) if err != nil { log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) } } // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf - _ = os.Remove(filepath.Join(bookname, bookname + ".binarised.pdf")) - _ = os.Rename(filepath.Join(bookname, bookname + ".colour.pdf"), filepath.Join(bookname, bookname + ".pdf")) + _ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf")) + _ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf")) } func addTxtVersion(hocrfn string) error { @@ -257,23 +263,23 @@ func uploadbook(dir string, name string, conn Pipeliner) error { return nil } -func downloadbook(name string, conn Pipeliner) error { +func downloadbook(dir string, name string, conn Pipeliner) error { err := os.MkdirAll(name, 0755) if err != nil { log.Fatalln("Failed to create directory", name, err) } - err = pipeline.DownloadBestPages(name, conn, false) + err = pipeline.DownloadBestPages(dir, name, conn, false) if err != nil { return fmt.Errorf("Error downloading best pages: %v", err) } - err = pipeline.DownloadPdfs(name, conn) + err = pipeline.DownloadPdfs(dir, name, conn) if err != nil { return fmt.Errorf("Error downloading PDFs: %v", err) } - err = pipeline.DownloadAnalyses(name, conn) + err = pipeline.DownloadAnalyses(dir, name, conn) if err != nil { return fmt.Errorf("Error downloading analyses: %v", err) } diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go index 6949062..6c5b92c 100644 --- a/internal/pipeline/get.go +++ b/internal/pipeline/get.go @@ -12,9 +12,10 @@ import ( "strings" ) -func DownloadBestPages(name string, conn Pipeliner, pluspngs bool) error { - fn := filepath.Join(name, "best") - err := conn.Download(conn.WIPStorageId(), fn, fn) +func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) error { + key := filepath.Join(name, "best") + fn := filepath.Join(dir, "best") + err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { return fmt.Errorf("Failed to download 'best' file: %v", err) } @@ -26,11 +27,12 @@ func DownloadBestPages(name string, conn Pipeliner, pluspngs bool) error { s := bufio.NewScanner(f) for s.Scan() { - fn = filepath.Join(name, s.Text()) - conn.Log("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) + key = filepath.Join(name, s.Text()) + fn = filepath.Join(dir, s.Text()) + conn.Log("Downloading file", key) + err = conn.Download(conn.WIPStorageId(), key, fn) if err != nil { - return fmt.Errorf("Failed to download file %s: %v", fn, err) + return fmt.Errorf("Failed to download file %s: %v", key, err) } } @@ -40,49 +42,54 @@ func DownloadBestPages(name string, conn Pipeliner, pluspngs bool) error { s = bufio.NewScanner(f) for s.Scan() { - txtfn := filepath.Join(name, s.Text()) - fn = strings.Replace(txtfn, ".hocr", ".png", 1) - conn.Log("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) + imgname := strings.Replace(s.Text(), ".hocr", ".png", 1) + key = filepath.Join(name, imgname) + fn = filepath.Join(dir, imgname) + conn.Log("Downloading file", key) + err = conn.Download(conn.WIPStorageId(), key, fn) if err != nil { - return fmt.Errorf("Failed to download file", fn, err) + return fmt.Errorf("Failed to download file %s: %v", key, err) } } return nil } -func DownloadPdfs(name string, conn Pipeliner) error { +func DownloadPdfs(dir string, name string, conn Pipeliner) error { for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { - fn := filepath.Join(name, name+suffix) - err := conn.Download(conn.WIPStorageId(), fn, fn) + key := filepath.Join(name, name+suffix) + fn := filepath.Join(dir, name+suffix) + err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { - return fmt.Errorf("Failed to download PDF %s: %v", fn, err) + return fmt.Errorf("Failed to download PDF %s: %v", key, err) } } return nil } -func DownloadAnalyses(name string, conn Pipeliner) error { +func DownloadAnalyses(dir string, name string, conn Pipeliner) error { for _, a := range []string{"conf", "graph.png"} { - fn := filepath.Join(name, a) - err := conn.Download(conn.WIPStorageId(), fn, fn) + key := filepath.Join(name, a) + fn := filepath.Join(dir, a) + err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { - return fmt.Errorf("Failed to download analysis file %s: %v", fn, err) + return fmt.Errorf("Failed to download analysis file %s: %v", key, err) } } return nil } -func DownloadAll(name string, conn Pipeliner) error { +func DownloadAll(dir string, name string, conn Pipeliner) error { objs, err := conn.ListObjects(conn.WIPStorageId(), name) if err != nil { return fmt.Errorf("Failed to get list of files for book", name, err) } for _, i := range objs { + base := filepath.Base(i) + fn := filepath.Join(dir, base) conn.Log("Downloading", i) - err = conn.Download(conn.WIPStorageId(), i, i) + err = conn.Download(conn.WIPStorageId(), i, fn) if err != nil { - return fmt.Errorf("Failed to download file", i, err) + return fmt.Errorf("Failed to download file %s: %v", i, err) } } return nil -- cgit v1.2.1-24-ge1ad From 17b2d91d5f323fd985ca012e50d36908cbceba87 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 7 Dec 2020 17:04:12 +0000 Subject: [rescribe] Fix up *.hocr glob, which ensures that using a savedir that already has a hocr directory in it will work --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8414c53..07eeaf0 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -190,7 +190,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", savedir, string(filepath.Separator))) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*.hocr", savedir, string(filepath.Separator))) if err != nil { log.Fatalf("Error looking for .hocr files: %v", err) } -- cgit v1.2.1-24-ge1ad From 452d3d2b6023eb245cb0586954a5fb510df567e1 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 14 Dec 2020 11:15:39 +0000 Subject: Update preproc module used to incorporate an important crash fix --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 666e3f4..b12606d 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,6 @@ require ( golang.org/x/text v0.3.2 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v2 v2.2.8 // indirect - rescribe.xyz/preproc v0.4.0 + rescribe.xyz/preproc v0.4.1 rescribe.xyz/utils v0.1.3 ) diff --git a/go.sum b/go.sum index d77f80b..2e19f05 100644 --- a/go.sum +++ b/go.sum @@ -67,7 +67,7 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= rescribe.xyz/integral v0.6.0 h1:CLF3sQ6th/OuG+/rp/lLR+AGOT4R7tG3IiUjSLKsriw= rescribe.xyz/integral v0.6.0/go.mod h1:gKJq4UaVn17RsMsUasEMcJDkTkwqeb6AzPIJtwcUipg= -rescribe.xyz/preproc v0.4.0 h1:HyR1R/e9hDuCdZouUgaojq0YSfJyWo87y31xAuhCdHE= -rescribe.xyz/preproc v0.4.0/go.mod h1:Yh3wyeoKK+pu50mNrYUN/zuUbRO0kxifIr9DeE3MZvY= +rescribe.xyz/preproc v0.4.1 h1:QAA9rBxVbFq1JH+uhRQeE507LxDl0uhgeSFTXCl/7rM= +rescribe.xyz/preproc v0.4.1/go.mod h1:Yh3wyeoKK+pu50mNrYUN/zuUbRO0kxifIr9DeE3MZvY= rescribe.xyz/utils v0.1.3 h1:2rlHbUjAGXy/xgtmUb6Y7Kbpxl3qkwtWzkFUQ/cOaIA= rescribe.xyz/utils v0.1.3/go.mod h1:4L2vClYUFklsXggN0CUyP/alcgzLNRT0dMpMfEiVbX8= -- cgit v1.2.1-24-ge1ad From 9147e57a3a634ad303e8f1e7c456988996d5c75b Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 14 Dec 2020 17:08:14 +0000 Subject: Add rmbook tool --- aws.go | 17 ++++++++++++ cmd/rmbook/main.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 cmd/rmbook/main.go diff --git a/aws.go b/aws.go index 6b707fe..40c452d 100644 --- a/aws.go +++ b/aws.go @@ -379,6 +379,23 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { return prefixes, err } +// Deletes a list of objects +func (a *AwsConn) DeleteObjects(bucket string, keys []string) error { + objs := []*s3.ObjectIdentifier{} + for _, v := range keys { + o := s3.ObjectIdentifier{Key: aws.String(v)} + objs = append(objs, &o) + } + _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objs, + Quiet: aws.Bool(true), + }, + }) + return err +} + // CreateBucket creates a new S3 bucket func (a *AwsConn) CreateBucket(name string) error { _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{ diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go new file mode 100644 index 0000000..8d434a9 --- /dev/null +++ b/cmd/rmbook/main.go @@ -0,0 +1,78 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rmbook removes a book from cloud storage. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: rmbook bookname + +Removes a book from cloud storage. +` + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type RmPipeliner interface { + MinimalInit() error + WIPStorageId() string + DeleteObjects(bucket string, keys []string) error + ListObjects(bucket string, prefix string) ([]string, error) +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 { + flag.Usage() + return + } + + var n NullWriter + verboselog := log.New(n, "", log.LstdFlags) + + var conn RmPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + fmt.Println("Setting up cloud connection") + err := conn.MinimalInit() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + bookname := flag.Arg(0) + + fmt.Println("Getting list of files for book") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln("Error in listing book items:", err) + } + + if len(objs) == 0 { + log.Fatalln("No files found for book:", bookname) + } + + fmt.Println("Deleting all files for book") + err = conn.DeleteObjects(conn.WIPStorageId(), objs) + if err != nil { + log.Fatalln("Error deleting book files:", err) + } + + fmt.Println("Finished deleting files") +} -- cgit v1.2.1-24-ge1ad From fb1069b504e8cd37b9a2bcdccefa9699d0e1dee9 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 15 Dec 2020 12:37:43 +0000 Subject: [rmbook] Add -dryrun flag --- cmd/rmbook/main.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go index 8d434a9..c195d85 100644 --- a/cmd/rmbook/main.go +++ b/cmd/rmbook/main.go @@ -13,7 +13,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: rmbook bookname +const usage = `Usage: rmbook [-dryrun] bookname Removes a book from cloud storage. ` @@ -33,6 +33,7 @@ type RmPipeliner interface { } func main() { + dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() @@ -68,6 +69,14 @@ func main() { log.Fatalln("No files found for book:", bookname) } + if *dryrun { + fmt.Printf("I would delete these files:\n") + for _, v := range objs { + fmt.Println(v) + } + return + } + fmt.Println("Deleting all files for book") err = conn.DeleteObjects(conn.WIPStorageId(), objs) if err != nil { -- cgit v1.2.1-24-ge1ad From 7d9f77c2f1102aec026d1af78d1fe4725ed76674 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 15 Dec 2020 12:38:36 +0000 Subject: [rmbook] Append / to end of bookname, to ensure e.g. "1" doesnt match all books starting with "1" --- cmd/rmbook/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go index c195d85..fcacc2e 100644 --- a/cmd/rmbook/main.go +++ b/cmd/rmbook/main.go @@ -57,7 +57,7 @@ func main() { log.Fatalln("Error setting up cloud connection:", err) } - bookname := flag.Arg(0) + bookname := flag.Arg(0) + "/" fmt.Println("Getting list of files for book") objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -- cgit v1.2.1-24-ge1ad From 670d5c1b74f2fa4683bfe7e2d9b1baee14db9104 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 10:54:19 +0000 Subject: Stop limiting keys returned from listobjectprefixes' api usage; this speeds up the request markedly --- aws.go | 1 - 1 file changed, 1 deletion(-) diff --git a/aws.go b/aws.go index 40c452d..57aadd3 100644 --- a/aws.go +++ b/aws.go @@ -369,7 +369,6 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Delimiter: aws.String("/"), - MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.CommonPrefixes { prefixes = append(prefixes, *r.Prefix) -- cgit v1.2.1-24-ge1ad From 86cc5d6c921ac05e0d08f66b205b51e1f5adb938 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 13:50:02 +0000 Subject: Speed up lspipeline by making s3 requests concurrently and only processing single results from ListObjects requests --- aws.go | 3 +- cmd/lspipeline/main.go | 95 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 79 insertions(+), 19 deletions(-) diff --git a/aws.go b/aws.go index 57aadd3..dd74a01 100644 --- a/aws.go +++ b/aws.go @@ -355,11 +355,12 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), + MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { objs = append(objs, ObjMeta{Name: *r.Key, Date: *r.LastModified}) } - return true + return false // only process the first page as that's all we need }) return objs, err } diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..bf19b52 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,8 @@ import ( "os/exec" "sort" "strings" + "sync" + "time" "rescribe.xyz/bookpipeline" ) @@ -100,6 +102,46 @@ func (o ObjMetas) Less(i, j int) bool { return o[i].Date.Before(o[j].Date) } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. +func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { + // First try to get the graph.png file from the book, which marks + // it as done + objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil && len(objs) > 0 { + return objs[0].Date, true, nil + } + + // Otherwise get any file from the book to get a date to sort by + objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + if err != nil { + return time.Time{}, false, err + } + if len(objs) == 0 { + return time.Time{}, false, fmt.Errorf("No files found for book %s", key) + } + return objs[0].Date, false, nil +} + +// getBookDetailsWg gets the details for a book putting it into either the +// done or inprogress channels as appropriate, and using a sync.WaitGroup Done +// signal so it can be tracked. On error it sends to the errc channel. +func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { + defer wg.Done() + date, isdone, err := getBookDetails(conn, key) + if err != nil { + errc <- err + return + } + meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} + if isdone { + done <- meta + } else { + inprogress <- meta + } +} + // getBookStatus returns a list of in progress and done books. // It determines this by finding all prefixes, and splitting them // into two lists, those which have a 'graph.png' file (the done @@ -108,35 +150,52 @@ func (o ObjMetas) Less(i, j int) bool { // of a random file with the prefix if no graph.png was found. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) - var inprogressmeta, donemeta ObjMetas if err != nil { log.Println("Error getting object prefixes:", err) return } - // Search for graph.png to determine done books (and save the date of it to sort with) + + // 100,000 size buffer is to ensure we never block, as we're using waitgroup + // rather than channel blocking to determine when to continue. Probably there + // is a better way to do this, though, like reading the done and inprogress + // channels in a goroutine and doing wg.Done() when each is read there instead. + donec := make(chan bookpipeline.ObjMeta, 100000) + inprogressc := make(chan bookpipeline.ObjMeta, 100000) + errc := make(chan error, 100000) + + var wg sync.WaitGroup for _, p := range prefixes { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") - if err != nil || len(objs) == 0 { - inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) - } else { - donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) - } + wg.Add(1) + go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) } - // Get a random file from the inprogress list to get a date to sort by - for _, i := range inprogressmeta { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) - if err != nil || len(objs) == 0 { - continue - } - i.Date = objs[0].Date + wg.Wait() + close(donec) + close(inprogressc) + + select { + case err = <-errc: + return inprogress, done, err + default: + break + } + + var inprogressmeta, donemeta ObjMetas + + for i := range donec { + donemeta = append(donemeta, i) + } + for i := range inprogressc { + inprogressmeta = append(inprogressmeta, i) } + sort.Sort(donemeta) + sort.Sort(inprogressmeta) + for _, i := range donemeta { - done = append(done, strings.TrimSuffix(i.Name, "/")) + done = append(done, i.Name) } - sort.Sort(inprogressmeta) for _, i := range inprogressmeta { - inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) + inprogress = append(inprogress, i.Name) } return -- cgit v1.2.1-24-ge1ad From 54150b54cd06e3deba44e73b151070b74a4d8e76 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:17:19 +0000 Subject: Improve lspipeline concurrency by removing WaitGroup stuff --- cmd/lspipeline/main.go | 52 ++++++++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index bf19b52..8980c59 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,7 +12,6 @@ import ( "os/exec" "sort" "strings" - "sync" "time" "rescribe.xyz/bookpipeline" @@ -124,11 +123,10 @@ func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, er return objs[0].Date, false, nil } -// getBookDetailsWg gets the details for a book putting it into either the -// done or inprogress channels as appropriate, and using a sync.WaitGroup Done -// signal so it can be tracked. On error it sends to the errc channel. -func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { - defer wg.Done() +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { date, isdone, err := getBookDetails(conn, key) if err != nil { errc <- err @@ -148,6 +146,8 @@ func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMe // list), and those which do not (the inprogress list). They are // sorted according to the date of the graph.png file, or the date // of a random file with the prefix if no graph.png was found. +// It spins up many goroutines to do query the book status and +// dates, as it is far faster to do concurrently. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) if err != nil { @@ -155,37 +155,27 @@ func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err er return } - // 100,000 size buffer is to ensure we never block, as we're using waitgroup - // rather than channel blocking to determine when to continue. Probably there - // is a better way to do this, though, like reading the done and inprogress - // channels in a goroutine and doing wg.Done() when each is read there instead. - donec := make(chan bookpipeline.ObjMeta, 100000) - inprogressc := make(chan bookpipeline.ObjMeta, 100000) - errc := make(chan error, 100000) + donec := make(chan bookpipeline.ObjMeta, 100) + inprogressc := make(chan bookpipeline.ObjMeta, 100) + errc := make(chan error) - var wg sync.WaitGroup for _, p := range prefixes { - wg.Add(1) - go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) - } - wg.Wait() - close(donec) - close(inprogressc) - - select { - case err = <-errc: - return inprogress, done, err - default: - break + go getBookDetailsChan(conn, p, donec, inprogressc, errc) } var inprogressmeta, donemeta ObjMetas - for i := range donec { - donemeta = append(donemeta, i) - } - for i := range inprogressc { - inprogressmeta = append(inprogressmeta, i) + // there will be exactly as many sends to donec or inprogressc + // as there are prefixes + for range prefixes { + select { + case i := <-donec: + donemeta = append(donemeta, i) + case i := <-inprogressc: + inprogressmeta = append(inprogressmeta, i) + case err = <-errc: + return inprogress, done, err + } } sort.Sort(donemeta) -- cgit v1.2.1-24-ge1ad From 5c3cee66a90ce6ef87e125b3bf011a6903d38083 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:56:10 +0000 Subject: Make ListObjectsWithMeta generic again and create a specialised ListObjectWithMeta for single file listing, so we can still be as fast, but do not have a misleading api --- aws.go | 23 +++++++++++++++++++++-- cmd/lspipeline/main.go | 15 ++++++--------- local.go | 11 +++++++++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/aws.go b/aws.go index dd74a01..035b08a 100644 --- a/aws.go +++ b/aws.go @@ -355,16 +355,35 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), - MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { objs = append(objs, ObjMeta{Name: *r.Key, Date: *r.LastModified}) } - return false // only process the first page as that's all we need + return true }) return objs, err } +// ListObjectWithMeta lists the name and last modified date of the +// first object with the specified prefix. +func (a *AwsConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) { + var obj ObjMeta + err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(1), + }, func(page *s3.ListObjectsV2Output, last bool) bool { + for _, r := range page.Contents { + obj = ObjMeta{Name: *r.Key, Date: *r.LastModified} + } + return false + }) + if obj.Name == "" && obj.Date.IsZero() && err == nil { + return obj, fmt.Errorf("No object could be found for %s", prefix) + } + return obj, err +} + func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { var prefixes []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 8980c59..131ff12 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -36,7 +36,7 @@ type LsPipeliner interface { AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) - ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error) + ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error) ListObjectPrefixes(bucket string) ([]string, error) WIPStorageId() string } @@ -107,20 +107,17 @@ func (o ObjMetas) Less(i, j int) bool { func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { // First try to get the graph.png file from the book, which marks // it as done - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") - if err == nil && len(objs) > 0 { - return objs[0].Date, true, nil + obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil { + return obj.Date, true, nil } // Otherwise get any file from the book to get a date to sort by - objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) if err != nil { return time.Time{}, false, err } - if len(objs) == 0 { - return time.Time{}, false, fmt.Errorf("No files found for book %s", key) - } - return objs[0].Date, false, nil + return obj.Date, false, nil } // getBookDetailsChan gets the details for a book putting it into either the diff --git a/local.go b/local.go index 0fceca2..31e44a9 100644 --- a/local.go +++ b/local.go @@ -159,6 +159,17 @@ func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta return list, err } +func (a *LocalConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) { + list, err := a.ListObjectsWithMeta(bucket, prefix) + if err != nil { + return ObjMeta{}, err + } + if len(list) == 0 { + return ObjMeta{}, fmt.Errorf("No object found for %s", prefix) + } + return list[0], nil +} + // AddToQueue adds a message to a queue func (a *LocalConn) AddToQueue(url string, msg string) error { f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) -- cgit v1.2.1-24-ge1ad From 16ea8034794ef030c969d586a7fc945bf4a2873a Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 1 Feb 2021 11:45:27 +0000 Subject: Ensure DeleteObjects can handle over 1000 files to delete; fixes rmbook for large books --- aws.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/aws.go b/aws.go index 035b08a..65671fa 100644 --- a/aws.go +++ b/aws.go @@ -401,9 +401,25 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { // Deletes a list of objects func (a *AwsConn) DeleteObjects(bucket string, keys []string) error { objs := []*s3.ObjectIdentifier{} - for _, v := range keys { + for i, v := range keys { o := s3.ObjectIdentifier{Key: aws.String(v)} objs = append(objs, &o) + // s3.DeleteObjects can only take up to 1000 keys at a time, + // so if necessary delete those collected so far and empty + // the objs queue + if i % 1000 == 1 { + _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objs, + Quiet: aws.Bool(true), + }, + }) + if err != nil { + return err + } + objs = []*s3.ObjectIdentifier{} + } } _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ Bucket: aws.String(bucket), -- cgit v1.2.1-24-ge1ad From a8c7481f0dc02bbda3b3a07091e9d61f6eb728b2 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 1 Feb 2021 11:46:21 +0000 Subject: Update AWS dependency to 1.37.1 --- go.mod | 5 +---- go.sum | 30 ++++++++++++++---------------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index b12606d..dca73fc 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,14 @@ module rescribe.xyz/bookpipeline go 1.14 require ( - github.com/aws/aws-sdk-go v1.30.5 + github.com/aws/aws-sdk-go v1.37.1 github.com/blend/go-sdk v2.0.0+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/phpdave11/gofpdf v1.4.2 github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible golang.org/x/image v0.0.0-20200618115811-c13761719519 - golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect - golang.org/x/text v0.3.2 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect - gopkg.in/yaml.v2 v2.2.8 // indirect rescribe.xyz/preproc v0.4.1 rescribe.xyz/utils v0.1.3 ) diff --git a/go.sum b/go.sum index 2e19f05..837c09d 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/airbrake/gobrake v3.6.1+incompatible/go.mod h1:wM4gu3Cn0W0K7GUuVWnlXZU11AGBXMILnrdOU8Kn00o= -github.com/aws/aws-sdk-go v1.30.5 h1:i+sSesaMrSxiUt3NJddOApe2mXK+VNBgfcmRTvNFrXM= -github.com/aws/aws-sdk-go v1.30.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go v1.37.1 h1:BTHmuN+gzhxkvU9sac2tZvaY0gV9ihbHw+KxZOecYvY= +github.com/aws/aws-sdk-go v1.37.1/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/blend/go-sdk v1.1.1/go.mod h1:IP1XHXFveOXHRnojRJO7XvqWGqyzevtXND9AdSztAe8= github.com/blend/go-sdk v2.0.0+incompatible h1:FL9X/of4ZYO5D2JJNI4vHrbXPfuSDbUa7h8JP9+E92w= github.com/blend/go-sdk v2.0.0+incompatible/go.mod h1:3GUb0YsHFNTJ6hsJTpzdmCUl05o8HisKjx5OAlzYKdw= @@ -11,11 +11,12 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= -github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= -github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= @@ -35,34 +36,31 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible h1:ahpaSRefPekV3gcXot2AOgngIV8WYqzvDyFe3i7W24w= github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible/go.mod h1:PF5tmL4EIx/7Wf+hEkpCqYi5He4u90sw+0+6FhrryuE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200618115811-c13761719519 h1:1e2ufUJNM3lCHEY5jIgac/7UTjd6cgJNdatjPdFWf34= golang.org/x/image v0.0.0-20200618115811-c13761719519/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= -golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181205014116-22934f0fdb62/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= rescribe.xyz/integral v0.6.0 h1:CLF3sQ6th/OuG+/rp/lLR+AGOT4R7tG3IiUjSLKsriw= -- cgit v1.2.1-24-ge1ad