From 34b5735503edb9c5ab635c84cd356f19df7d7381 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 12:40:57 +0000 Subject: Set hocr config options directly rather than relying on 'hocr' config file This ensures that bookpipeline will still work even if TESSDATA_PREFIX has been set to a directory without configs in it. --- cmd/bookpipeline/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 36295a6..b3ffc53 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -216,7 +216,7 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger for path := range toocr { logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") + cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr -- cgit v1.2.1-24-ge1ad From c36597e65956383ec830f04e56f54158ae839a72 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 15:19:55 +0000 Subject: [bookpipeline] Improve interface, particularly for local use, by disabling (failing) log saving, mail sending, and removing erroneous references to AWS --- cmd/bookpipeline/main.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index b3ffc53..7a7a277 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -801,17 +801,20 @@ func main() { log.Fatalln("Unknown connection type") } - _, err := getMailSettings() - if err != nil { - conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) + var err error + if *conntype != "local" { + _, err = getMailSettings() + if err != nil { + conn.Log("Warning: disabling email 
notifications as mail setting retrieval failed: ", err) + } } - conn.Log("Setting up AWS session") + conn.Log("Setting up session") err = conn.Init() if err != nil { - log.Fatalln("Error setting up cloud connection:", err) + log.Fatalln("Error setting up connection:", err) } - conn.Log("Finished setting up AWS session") + conn.Log("Finished setting up session") starttime := time.Now().Unix() hostname, err := os.Hostname() @@ -836,6 +839,9 @@ func main() { } shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) savelognow = time.NewTicker(LogSaveTime) + if *conntype == "local" { + savelognow.Stop() + } for { select { -- cgit v1.2.1-24-ge1ad From fa1593d4b9b5f5e2b9f46137739557e29f65c765 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 15:20:09 +0000 Subject: Add -autostop, so time to shutdown can be specified, and so the process can just be stopped after a period, rather than the whole computer shut down --- cmd/bookpipeline/main.go | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 7a7a277..5a6606d 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -27,7 +27,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. 
When one is found this general process is followed: @@ -48,7 +48,6 @@ the contents: {smtpserver} {port} {username} {password} {from} {to} ` const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute const LogSaveTime = 1 * time.Minute const HeartbeatSeconds = 60 @@ -719,6 +718,12 @@ func stopTimer(t *time.Timer) { } } +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + // TODO: rather than relying on journald, would be nicer to save the logs // ourselves maybe, so that we weren't relying on a particular systemd // setup. this can be done by having the conn.Log also append line @@ -770,7 +775,8 @@ func main() { nowipe := flag.Bool("nw", false, "disable wipeonly") noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") - autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") + autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { @@ -823,7 +829,7 @@ func main() { var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time - var shutdownIfQuiet *time.Timer + var stopIfQuiet *time.Timer var savelognow *time.Ticker if !*nopreproc { checkPreQueue = time.After(0) @@ -837,7 +843,12 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) + var quietTime = time.Duration(*autostop) * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + savelognow = 
time.NewTicker(LogSaveTime) if *conntype == "local" { savelognow.Stop() @@ -857,9 +868,9 @@ func main() { continue } conn.Log("Message received on preprocess queue, processing", msg.Body) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } @@ -874,10 +885,10 @@ func main() { conn.Log("No message received on wipeonly queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } @@ -894,10 +905,10 @@ func main() { // Have OCRPageQueue checked immediately after completion, as chances are high that // there will be more pages that should be done without delay checkOCRPageQueue = time.After(0) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } @@ -912,10 +923,10 @@ func main() { conn.Log("No message received on analyse queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } @@ -925,10 +936,15 @@ func main() { if err 
!= nil { conn.Log("Error saving logs", err) } - case <-shutdownIfQuiet.C: - if !*autoshutdown { + case <-stopIfQuiet.C: + if quietTime == 0 { continue } + if !*autoshutdown { + conn.Log("Stopping pipeline") + _ = savelogs(conn, starttime, hostname) + return + } conn.Log("Shutting down") _ = savelogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") -- cgit v1.2.1-24-ge1ad From 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 16:46:43 +0000 Subject: [bookpipeline] Split most functionality out to package internal/pipeline No functionality changes, but this should make it easier to make custom builds using the pipeline in slightly different ways. --- cmd/bookpipeline/main.go | 711 +---------------------------------------------- 1 file changed, 15 insertions(+), 696 deletions(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 5a6606d..aff7b87 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,20 +11,15 @@ import ( "bytes" "flag" "fmt" - "io/ioutil" "log" - "net/smtp" "os" "os/exec" - "path/filepath" "regexp" - "sort" - "strings" "time" "rescribe.xyz/bookpipeline" - "rescribe.xyz/preproc" - "rescribe.xyz/utils/pkg/hocr" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] @@ -47,9 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with the contents: {smtpserver} {port} {username} {password} {from} {to} ` +const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 3 * time.Minute const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60 // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -80,638 +75,6 @@ type Pipeliner interface { Log(v ...interface{}) } -type pageimg struct { - hocr, img string -} - -type mailSettings struct { - 
server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { - p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") - b, err := ioutil.ReadFile(p) - if err != nil { - return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) - } - f := strings.Fields(string(b)) - if len(f) != 6 { - return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) - } - return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - logger.Println("Downloading", key) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - for range dl { - } // consume the rest of the receiving channel so it isn't blocked - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the 
receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - logger.Println("Adding", key, training, "to queue", toQueue) - err = conn.AddToQueue(toQueue, key+" "+training) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p - } - } - close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range towipe { - logger.Println("Wiping", path) - s := strings.Split(path, ".") - base := strings.Join(s[:len(s)-1], "") - outpath := base + "_bin0.0.png" - err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) - if err != nil { - for range towipe { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - up <- outpath - } - close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - for range 
toocr { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*bookpipeline.Conf) - bestconfs := make(map[string]*bookpipeline.Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil && err.Error() == "No words found" { - continue - } - if err != nil { - for range toanalyse { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c bookpipeline.Conf - c.Path = path - c.Code = base[codestart:] - c.Conf = avg - confs[name] = append(confs[name], &c) - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.Conf > best { - best = c.Conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) - if err != nil { - errc <- fmt.Errorf("Error writing confidences file: %s", err) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, 
err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) - } - up <- fn - - var pgs []string - for _, conf := range bestconfs { - pgs = append(pgs, conf.Path) - } - sort.Strings(pgs) - - logger.Println("Downloading binarised and original images to create PDFs") - bookname, err := filepath.Rel(os.TempDir(), savedir) - if err != nil { - errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) - return - } - colourpdf := new(bookpipeline.Fpdf) - err = colourpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binarisedpdf := new(bookpipeline.Fpdf) - err = binarisedpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binhascontent, colourhascontent := false, false - - var colourimgs, binimgs []pageimg - - for _, pg := range pgs { - base := filepath.Base(pg) - nosuffix := strings.TrimSuffix(base, ".hocr") - p := strings.SplitN(base, "_bin", 2) - - var fn string - if len(p) > 1 { - fn = p[0] + ".jpg" - } else { - fn = nosuffix + ".jpg" - } - - binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) - colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) - } - - for _, pg := range binimgs { - logger.Println("Downloading binarised page to add to PDF", pg.img) - err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } else { - err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - binhascontent = true - err = os.Remove(filepath.Join(savedir, pg.img)) - if err != nil { - errc <- err - return - } - } - } 
- - if binhascontent { - fn = filepath.Join(savedir, bookname+".binarised.pdf") - err = binarisedpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) - return - } - up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } - } - - for _, pg := range colourimgs { - logger.Println("Downloading colour page to add to PDF", pg.img) - colourfn := pg.img - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) - logger.Println("Download failed; trying", colourfn) - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } - } - if err == nil { - err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - colourhascontent = true - err = os.Remove(filepath.Join(savedir, colourfn)) - if err != nil { - errc <- err - return - } - } - } - if colourhascontent { - fn = filepath.Join(savedir, bookname+".colour.pdf") - err = colourpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save colour pdf: %s", err) - return - } - up <- fn - } - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) - if err != nil && err.Error() != "Not enough valid confidences" { - errc <- fmt.Errorf("Error rendering graph: %s", err) - return - } - up <- fn - - close(up) - } -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue 
string, msgc chan bookpipeline.Qmsg, errc chan error) { - currentmsg := msg - for range t.C { - m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) - if err != nil { - // This is for better debugging of the heartbeat issue - conn.Log("Error with heartbeat", err) - os.Exit(1) - // TODO: would be better to ensure this error stops any running - // processes, as they will ultimately fail in the case of - // it. could do this by setting a global variable that - // processes check each time they loop. - errc <- err - t.Stop() - return - } - if m.Id != "" { - conn.Log("Replaced message handle as visibilitytimeout limit was reached") - currentmsg = m - // TODO: maybe handle communicating new msg more gracefully than this - for range msgc { - } // throw away any old msgc - msgc <- m - } - } -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - return false - } - - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - - atleastone := false - for _, png := range objs { - if preprocessedPattern.MatchString(png) { - atleastone = true - found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" - for _, hocr := range objs { - if hocr == hocrname { - found = true - break - } - } - if found == false { - return false - } - } - } - if atleastone == false { - return false - } - return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. 
-func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := filepath.Dir(msgparts[0]) - if len(msgparts) > 1 && msgparts[1] != "" { - process = ocr(msgparts[1]) - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - - dl <- msgparts[0] - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if allOCRed(bookname, conn) && toQueue != "" { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = 
os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := msgparts[0] - - var training string - if len(msgparts) > 1 { - training = msgparts[1] - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) - } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - } - - conn.Log("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Log("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - // if the error is in preprocessing / wipeonly, chances are that it will never - // complete, 
and will fill the ocrpage queue with parts which succeeded - // on each run, so in that case it's better to delete the message from - // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { - conn.Log("Deleting message from queue due to a bad error", fromQueue) - err2 := conn.DelFromQueue(fromQueue, msg.Handle) - if err2 != nil { - conn.Log("Error deleting message from queue", err2) - } - ms, err2 := getMailSettings() - if err2 != nil { - conn.Log("Failed to mail settings ", err2) - } - if err2 == nil && ms.server != "" { - logs, err2 := getlogs() - if err2 != nil { - conn.Log("Failed to get logs ", err2) - logs = "" - } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + - " Fail message: %s\r\nFull log:\r\n%s\r\n", - ms.to, ms.from, bookname, err, logs) - host := fmt.Sprintf("%s:%s", ms.server, ms.port) - auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) - err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) - if err2 != nil { - conn.Log("Error sending email ", err2) - } - } - } - return err - case <-done: - } - - if toQueue != "" && toQueue != conn.OCRPageQueueId() { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err 
!= nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - func stopTimer(t *time.Timer) { if !t.Stop() { <-t.C @@ -724,50 +87,6 @@ func resetTimer(t *time.Timer, d time.Duration) { } } -// TODO: rather than relying on journald, would be nicer to save the logs -// ourselves maybe, so that we weren't relying on a particular systemd -// setup. this can be done by having the conn.Log also append line -// to a file (though that would mean everything would have to go through -// conn.Log, which we're not consistently doing yet). the correct thing -// to do then would be to implement a new interface that covers the part -// of log.Logger we use (e.g. Print and Printf), and then have an exported -// conn struct that implements those, so that we could pass a log.Logger -// or the new conn struct everywhere (we wouldn't be passing a log.Logger, -// it's just good to be able to keep the compatibility) -func getlogs() (string, error) { - cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { - logs, err := getlogs() - if err != nil { - return fmt.Errorf("Error getting logs, error: %v", err) - } - key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) - path := filepath.Join(os.TempDir(), key) - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("Error creating log file", err) - } - defer f.Close() - _, err = f.WriteString(logs) - if err != nil { - return fmt.Errorf("Error saving log file", err) - } - _ = f.Close() - err = conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - return fmt.Errorf("Error uploading log", err) - } - conn.Log("Log saved to", key) - return nil -} - func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "rescribealphav5", "default 
tesseract training file to use (without the .traineddata part)") @@ -809,7 +128,7 @@ func main() { var err error if *conntype != "local" { - _, err = getMailSettings() + _, err = pipeline.GetMailSettings() if err != nil { conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) } @@ -857,7 +176,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking preprocess queue", err) @@ -869,13 +188,13 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } case <-checkWipeQueue: - msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking wipeonly queue", err) @@ -887,13 +206,13 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } case <-checkOCRPageQueue: - msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != 
nil { conn.Log("Error checking OCR Page queue", err) @@ -907,13 +226,13 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking analyse queue", err) @@ -925,14 +244,14 @@ func main() { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) - err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } case <-savelognow.C: conn.Log("Saving logs") - err = savelogs(conn, starttime, hostname) + err = pipeline.SaveLogs(conn, starttime, hostname) if err != nil { conn.Log("Error saving logs", err) } @@ -942,11 +261,11 @@ func main() { } if !*autoshutdown { conn.Log("Stopping pipeline") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) return } conn.Log("Shutting down") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout -- cgit v1.2.1-24-ge1ad From fc6becf5ed98e9c0815532fd76639c15eb481ed1 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 17:33:52 +0000 Subject: [rescribe] work in 
progress at a self-contained local pipeline processor, called rescribe --- cmd/rescribe/main.go | 247 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 cmd/rescribe/main.go (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go new file mode 100644 index 0000000..e3781cb --- /dev/null +++ b/cmd/rescribe/main.go @@ -0,0 +1,247 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rescribe is a modification of bookpipeline designed for local-only +// operation, which rolls uploading, processing, and downloading of +// a single book by the pipeline into one command. +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path/filepath" + "regexp" + "time" + + "rescribe.xyz/bookpipeline" + + "rescribe.xyz/bookpipeline/internal/pipeline" +) + +const usage = `Usage: rescribe [-v] [-t training] bookdir + +Process and OCR a book using the Rescribe pipeline on a local machine. 
+` + +const QueueTimeoutSecs = 2 * 60 +const PauseBetweenChecks = 1 * time.Second +const LogSaveTime = 1 * time.Minute + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +func stopTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + +func main() { + verbose := flag.Bool("v", false, "verbose") + training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") + + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 || flag.NArg() > 3 { + flag.Usage() + return + } + + bookdir := flag.Arg(0) + var bookname string + if flag.NArg() > 2 { + bookname = flag.Arg(1) + } else { + bookname = filepath.Base(bookdir) + } + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", 0) + } else { + var n NullWriter + verboselog = log.New(n, "", 0) + } + + var conn Pipeliner + // TODO: set tmpdir to a specific random thing for this run only + conn = &bookpipeline.LocalConn{Logger: verboselog} + + conn.Log("Setting up session") + err := 
conn.Init() + if err != nil { + log.Fatalln("Error setting up connection:", err) + } + conn.Log("Finished setting up session") + + uploadbook(bookdir, bookname, *training, conn) + + processbook(*training, conn) + + // TODO: save book +} + +func uploadbook(dir string, name string, training string, conn Pipeliner) error { + err := pipeline.CheckImages(dir) + if err != nil { + return fmt.Errorf("Error with images in %s: %v", dir, err) + } + err = pipeline.UploadImages(dir, name, conn) + if err != nil { + return fmt.Errorf("Error saving images to process from %s: %v", dir, err) + } + + qid := pipeline.DetectQueueType(dir, conn) + if training != "" { + name = name + " " + training + } + err = conn.AddToQueue(qid, name) + if err != nil { + return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) + } + + return nil +} + + +func processbook(training string, conn Pipeliner) { + origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) + wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) + ocredPattern := regexp.MustCompile(`.hocr$`) + + var checkPreQueue <-chan time.Time + var checkWipeQueue <-chan time.Time + var checkOCRPageQueue <-chan time.Time + var checkAnalyseQueue <-chan time.Time + var stopIfQuiet *time.Timer + checkPreQueue = time.After(0) + checkWipeQueue = time.After(0) + checkOCRPageQueue = time.After(0) + checkAnalyseQueue = time.After(0) + var quietTime = 1 * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + + for { + select { + case <-checkPreQueue: + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking preprocess queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on preprocess queue, sleeping") + continue + } + conn.Log("Message received on preprocess queue, processing", msg.Body) + stopTimer(stopIfQuiet) + err = pipeline.ProcessBook(msg, conn, 
pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during preprocess", err) + } + case <-checkWipeQueue: + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) + checkWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking wipeonly queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on wipeonly queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on wipeonly queue, processing", msg.Body) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during wipe", err) + } + case <-checkOCRPageQueue: + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) + checkOCRPageQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking OCR Page queue", err) + continue + } + if msg.Handle == "" { + continue + } + // Have OCRPageQueue checked immediately after completion, as chances are high that + // there will be more pages that should be done without delay + checkOCRPageQueue = time.After(0) + stopTimer(stopIfQuiet) + conn.Log("Message received on OCR Page queue, processing", msg.Body) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during OCR Page process", err) + } + case <-checkAnalyseQueue: + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) + checkAnalyseQueue = time.After(PauseBetweenChecks) + if err != nil { + conn.Log("Error checking analyse queue", err) + continue + } + if msg.Handle == "" { + conn.Log("No message received on analyse queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on analyse queue, processing", 
msg.Body) + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + resetTimer(stopIfQuiet, quietTime) + if err != nil { + conn.Log("Error during analysis", err) + } + case <-stopIfQuiet.C: + conn.Log("Processing finished") + return + } + } +} -- cgit v1.2.1-24-ge1ad From a1de8862a091f9584220db40671a0d43346c4519 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:29:56 +0000 Subject: [rescribe] Local only combo tool basically now working. Testing is still minimal. --- cmd/rescribe/main.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 6 deletions(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index e3781cb..c309367 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -10,6 +10,7 @@ package main import ( "flag" "fmt" + "io/ioutil" "log" "os" "path/filepath" @@ -88,7 +89,7 @@ func main() { bookdir := flag.Arg(0) var bookname string - if flag.NArg() > 2 { + if flag.NArg() > 1 { bookname = flag.Arg(1) } else { bookname = filepath.Base(bookdir) @@ -102,22 +103,41 @@ func main() { verboselog = log.New(n, "", 0) } + tempdir, err := ioutil.TempDir("", "bookpipeline") + if err != nil { + log.Fatalln("Error setting up temporary directory:", err) + } + var conn Pipeliner - // TODO: set tmpdir to a specific random thing for this run only - conn = &bookpipeline.LocalConn{Logger: verboselog} + conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir} conn.Log("Setting up session") - err := conn.Init() + err = conn.Init() if err != nil { log.Fatalln("Error setting up connection:", err) } conn.Log("Finished setting up session") - uploadbook(bookdir, bookname, *training, conn) + fmt.Printf("Copying book to pipeline\n") + err = uploadbook(bookdir, bookname, *training, conn) + if err != nil { + log.Fatalln(err) + } + + fmt.Printf("Processing book (this may take some time)\n") processbook(*training, conn) - // TODO: save 
book + fmt.Printf("Saving finished book to %s\n", bookname) + err = downloadbook(bookname, conn) + if err != nil { + log.Fatalln(err) + } + + err = os.RemoveAll(tempdir) + if err != nil { + log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) + } } func uploadbook(dir string, name string, training string, conn Pipeliner) error { @@ -142,6 +162,29 @@ func uploadbook(dir string, name string, training string, conn Pipeliner) error return nil } +func downloadbook(name string, conn Pipeliner) error { + err := os.MkdirAll(name, 0755) + if err != nil { + log.Fatalln("Failed to create directory", name, err) + } + + err = pipeline.DownloadBestPages(name, conn) + if err != nil { + return fmt.Errorf("Error downloading best pages: %v", err) + } + + err = pipeline.DownloadPdfs(name, conn) + if err != nil { + return fmt.Errorf("Error downloading PDFs: %v", err) + } + + err = pipeline.DownloadAnalyses(name, conn) + if err != nil { + return fmt.Errorf("Error downloading analyses: %v", err) + } + + return nil +} func processbook(training string, conn Pipeliner) { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) -- cgit v1.2.1-24-ge1ad From f19df9e8c1213a49c426caefd2fadc711f5faf11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:55:36 +0000 Subject: Switch Preprocess() to take the thresholds to use, and have rescribe tool only use 0.1,0.2,0.3 --- cmd/bookpipeline/main.go | 2 +- cmd/rescribe/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index aff7b87..12d5eec 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -188,7 +188,7 @@ func main() { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 
0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index c309367..1a3dcff 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -221,7 +221,7 @@ func processbook(training string, conn Pipeliner) { } conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) -- cgit v1.2.1-24-ge1ad From 2c040f73ce7bbba480c441a0433fc8b4d6449254 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 9 Nov 2020 18:57:21 +0000 Subject: Add a couple of things that should not be forgotten --- cmd/booktopipeline/main.go | 2 ++ cmd/getpipelinebook/main.go | 2 ++ 2 files changed, 4 insertions(+) (limited to 'cmd') diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 60d1f81..96a6f6c 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -19,6 +19,8 @@ import ( "rescribe.xyz/bookpipeline" ) +// TODO: use internal/pipeline/get.go functions + const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] Uploads the book in bookdir to the S3 'inprogress' bucket and adds it diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 03e709b..ef13db5 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -17,6 +17,8 @@ import ( "rescribe.xyz/bookpipeline" ) +// TODO: use internal/pipeline/get.go functions + const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname Downloads the 
pipeline results for a book. -- cgit v1.2.1-24-ge1ad From c48630f590fe2c877e899948f1bf88458d3fd813 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 10:05:16 +0000 Subject: Switch booktopipeline to use internal pipeline functions --- cmd/booktopipeline/main.go | 85 ++++++---------------------------------------- 1 file changed, 11 insertions(+), 74 deletions(-) (limited to 'cmd') diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 96a6f6c..7254d78 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -9,17 +9,14 @@ package main import ( "flag" "fmt" - "image" - _ "image/png" - _ "image/jpeg" "log" "os" "path/filepath" "rescribe.xyz/bookpipeline" -) -// TODO: use internal/pipeline/get.go functions + "rescribe.xyz/bookpipeline/internal/pipeline" +) const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] @@ -34,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or If bookname is omitted the last part of the bookdir is used. 
` -type Pipeliner interface { - Init() error - PreQueueId() string - WipeQueueId() string - WIPStorageId() string - AddToQueue(url string, msg string) error - Upload(bucket string, key string, path string) error -} - // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -52,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { var verboselog *log.Logger -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - f <- path - } - return nil -} - func main() { verbose := flag.Bool("v", false, "Verbose") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -96,7 +72,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.Pipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -110,18 +86,7 @@ func main() { log.Fatalln("Failed to set up cloud connection:", err) } - qid := conn.PreQueueId() - - // Auto detect type of queue to send to based on file extension - pngdirs, _ := filepath.Glob(bookdir + "/*.png") - jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg") - pngcount := len(pngdirs) - jpgcount := len(jpgdirs) - if pngcount > jpgcount { - qid = conn.WipeQueueId() - } else { - qid = conn.PreQueueId() - } + qid := pipeline.DetectQueueType(bookdir, conn) // Flags set override the queue selection if *wipeonly { @@ -132,43 +97,15 @@ func main() { } verboselog.Println("Checking that all images are valid in", bookdir) - checker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, checker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(checker) - }() - - for path := range checker { - f, err := os.Open(path) - if err != nil { - log.Fatalln("Opening image %s failed, bailing: %v", path, err) - } - _, _, err = image.Decode(f) - if err != nil { - log.Fatalf("Decoding 
image %s failed, bailing: %v", path, err) - } + err = pipeline.CheckImages(bookdir) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Walking", bookdir) - walker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, walker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(walker) - }() - - for path := range walker { - verboselog.Println("Uploading", path) - name := filepath.Base(path) - err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) - if err != nil { - log.Fatalln("Failed to upload", path, err) - } + verboselog.Println("Uploading all images are valid in", bookdir) + err = pipeline.UploadImages(bookdir, bookname, conn) + if err != nil { + log.Fatalln(err) } if *training != "" { -- cgit v1.2.1-24-ge1ad From 198f8215f8dd0460608abcd03fa49451462c9d11 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 10:41:15 +0000 Subject: [getpipelinebook] Rewrite to use internal package functions --- cmd/getpipelinebook/main.go | 100 ++++++++------------------------------------ cmd/rescribe/main.go | 2 +- 2 files changed, 19 insertions(+), 83 deletions(-) (limited to 'cmd') diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index ef13db5..5116414 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -6,18 +6,16 @@ package main import ( - "bufio" "flag" "fmt" "log" "os" "path/filepath" - "strings" "rescribe.xyz/bookpipeline" -) -// TODO: use internal/pipeline/get.go functions + "rescribe.xyz/bookpipeline/internal/pipeline" +) const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname @@ -35,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } -type Pipeliner interface { - MinimalInit() error - ListObjects(bucket string, prefix string) ([]string, error) - Download(bucket string, key string, fn string) error - Upload(bucket string, key string, path string) error - CheckQueue(url 
string, timeout int64) (bookpipeline.Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - WIPStorageId() string -} - -func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { - for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { - fn := filepath.Join(bookname, bookname+suffix) - l.Println("Downloading PDF", fn) - err := conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Printf("Failed to download %s: %s\n", fn, err) - } - } -} - func main() { all := flag.Bool("a", false, "Get all files for book") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -85,7 +61,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.MinPipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -111,18 +87,10 @@ func main() { if *all { verboselog.Println("Downloading all files for", bookname) - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + err = pipeline.DownloadAll(bookname, conn) if err != nil { - log.Fatalln("Failed to get list of files for book", bookname, err) - } - for _, i := range objs { - verboselog.Println("Downloading", i) - err = conn.Download(conn.WIPStorageId(), i, i) - if err != nil { - log.Fatalln("Failed to download file", i, err) - } + log.Fatalln(err) } - return } if *binarisedpdf { @@ -153,61 +121,29 @@ func main() { } if *pdf { - getpdfs(conn, verboselog, bookname) + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, conn) } if *binarisedpdf || *colourpdf || *graph || *pdf { return } - verboselog.Println("Downloading best file") - fn := filepath.Join(bookname, "best") - err = conn.Download(conn.WIPStorageId(), fn, fn) + verboselog.Println("Downloading best pages") + err = pipeline.DownloadBestPages(bookname, conn, *png) if err != nil { - log.Fatalln("Failed to download 'best' file", err) - } - f, err := 
os.Open(fn) - if err != nil { - log.Fatalln("Failed to open best file", err) - } - defer f.Close() - - if *png { - verboselog.Println("Downloading png files") - s := bufio.NewScanner(f) - for s.Scan() { - txtfn := filepath.Join(bookname, s.Text()) - fn = strings.Replace(txtfn, ".hocr", ".png", 1) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } - } - return + log.Fatalln(err) } - verboselog.Println("Downloading HOCR files") - s := bufio.NewScanner(f) - for s.Scan() { - fn = filepath.Join(bookname, s.Text()) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, conn) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Downloading PDF files") - getpdfs(conn, verboselog, bookname) - - verboselog.Println("Downloading analysis files") - for _, a := range []string{"conf", "graph.png"} { - fn = filepath.Join(bookname, a) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading analyses") + err = pipeline.DownloadAnalyses(bookname, conn) + if err != nil { + log.Fatalln(err) } } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 1a3dcff..8e2fe69 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -168,7 +168,7 @@ func downloadbook(name string, conn Pipeliner) error { log.Fatalln("Failed to create directory", name, err) } - err = pipeline.DownloadBestPages(name, conn) + err = pipeline.DownloadBestPages(name, conn, false) if err != nil { return fmt.Errorf("Error downloading best pages: %v", err) } -- cgit v1.2.1-24-ge1ad From 7921b5ca6d6667dda09ae67dcc1ee987aef62ebb Mon Sep 17 00:00:00 2001 From: 
Nick White Date: Tue, 10 Nov 2020 11:22:36 +0000 Subject: [rescribe] Handle errors in processbook correctly, and improve console output --- cmd/rescribe/main.go | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8e2fe69..3b69b21 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -122,15 +122,21 @@ func main() { err = uploadbook(bookdir, bookname, *training, conn) if err != nil { + _ = os.RemoveAll(tempdir) log.Fatalln(err) } - fmt.Printf("Processing book (this may take some time)\n") - processbook(*training, conn) + fmt.Printf("Processing book\n") + err = processbook(*training, conn) + if err != nil { + _ = os.RemoveAll(tempdir) + log.Fatalln(err) + } fmt.Printf("Saving finished book to %s\n", bookname) err = downloadbook(bookname, conn) if err != nil { + _ = os.RemoveAll(tempdir) log.Fatalln(err) } @@ -186,7 +192,7 @@ func downloadbook(name string, conn Pipeliner) error { return nil } -func processbook(training string, conn Pipeliner) { +func processbook(training string, conn Pipeliner) error { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) ocredPattern := regexp.MustCompile(`.hocr$`) @@ -212,26 +218,26 @@ func processbook(training string, conn Pipeliner) { msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking preprocess queue", err) - continue + return fmt.Errorf("Error checking preprocess queue", err) } if msg.Handle == "" { conn.Log("No message received on preprocess queue, sleeping") continue } - conn.Log("Message received on preprocess queue, processing", msg.Body) stopTimer(stopIfQuiet) + conn.Log("Message received on preprocess queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (binarising and wiping)\n") err = pipeline.ProcessBook(msg, 
conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during preprocess", err) + return fmt.Errorf("Error during preprocess", err) } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking wipeonly queue", err) - continue + return fmt.Errorf("Error checking wipeonly queue", err) } if msg.Handle == "" { conn.Log("No message received on wipeonly queue, sleeping") @@ -239,17 +245,18 @@ func processbook(training string, conn Pipeliner) { } stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (wiping only)\n") err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during wipe", err) + return fmt.Errorf("Error during wipe", err) } case <-checkOCRPageQueue: msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking OCR Page queue", err) - continue + return fmt.Errorf("Error checking OCR Page queue", err) } if msg.Handle == "" { continue @@ -259,17 +266,17 @@ func processbook(training string, conn Pipeliner) { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) + fmt.Printf(".") err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during OCR Page process", err) 
+ return fmt.Errorf("\nError during OCR Page process", err) } case <-checkAnalyseQueue: msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { - conn.Log("Error checking analyse queue", err) - continue + return fmt.Errorf("Error checking analyse queue", err) } if msg.Handle == "" { conn.Log("No message received on analyse queue, sleeping") @@ -277,14 +284,17 @@ func processbook(training string, conn Pipeliner) { } stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) + fmt.Printf("\n Analysing OCR and compiling PDFs\n") err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { - conn.Log("Error during analysis", err) + return fmt.Errorf("Error during analysis", err) } case <-stopIfQuiet.C: conn.Log("Processing finished") - return + return nil } } + + return fmt.Errorf("Ended unexpectedly") // should never be reached } -- cgit v1.2.1-24-ge1ad From dac2f1ad471cd9896c16569fe02c69ff9b9855ba Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 11:59:14 +0000 Subject: [rescribe] Change -t to the path of the traineddata file, and set TESSDATA_PREFIX accordingly --- cmd/rescribe/main.go | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 3b69b21..8d7c07b 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -13,8 +13,10 @@ import ( "io/ioutil" "log" "os" + "os/exec" "path/filepath" "regexp" + "strings" "time" "rescribe.xyz/bookpipeline" @@ -74,7 +76,7 @@ func resetTimer(t *time.Timer, d time.Duration) { func main() { verbose := flag.Bool("v", false, "verbose") - training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)") + training := flag.String("t", 
"training/rescribev7_fast.traineddata", "path to the tesseract training file to use") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -103,6 +105,33 @@ func main() { verboselog = log.New(n, "", 0) } + f, err := os.Open(*training) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training) + fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n") + os.Exit(1) + } + f.Close() + + abstraining, err := filepath.Abs(*training) + if err != nil { + log.Fatalf("Error getting absolute path of training %s: %v", err) + } + tessPrefix, trainingName := filepath.Split(abstraining) + trainingName = strings.TrimSuffix(trainingName, ".traineddata") + err = os.Setenv("TESSDATA_PREFIX", tessPrefix) + if err != nil { + log.Fatalln("Error setting TESSDATA_PREFIX:", err) + } + + // TODO: would be good to be able to set custom path to tesseract + _, err = exec.Command("tesseract", "--help").Output() + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") + fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") + os.Exit(1) + } + tempdir, err := ioutil.TempDir("", "bookpipeline") if err != nil { log.Fatalln("Error setting up temporary directory:", err) @@ -120,14 +149,14 @@ func main() { fmt.Printf("Copying book to pipeline\n") - err = uploadbook(bookdir, bookname, *training, conn) + err = uploadbook(bookdir, bookname, trainingName, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) } fmt.Printf("Processing book\n") - err = processbook(*training, conn) + err = processbook(trainingName, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) -- cgit v1.2.1-24-ge1ad From ad7aaf490e78e969bb5495dfda06a33d2a176aec Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 12:28:50 +0000 Subject: [rescribe] Enable custom paths to tesseract command to be set (also improve some error output) --- cmd/bookpipeline/main.go | 2 +- 
cmd/rescribe/main.go | 36 ++++++++++++++++++------------------ 2 files changed, 19 insertions(+), 19 deletions(-) (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 12d5eec..909b431 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -226,7 +226,7 @@ func main() { checkOCRPageQueue = time.After(0) stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8d7c07b..6a2fb9f 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -77,6 +77,7 @@ func resetTimer(t *time.Timer, d time.Duration) { func main() { verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") + tesscmd := flag.String("tesscmd", "tesseract", "The Tesseract executable to run. 
You may need to set this to the full path of Tesseract.exe if you're on Windows.") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -124,11 +125,12 @@ func main() { log.Fatalln("Error setting TESSDATA_PREFIX:", err) } - // TODO: would be good to be able to set custom path to tesseract - _, err = exec.Command("tesseract", "--help").Output() + _, err = exec.Command(*tesscmd, "--help").Output() if err != nil { fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") + fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") + fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR\\tesseract.exe' ...\n") os.Exit(1) } @@ -149,14 +151,14 @@ func main() { fmt.Printf("Copying book to pipeline\n") - err = uploadbook(bookdir, bookname, trainingName, conn) + err = uploadbook(bookdir, bookname, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) } fmt.Printf("Processing book\n") - err = processbook(trainingName, conn) + err = processbook(trainingName, *tesscmd, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) @@ -175,7 +177,7 @@ func main() { } } -func uploadbook(dir string, name string, training string, conn Pipeliner) error { +func uploadbook(dir string, name string, conn Pipeliner) error { err := pipeline.CheckImages(dir) if err != nil { return fmt.Errorf("Error with images in %s: %v", dir, err) @@ -186,9 +188,7 @@ func uploadbook(dir string, name string, training string, conn Pipeliner) error } qid := pipeline.DetectQueueType(dir, conn) - if training != "" { - name = name + " " + training - } + err = conn.AddToQueue(qid, name) if err != nil { return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) @@ -221,7 +221,7 @@ func downloadbook(name string, conn Pipeliner) error { return nil } -func processbook(training string, conn Pipeliner) error { +func 
processbook(training string, tesscmd string, conn Pipeliner) error { origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) ocredPattern := regexp.MustCompile(`.hocr$`) @@ -247,7 +247,7 @@ func processbook(training string, conn Pipeliner) error { msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking preprocess queue", err) + return fmt.Errorf("Error checking preprocess queue: %v", err) } if msg.Handle == "" { conn.Log("No message received on preprocess queue, sleeping") @@ -260,13 +260,13 @@ func processbook(training string, conn Pipeliner) error { fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during preprocess", err) + return fmt.Errorf("Error during preprocess: %v", err) } case <-checkWipeQueue: msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking wipeonly queue", err) + return fmt.Errorf("Error checking wipeonly queue, %v", err) } if msg.Handle == "" { conn.Log("No message received on wipeonly queue, sleeping") @@ -279,13 +279,13 @@ func processbook(training string, conn Pipeliner) error { fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during wipe", err) + return fmt.Errorf("Error during wipe: %v", err) } case <-checkOCRPageQueue: msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking OCR Page queue", err) + return fmt.Errorf("Error checking OCR Page queue: %v", err) } if msg.Handle == "" { continue @@ -296,16 +296,16 @@ func 
processbook(training string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) fmt.Printf(".") - err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("\nError during OCR Page process", err) + return fmt.Errorf("\nError during OCR Page process: %v", err) } case <-checkAnalyseQueue: msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { - return fmt.Errorf("Error checking analyse queue", err) + return fmt.Errorf("Error checking analyse queue: %v", err) } if msg.Handle == "" { conn.Log("No message received on analyse queue, sleeping") @@ -317,7 +317,7 @@ func processbook(training string, conn Pipeliner) error { err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") resetTimer(stopIfQuiet, quietTime) if err != nil { - return fmt.Errorf("Error during analysis", err) + return fmt.Errorf("Error during analysis: %v", err) } case <-stopIfQuiet.C: conn.Log("Processing finished") -- cgit v1.2.1-24-ge1ad From 33f1726a4c9f8013dcde39e644281059d9766bc4 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 10 Nov 2020 12:30:15 +0000 Subject: gofmt --- cmd/getbests/main.go | 4 +-- cmd/postprocess-bythresh/main.go | 71 ++++++++++++++++++---------------------- 2 files changed, 34 insertions(+), 41 deletions(-) (limited to 'cmd') diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go index 9eca0d8..c1ee50d 100644 --- a/cmd/getbests/main.go +++ b/cmd/getbests/main.go @@ -62,8 +62,8 @@ func main() { log.Println("Downloading all best files found") for _, i := range objs { parts := strings.Split(i, "/") - if parts[len(parts) - 1] == "best" { - err = 
conn.Download(conn.WIPStorageId(), i, parts[0] + "-best") + if parts[len(parts)-1] == "best" { + err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best") if err != nil { log.Fatalln("Failed to download file", i, err) } diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go index 37b77e7..5bdb839 100644 --- a/cmd/postprocess-bythresh/main.go +++ b/cmd/postprocess-bythresh/main.go @@ -19,7 +19,6 @@ import ( //TO DO: make writetofile return an error and handle that accordingly // potential TO DO: add text versions where footer is cropped on odd/even pages only - // the trimblanks function trims the blank lines from a text input func trimblanks(hocrfile string) string { @@ -50,7 +49,7 @@ func dehyphenateString(in string) string { words := strings.Split(line, " ") last := words[len(words)-1] // the - 2 here is to account for a trailing newline and counting from zero - if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 { + if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 { nextwords := strings.Split(lines[i+1], " ") if len(nextwords) > 0 { line = line[0:len(line)-1] + nextwords[0] @@ -66,17 +65,15 @@ func dehyphenateString(in string) string { return strings.Join(newlines, " ") } - // the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long) func fullcrop(noblanks string) string { - alllines := strings.Split(noblanks, "\n") - + if len(alllines) <= 2 { - return "" - } else { - return strings.Join(alllines[1:len(alllines)-2], "\n") + return "" + } else { + return strings.Join(alllines[1:len(alllines)-2], "\n") } } @@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, var killheadtxt string var footkilltxt string - hocrfilepath := filepath.Join(bookdirectory, hocrfilename) confpath := filepath.Join(bookdirectory, "conf") @@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh 
int) (string, if err != nil { log.Fatal(err) } - - + trimbest := trimblanks(hocrfiletext) - + alltxt = dehyphenateString(trimbest) - + croptxt = dehyphenateString(fullcrop(trimbest)) - + killheadtxt = dehyphenateString(headcrop(trimbest)) - + footkilltxt = dehyphenateString(footcrop(trimbest)) - } return alltxt, croptxt, killheadtxt, footkilltxt @@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, // the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them. func writetofile(bookdirectory, textfilebase, txt string) error { alltxtfile := filepath.Join(bookdirectory, textfilebase) - + file, err := os.Create(alltxtfile) if err != nil { return fmt.Errorf("Error opening file %s: %v", alltxtfile, err) @@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error { if _, err := file.WriteString(txt); err != nil { log.Println(err) } -return err + return err } @@ -215,7 +209,7 @@ func main() { bookdirectory := flag.Arg(0) confthreshstring := strconv.Itoa(*confthresh) - + fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh) bestpath := filepath.Join(bookdirectory, "best") @@ -239,32 +233,31 @@ func main() { crop = crop + " " + croptxt killhead = killhead + " " + killheadtxt killfoot = killfoot + " " + footkilltxt - + } } - - - bookname:= filepath.Base(bookdirectory) - b := bookname + "_" + confthreshstring - err1 := writetofile(bookdirectory, b + "_all.txt", all) - if err1 != nil { + bookname := filepath.Base(bookdirectory) + b := bookname + "_" + confthreshstring + + err1 := writetofile(bookdirectory, b+"_all.txt", all) + if err1 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! 
%v", err1) - } - - err2 := writetofile(bookdirectory, b + "_crop.txt", crop) - if err2 != nil { + } + + err2 := writetofile(bookdirectory, b+"_crop.txt", crop) + if err2 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2) - } - - err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead) - if err3 != nil { + } + + err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead) + if err3 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3) - } - - err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot) - if err4 != nil { + } + + err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot) + if err4 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4) - } + } } -- cgit v1.2.1-24-ge1ad From 6b5145f0b75c8d5719bf44d5f654b9a2d1e3b2cd Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 16:43:53 +0000 Subject: [rescribe] Mention in usage that things can be saved in a different directory --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 6a2fb9f..8ca4189 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -24,7 +24,7 @@ import ( "rescribe.xyz/bookpipeline/internal/pipeline" ) -const usage = `Usage: rescribe [-v] [-t training] bookdir +const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] Process and OCR a book using the Rescribe pipeline on a local machine. 
` -- cgit v1.2.1-24-ge1ad From 56c1cf041aec9cb2352a3bd4a4b46e65a3cc04c0 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 16:44:42 +0000 Subject: [rescribe] Add txt output, only keep colour pdf, and reorganise files so they're more user-friendly --- cmd/rescribe/main.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8ca4189..fe36aea 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -20,6 +20,7 @@ import ( "time" "rescribe.xyz/bookpipeline" + "rescribe.xyz/utils/pkg/hocr" "rescribe.xyz/bookpipeline/internal/pipeline" ) @@ -175,6 +176,55 @@ func main() { if err != nil { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } + + hocrs, err := filepath.Glob(fmt.Sprintf("%s/*hocr", bookname)) + if err != nil { + log.Fatalf("Error looking for .hocr files: %v", err) + } + + for _, v := range hocrs { + err = addTxtVersion(v) + if err != nil { + log.Fatalf("Error creating txt version of %s: %v", v, err) + } + + err = os.MkdirAll(filepath.Join(bookname, "hocr"), 0755) + if err != nil { + log.Fatalf("Error creating hocr directory: %v", err) + } + + err = os.Rename(v, filepath.Join(bookname, "hocr", filepath.Base(v))) + if err != nil { + log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) + } + } + + // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf + _ = os.Remove(filepath.Join(bookname, bookname + ".binarised.pdf")) + _ = os.Rename(filepath.Join(bookname, bookname + ".colour.pdf"), filepath.Join(bookname, bookname + ".pdf")) +} + +func addTxtVersion(hocrfn string) error { + dir := filepath.Dir(hocrfn) + err := os.MkdirAll(filepath.Join(dir, "text"), 0755) + if err != nil { + log.Fatalf("Error creating text directory: %v", err) + } + + t, err := hocr.GetText(hocrfn) + if err != nil { + return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) + } + + basefn 
:= strings.TrimSuffix(filepath.Base(hocrfn), ".hocr") + ".txt" + fn := filepath.Join(dir, "text", basefn) + + err = ioutil.WriteFile(fn, []byte(t), 0644) + if err != nil { + return fmt.Errorf("Error creating text file %s: %v", fn, err) + } + + return nil } func uploadbook(dir string, name string, conn Pipeliner) error { -- cgit v1.2.1-24-ge1ad From eefa8f50d7ab915ce426c837cf504d26b7d4ccee Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 16 Nov 2020 17:43:13 +0000 Subject: [rescribe] Default to an appropriate tesscmd for Windows --- cmd/rescribe/main.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index fe36aea..2320a2c 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -16,6 +16,7 @@ import ( "os/exec" "path/filepath" "regexp" + "runtime" "strings" "time" @@ -76,9 +77,14 @@ func resetTimer(t *time.Timer, d time.Duration) { } func main() { + deftesscmd := "tesseract" + if runtime.GOOS == "windows" { + deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe" + } + verbose := flag.Bool("v", false, "verbose") training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") - tesscmd := flag.String("tesscmd", "tesseract", "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. 
You may need to set this to the full path of Tesseract.exe if you're on Windows.") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -131,7 +137,7 @@ func main() { fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") - fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR\\tesseract.exe' ...\n") + fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n") os.Exit(1) } -- cgit v1.2.1-24-ge1ad From f71fd636f151e5cb7eafb2ae6c21c1c188d43fdd Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 17 Nov 2020 12:24:42 +0000 Subject: Remove _bin0.x from txt filenames --- cmd/rescribe/main.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 2320a2c..f4489d8 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -34,6 +34,7 @@ Process and OCR a book using the Rescribe pipeline on a local machine. 
const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 1 * time.Second const LogSaveTime = 1 * time.Minute +var thresholds = []float64{0.1, 0.2, 0.3} // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -222,8 +223,11 @@ func addTxtVersion(hocrfn string) error { return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) } - basefn := strings.TrimSuffix(filepath.Base(hocrfn), ".hocr") + ".txt" - fn := filepath.Join(dir, "text", basefn) + basefn := filepath.Base(hocrfn) + for _, v := range thresholds { + basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v)) + } + fn := filepath.Join(dir, "text", basefn + ".txt") err = ioutil.WriteFile(fn, []byte(t), 0644) if err != nil { @@ -312,7 +316,7 @@ func processbook(training string, tesscmd string, conn Pipeliner) error { stopTimer(stopIfQuiet) conn.Log("Message received on preprocess queue, processing", msg.Body) fmt.Printf(" Preprocessing book (binarising and wiping)\n") - err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.3}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output resetTimer(stopIfQuiet, quietTime) if err != nil { -- cgit v1.2.1-24-ge1ad From 2717c5ed21a082a7f24833f3d57b303fd22bd4e5 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 17 Nov 2020 16:37:54 +0000 Subject: Add trimqueue and logwholequeue utilities which can help deal with weird queue states --- cmd/logwholequeue/main.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ cmd/trimqueue/main.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) create mode 100644 cmd/logwholequeue/main.go create mode 100644 cmd/trimqueue/main.go (limited to 'cmd') diff --git a/cmd/logwholequeue/main.go 
b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + LogQueue(url string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 1 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.LogQueue(qid) + if err != nil { + log.Fatalln("Error getting queue", qname, ":", err) + } +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. 
+// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + RemovePrefixesFromQueue(url string, prefix string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 2 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) + if err != nil { + log.Fatalln("Error removing prefixes from queue", qname, ":", err) + } +} -- cgit v1.2.1-24-ge1ad From 0d914a5de3f8169d41df4fcff1ee4aea6d01afbe Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 24 Nov 2020 12:40:54 +0000 Subject: [booktopipeline] Add a check to disallow adding a book that already exists This is important as if a book is added which has already been done, then an analyse job will be added every time a page is OCRed, which will clog up the 
pipeline with unnecessary work. Also if a book was added with the same name but differently named files, or a different number of pages, the results would almost certainly not be as intended. In the case of a book really wanting to be added with a particular name, either the original directory can be removed on S3, or "v2" or similar can be appended to the book name before calling booktopipeline. --- cmd/booktopipeline/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'cmd') diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 7254d78..b4f4d99 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -102,6 +102,15 @@ func main() { log.Fatalln(err) } + verboselog.Println("Checking that a book hasn't already been uploaded with that name") + list, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln(err) + } + if len(list) > 0 { + log.Fatalf("Error: There is already a book in S3 named %s", bookname) + } + verboselog.Println("Uploading all images are valid in", bookdir) err = pipeline.UploadImages(bookdir, bookname, conn) if err != nil { -- cgit v1.2.1-24-ge1ad From cbe02a57377787cd34172453a477f68f200448e8 Mon Sep 17 00:00:00 2001 From: Nick White Date: Thu, 3 Dec 2020 15:20:15 +0000 Subject: [rescribe] Fix portability issue where hocrs may not be correctly moved and txt-ified on windows --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index f4489d8..880bbc2 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -184,7 +184,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s/*hocr", bookname)) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", bookname, string(filepath.Separator))) if err != nil { log.Fatalf("Error looking for .hocr files: %v", err) } -- cgit v1.2.1-24-ge1ad From 
068ad0b666705a49ab22d7b48cd6a7d67b37f234 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 7 Dec 2020 16:53:58 +0000 Subject: [rescribe] Allow saving of results to somewhere other than a directory named after the book being processed --- cmd/getpipelinebook/main.go | 10 +++++----- cmd/rescribe/main.go | 38 ++++++++++++++++++++++---------------- 2 files changed, 27 insertions(+), 21 deletions(-) (limited to 'cmd') diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 5116414..ccedd72 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -87,7 +87,7 @@ func main() { if *all { verboselog.Println("Downloading all files for", bookname) - err = pipeline.DownloadAll(bookname, conn) + err = pipeline.DownloadAll(bookname, bookname, conn) if err != nil { log.Fatalln(err) } @@ -122,7 +122,7 @@ func main() { if *pdf { verboselog.Println("Downloading PDFs") - pipeline.DownloadPdfs(bookname, conn) + pipeline.DownloadPdfs(bookname, bookname, conn) } if *binarisedpdf || *colourpdf || *graph || *pdf { @@ -130,19 +130,19 @@ func main() { } verboselog.Println("Downloading best pages") - err = pipeline.DownloadBestPages(bookname, conn, *png) + err = pipeline.DownloadBestPages(bookname, bookname, conn, *png) if err != nil { log.Fatalln(err) } verboselog.Println("Downloading PDFs") - pipeline.DownloadPdfs(bookname, conn) + pipeline.DownloadPdfs(bookname, bookname, conn) if err != nil { log.Fatalln(err) } verboselog.Println("Downloading analyses") - err = pipeline.DownloadAnalyses(bookname, conn) + err = pipeline.DownloadAnalyses(bookname, bookname, conn) if err != nil { log.Fatalln(err) } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 880bbc2..8414c53 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -29,6 +29,9 @@ import ( const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] Process and OCR a book using the Rescribe pipeline on a local machine. 
+ +OCR results are saved into the bookdir directory unless savedir is +specified. ` const QueueTimeoutSecs = 2 * 60 @@ -93,17 +96,16 @@ func main() { } flag.Parse() - if flag.NArg() < 1 || flag.NArg() > 3 { + if flag.NArg() < 1 || flag.NArg() > 2 { flag.Usage() return } bookdir := flag.Arg(0) - var bookname string + bookname := filepath.Base(bookdir) + savedir := bookdir if flag.NArg() > 1 { - bookname = flag.Arg(1) - } else { - bookname = filepath.Base(bookdir) + savedir = flag.Arg(1) } var verboselog *log.Logger @@ -172,8 +174,12 @@ func main() { log.Fatalln(err) } - fmt.Printf("Saving finished book to %s\n", bookname) - err = downloadbook(bookname, conn) + fmt.Printf("Saving finished book to %s\n", savedir) + err = os.MkdirAll(savedir, 0755) + if err != nil { + log.Fatalf("Error creating save directory %s: %v", savedir, err) + } + err = downloadbook(savedir, bookname, conn) if err != nil { _ = os.RemoveAll(tempdir) log.Fatalln(err) @@ -184,7 +190,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", bookname, string(filepath.Separator))) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", savedir, string(filepath.Separator))) if err != nil { log.Fatalf("Error looking for .hocr files: %v", err) } @@ -195,20 +201,20 @@ func main() { log.Fatalf("Error creating txt version of %s: %v", v, err) } - err = os.MkdirAll(filepath.Join(bookname, "hocr"), 0755) + err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755) if err != nil { log.Fatalf("Error creating hocr directory: %v", err) } - err = os.Rename(v, filepath.Join(bookname, "hocr", filepath.Base(v))) + err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v))) if err != nil { log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) } } // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf - _ = os.Remove(filepath.Join(bookname, bookname + ".binarised.pdf")) - _ = 
os.Rename(filepath.Join(bookname, bookname + ".colour.pdf"), filepath.Join(bookname, bookname + ".pdf")) + _ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf")) + _ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf")) } func addTxtVersion(hocrfn string) error { @@ -257,23 +263,23 @@ func uploadbook(dir string, name string, conn Pipeliner) error { return nil } -func downloadbook(name string, conn Pipeliner) error { +func downloadbook(dir string, name string, conn Pipeliner) error { err := os.MkdirAll(name, 0755) if err != nil { log.Fatalln("Failed to create directory", name, err) } - err = pipeline.DownloadBestPages(name, conn, false) + err = pipeline.DownloadBestPages(dir, name, conn, false) if err != nil { return fmt.Errorf("Error downloading best pages: %v", err) } - err = pipeline.DownloadPdfs(name, conn) + err = pipeline.DownloadPdfs(dir, name, conn) if err != nil { return fmt.Errorf("Error downloading PDFs: %v", err) } - err = pipeline.DownloadAnalyses(name, conn) + err = pipeline.DownloadAnalyses(dir, name, conn) if err != nil { return fmt.Errorf("Error downloading analyses: %v", err) } -- cgit v1.2.1-24-ge1ad From 17b2d91d5f323fd985ca012e50d36908cbceba87 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 7 Dec 2020 17:04:12 +0000 Subject: [rescribe] Fix up *.hocr glob, which ensures that using a savedir that already has a hocr directory in it will work --- cmd/rescribe/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go index 8414c53..07eeaf0 100644 --- a/cmd/rescribe/main.go +++ b/cmd/rescribe/main.go @@ -190,7 +190,7 @@ func main() { log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) } - hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*hocr", savedir, string(filepath.Separator))) + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*.hocr", savedir, string(filepath.Separator))) if err != nil { 
log.Fatalf("Error looking for .hocr files: %v", err) } -- cgit v1.2.1-24-ge1ad From 9147e57a3a634ad303e8f1e7c456988996d5c75b Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 14 Dec 2020 17:08:14 +0000 Subject: Add rmbook tool --- cmd/rmbook/main.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 cmd/rmbook/main.go (limited to 'cmd') diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go new file mode 100644 index 0000000..8d434a9 --- /dev/null +++ b/cmd/rmbook/main.go @@ -0,0 +1,78 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rmbook removes a book from cloud storage. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: rmbook bookname + +Removes a book from cloud storage. +` + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type RmPipeliner interface { + MinimalInit() error + WIPStorageId() string + DeleteObjects(bucket string, keys []string) error + ListObjects(bucket string, prefix string) ([]string, error) +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 { + flag.Usage() + return + } + + var n NullWriter + verboselog := log.New(n, "", log.LstdFlags) + + var conn RmPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + fmt.Println("Setting up cloud connection") + err := conn.MinimalInit() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + bookname := flag.Arg(0) + + fmt.Println("Getting list of files for book") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln("Error in listing book items:", err) + } + + if 
len(objs) == 0 { + log.Fatalln("No files found for book:", bookname) + } + + fmt.Println("Deleting all files for book") + err = conn.DeleteObjects(conn.WIPStorageId(), objs) + if err != nil { + log.Fatalln("Error deleting book files:", err) + } + + fmt.Println("Finished deleting files") +} -- cgit v1.2.1-24-ge1ad From fb1069b504e8cd37b9a2bcdccefa9699d0e1dee9 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 15 Dec 2020 12:37:43 +0000 Subject: [rmbook] Add -dryrun flag --- cmd/rmbook/main.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go index 8d434a9..c195d85 100644 --- a/cmd/rmbook/main.go +++ b/cmd/rmbook/main.go @@ -13,7 +13,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: rmbook bookname +const usage = `Usage: rmbook [-dryrun] bookname Removes a book from cloud storage. ` @@ -33,6 +33,7 @@ type RmPipeliner interface { } func main() { + dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() @@ -68,6 +69,14 @@ func main() { log.Fatalln("No files found for book:", bookname) } + if *dryrun { + fmt.Printf("I would delete these files:\n") + for _, v := range objs { + fmt.Println(v) + } + return + } + fmt.Println("Deleting all files for book") err = conn.DeleteObjects(conn.WIPStorageId(), objs) if err != nil { -- cgit v1.2.1-24-ge1ad From 7d9f77c2f1102aec026d1af78d1fe4725ed76674 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 15 Dec 2020 12:38:36 +0000 Subject: [rmbook] Append / to end of bookname, to ensure e.g. 
"1" doesnt match all books starting with "1" --- cmd/rmbook/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cmd') diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go index c195d85..fcacc2e 100644 --- a/cmd/rmbook/main.go +++ b/cmd/rmbook/main.go @@ -57,7 +57,7 @@ func main() { log.Fatalln("Error setting up cloud connection:", err) } - bookname := flag.Arg(0) + bookname := flag.Arg(0) + "/" fmt.Println("Getting list of files for book") objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -- cgit v1.2.1-24-ge1ad From 86cc5d6c921ac05e0d08f66b205b51e1f5adb938 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 13:50:02 +0000 Subject: Speed up lspipeline by making s3 requests concurrently and only processing single results from ListObjects requests --- cmd/lspipeline/main.go | 95 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 77 insertions(+), 18 deletions(-) (limited to 'cmd') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..bf19b52 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,8 @@ import ( "os/exec" "sort" "strings" + "sync" + "time" "rescribe.xyz/bookpipeline" ) @@ -100,6 +102,46 @@ func (o ObjMetas) Less(i, j int) bool { return o[i].Date.Before(o[j].Date) } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. 
+func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { + // First try to get the graph.png file from the book, which marks + // it as done + objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil && len(objs) > 0 { + return objs[0].Date, true, nil + } + + // Otherwise get any file from the book to get a date to sort by + objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + if err != nil { + return time.Time{}, false, err + } + if len(objs) == 0 { + return time.Time{}, false, fmt.Errorf("No files found for book %s", key) + } + return objs[0].Date, false, nil +} + +// getBookDetailsWg gets the details for a book putting it into either the +// done or inprogress channels as appropriate, and using a sync.WaitGroup Done +// signal so it can be tracked. On error it sends to the errc channel. +func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { + defer wg.Done() + date, isdone, err := getBookDetails(conn, key) + if err != nil { + errc <- err + return + } + meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} + if isdone { + done <- meta + } else { + inprogress <- meta + } +} + // getBookStatus returns a list of in progress and done books. // It determines this by finding all prefixes, and splitting them // into two lists, those which have a 'graph.png' file (the done @@ -108,35 +150,52 @@ func (o ObjMetas) Less(i, j int) bool { // of a random file with the prefix if no graph.png was found. 
func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) - var inprogressmeta, donemeta ObjMetas if err != nil { log.Println("Error getting object prefixes:", err) return } - // Search for graph.png to determine done books (and save the date of it to sort with) + + // 100,000 size buffer is to ensure we never block, as we're using waitgroup + // rather than channel blocking to determine when to continue. Probably there + // is a better way to do this, though, like reading the done and inprogress + // channels in a goroutine and doing wg.Done() when each is read there instead. + donec := make(chan bookpipeline.ObjMeta, 100000) + inprogressc := make(chan bookpipeline.ObjMeta, 100000) + errc := make(chan error, 100000) + + var wg sync.WaitGroup for _, p := range prefixes { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") - if err != nil || len(objs) == 0 { - inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) - } else { - donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) - } + wg.Add(1) + go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) } - // Get a random file from the inprogress list to get a date to sort by - for _, i := range inprogressmeta { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) - if err != nil || len(objs) == 0 { - continue - } - i.Date = objs[0].Date + wg.Wait() + close(donec) + close(inprogressc) + + select { + case err = <-errc: + return inprogress, done, err + default: + break + } + + var inprogressmeta, donemeta ObjMetas + + for i := range donec { + donemeta = append(donemeta, i) + } + for i := range inprogressc { + inprogressmeta = append(inprogressmeta, i) } + sort.Sort(donemeta) + sort.Sort(inprogressmeta) + for _, i := range donemeta { - done = append(done, strings.TrimSuffix(i.Name, "/")) + done = append(done, i.Name) } - sort.Sort(inprogressmeta) 
for _, i := range inprogressmeta { - inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) + inprogress = append(inprogress, i.Name) } return -- cgit v1.2.1-24-ge1ad From 54150b54cd06e3deba44e73b151070b74a4d8e76 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:17:19 +0000 Subject: Improve lspipeline concurrency by removing WaitGroup stuff --- cmd/lspipeline/main.go | 52 ++++++++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 31 deletions(-) (limited to 'cmd') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index bf19b52..8980c59 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,7 +12,6 @@ import ( "os/exec" "sort" "strings" - "sync" "time" "rescribe.xyz/bookpipeline" @@ -124,11 +123,10 @@ func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, er return objs[0].Date, false, nil } -// getBookDetailsWg gets the details for a book putting it into either the -// done or inprogress channels as appropriate, and using a sync.WaitGroup Done -// signal so it can be tracked. On error it sends to the errc channel. -func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { - defer wg.Done() +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { date, isdone, err := getBookDetails(conn, key) if err != nil { errc <- err @@ -148,6 +146,8 @@ func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMe // list), and those which do not (the inprogress list). They are // sorted according to the date of the graph.png file, or the date // of a random file with the prefix if no graph.png was found. 
+// It spins up many goroutines to query the book status and +// dates, as it is far faster to do concurrently. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) if err != nil { @@ -155,37 +155,27 @@ func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err er return } - // 100,000 size buffer is to ensure we never block, as we're using waitgroup - // rather than channel blocking to determine when to continue. Probably there - // is a better way to do this, though, like reading the done and inprogress - // channels in a goroutine and doing wg.Done() when each is read there instead. - donec := make(chan bookpipeline.ObjMeta, 100000) - inprogressc := make(chan bookpipeline.ObjMeta, 100000) - errc := make(chan error, 100000) + donec := make(chan bookpipeline.ObjMeta, 100) + inprogressc := make(chan bookpipeline.ObjMeta, 100) + errc := make(chan error) - var wg sync.WaitGroup for _, p := range prefixes { - wg.Add(1) - go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) - } - wg.Wait() - close(donec) - close(inprogressc) - - select { - case err = <-errc: - return inprogress, done, err - default: - break + go getBookDetailsChan(conn, p, donec, inprogressc, errc) } var inprogressmeta, donemeta ObjMetas - for i := range donec { - donemeta = append(donemeta, i) - } - for i := range inprogressc { - inprogressmeta = append(inprogressmeta, i) + // there will be exactly as many sends to donec or inprogressc + // as there are prefixes + for range prefixes { + select { + case i := <-donec: + donemeta = append(donemeta, i) + case i := <-inprogressc: + inprogressmeta = append(inprogressmeta, i) + case err = <-errc: + return inprogress, done, err + } } sort.Sort(donemeta) -- cgit v1.2.1-24-ge1ad From 5c3cee66a90ce6ef87e125b3bf011a6903d38083 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:56:10 +0000 Subject: Make ListObjectsWithMeta 
generic again and create a specialised ListObjectWithMeta for single file listing, so we can still be as fast, but do not have a misleading api --- cmd/lspipeline/main.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'cmd') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 8980c59..131ff12 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -36,7 +36,7 @@ type LsPipeliner interface { AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) - ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error) + ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error) ListObjectPrefixes(bucket string) ([]string, error) WIPStorageId() string } @@ -107,20 +107,17 @@ func (o ObjMetas) Less(i, j int) bool { func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { // First try to get the graph.png file from the book, which marks // it as done - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") - if err == nil && len(objs) > 0 { - return objs[0].Date, true, nil + obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil { + return obj.Date, true, nil } // Otherwise get any file from the book to get a date to sort by - objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) if err != nil { return time.Time{}, false, err } - if len(objs) == 0 { - return time.Time{}, false, fmt.Errorf("No files found for book %s", key) - } - return objs[0].Date, false, nil + return obj.Date, false, nil } // getBookDetailsChan gets the details for a book putting it into either the -- cgit v1.2.1-24-ge1ad