diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/bookpipeline/main.go | 765 | ||||
-rw-r--r-- | cmd/booktopipeline/main.go | 92 | ||||
-rw-r--r-- | cmd/getbests/main.go | 4 | ||||
-rw-r--r-- | cmd/getpipelinebook/main.go | 98 | ||||
-rw-r--r-- | cmd/logwholequeue/main.go | 85 | ||||
-rw-r--r-- | cmd/lspipeline/main.go | 82 | ||||
-rw-r--r-- | cmd/postprocess-bythresh/main.go | 71 | ||||
-rw-r--r-- | cmd/rescribe/main.go | 395 | ||||
-rw-r--r-- | cmd/rmbook/main.go | 87 | ||||
-rw-r--r-- | cmd/trimqueue/main.go | 84 |
10 files changed, 840 insertions, 923 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 36295a6..909b431 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,23 +11,18 @@ import ( "bytes" "flag" "fmt" - "io/ioutil" "log" - "net/smtp" "os" "os/exec" - "path/filepath" "regexp" - "sort" - "strings" "time" "rescribe.xyz/bookpipeline" - "rescribe.xyz/preproc" - "rescribe.xyz/utils/pkg/hocr" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. When one is found this general process is followed: @@ -47,10 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with the contents: {smtpserver} {port} {username} {password} {from} {to} ` +const QueueTimeoutSecs = 2 * 60 const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60 // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -81,686 +75,16 @@ type Pipeliner interface { Log(v ...interface{}) } -type pageimg struct { - hocr, img string -} - -type mailSettings struct { - server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { - p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") - b, err := ioutil.ReadFile(p) - if err != nil { - return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) - } - f := strings.Fields(string(b)) - if len(f) != 6 { - return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) - } - return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - logger.Println("Downloading", key) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - for range dl { - } // consume the rest of the receiving channel so it isn't blocked - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { - for path := range c { - name := filepath.Base(path) - key := bookname + "/" + name - logger.Println("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - err = os.Remove(path) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - logger.Println("Adding", key, training, "to queue", toQueue) - err = conn.AddToQueue(toQueue, key+" "+training) - if err != nil { - for range c { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - } - - done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) - if err != nil { - for range pre { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - _ = os.Remove(path) - for _, p := range done { - up <- p - } - } - close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range towipe { - logger.Println("Wiping", path) - s := strings.Split(path, ".") - base := strings.Join(s[:len(s)-1], "") - outpath := base + "_bin0.0.png" - err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) - if err != nil { - for range towipe { - } // consume the rest of the receiving channel so it isn't blocked - errc <- err - return - } - up <- outpath - } - close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - for range toocr { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*bookpipeline.Conf) - bestconfs := make(map[string]*bookpipeline.Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil && err.Error() == "No words found" { - continue - } - if err != nil { - for range toanalyse { - } // consume the rest of the receiving channel so it isn't blocked - errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c bookpipeline.Conf - c.Path = path - c.Code = base[codestart:] - c.Conf = avg - confs[name] = append(confs[name], &c) - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.Conf > best { - best = c.Conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) - if err != nil { - errc <- fmt.Errorf("Error writing confidences file: %s", err) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) - } - up <- fn - - var pgs []string - for _, conf := range bestconfs { - pgs = append(pgs, conf.Path) - } - sort.Strings(pgs) - - logger.Println("Downloading binarised and original images to create PDFs") - bookname, err := filepath.Rel(os.TempDir(), savedir) - if err != nil { - errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) - return - } - colourpdf := new(bookpipeline.Fpdf) - err = colourpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binarisedpdf := new(bookpipeline.Fpdf) - err = binarisedpdf.Setup() - if err != nil { - errc <- fmt.Errorf("Failed to set up PDF: %s", err) - return - } - binhascontent, colourhascontent := false, false - - var colourimgs, binimgs []pageimg - - for _, pg := range pgs { - base := filepath.Base(pg) - nosuffix := strings.TrimSuffix(base, ".hocr") - p := strings.SplitN(base, "_bin", 2) - - var fn string - if len(p) > 1 { - fn = p[0] + ".jpg" - } else { - fn = nosuffix + ".jpg" - } - - binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) - colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) - } - - for _, pg := range binimgs { - logger.Println("Downloading binarised page to add to PDF", pg.img) - err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } else { - err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - binhascontent = true - err = os.Remove(filepath.Join(savedir, pg.img)) - if err != nil { - errc <- err - return - } - } - } - - if binhascontent { - fn = filepath.Join(savedir, bookname+".binarised.pdf") - err = binarisedpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) - return - } - up <- fn - key := bookname + "/" + bookname + ".binarised.pdf" - conn.Log("Uploading", key) - err := conn.Upload(conn.WIPStorageId(), key, fn) - if err != nil { - } - } - - for _, pg := range colourimgs { - logger.Println("Downloading colour page to add to PDF", pg.img) - colourfn := pg.img - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) - logger.Println("Download failed; trying", colourfn) - err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) - if err != nil { - logger.Println("Download failed; skipping page", pg.img) - } - } - if err == nil { - err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) - if err != nil { - errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) - return - } - colourhascontent = true - err = os.Remove(filepath.Join(savedir, colourfn)) - if err != nil { - errc <- err - return - } - } - } - if colourhascontent { - fn = filepath.Join(savedir, bookname+".colour.pdf") - err = colourpdf.Save(fn) - if err != nil { - errc <- fmt.Errorf("Failed to save colour pdf: %s", err) - return - } - up <- fn - } - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - errc <- fmt.Errorf("Error creating file %s: %s", fn, err) - return - } - defer f.Close() - err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) - if err != nil && err.Error() != "Not enough valid confidences" { - errc <- fmt.Errorf("Error rendering graph: %s", err) - return - } - up <- fn - - close(up) - } -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { - currentmsg := msg - for range t.C { - m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) - if err != nil { - // This is for better debugging of the heartbeat issue - conn.Log("Error with heartbeat", err) - os.Exit(1) - // TODO: would be better to ensure this error stops any running - // processes, as they will ultimately fail in the case of - // it. could do this by setting a global variable that - // processes check each time they loop. - errc <- err - t.Stop() - return - } - if m.Id != "" { - conn.Log("Replaced message handle as visibilitytimeout limit was reached") - currentmsg = m - // TODO: maybe handle communicating new msg more gracefully than this - for range msgc { - } // throw away any old msgc - msgc <- m - } - } -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - return false - } - - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - - atleastone := false - for _, png := range objs { - if preprocessedPattern.MatchString(png) { - atleastone = true - found := false - b := strings.TrimSuffix(filepath.Base(png), ".png") - hocrname := bookname + "/" + b + ".hocr" - for _, hocr := range objs { - if hocr == hocrname { - found = true - break - } - } - if found == false { - return false - } - } - } - if atleastone == false { - return false - } - return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := filepath.Dir(msgparts[0]) - if len(msgparts) > 1 && msgparts[1] != "" { - process = ocr(msgparts[1]) - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - - dl <- msgparts[0] - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if allOCRed(bookname, conn) && toQueue != "" { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - dl := make(chan string) - msgc := make(chan bookpipeline.Qmsg) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - msgparts := strings.Split(msg.Body, " ") - bookname := msgparts[0] - - var training string - if len(msgparts) > 1 { - training = msgparts[1] - } - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - return fmt.Errorf("Failed to create directory %s: %s", d, err) - } - - t := time.NewTicker(HeartbeatSeconds * time.Second) - go heartbeat(conn, t, msg, fromQueue, msgc, errc) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) - } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) - } - - conn.Log("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Log("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - // if the error is in preprocessing / wipeonly, chances are that it will never - // complete, and will fill the ocrpage queue with parts which succeeded - // on each run, so in that case it's better to delete the message from - // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { - conn.Log("Deleting message from queue due to a bad error", fromQueue) - err2 := conn.DelFromQueue(fromQueue, msg.Handle) - if err2 != nil { - conn.Log("Error deleting message from queue", err2) - } - ms, err2 := getMailSettings() - if err2 != nil { - conn.Log("Failed to mail settings ", err2) - } - if err2 == nil && ms.server != "" { - logs, err2 := getlogs() - if err2 != nil { - conn.Log("Failed to get logs ", err2) - logs = "" - } - msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + - "Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + - " Fail message: %s\r\nFull log:\r\n%s\r\n", - ms.to, ms.from, bookname, err, logs) - host := fmt.Sprintf("%s:%s", ms.server, ms.port) - auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) - err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) - if err2 != nil { - conn.Log("Error sending email ", err2) - } - } - } - return err - case <-done: - } - - if toQueue != "" && toQueue != conn.OCRPageQueueId() { - conn.Log("Sending", bookname, "to queue", toQueue) - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return fmt.Errorf("Error adding to queue %s: %s", bookname, err) - } - } - - t.Stop() - - // check whether we're using a newer msg handle - select { - case m, ok := <-msgc: - if ok { - msg = m - conn.Log("Using new message handle to delete message from queue") - } - default: - conn.Log("Using original message handle to delete message from queue") - } - - conn.Log("Deleting original message from queue", fromQueue) - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return fmt.Errorf("Error deleting message from queue: %s", err) - } - - err = os.RemoveAll(d) - if err != nil { - return fmt.Errorf("Failed to remove directory %s: %s", d, err) - } - - return nil -} - func stopTimer(t *time.Timer) { if !t.Stop() { <-t.C } } -// TODO: rather than relying on journald, would be nicer to save the logs -// ourselves maybe, so that we weren't relying on a particular systemd -// setup. this can be done by having the conn.Log also append line -// to a file (though that would mean everything would have to go through -// conn.Log, which we're not consistently doing yet). the correct thing -// to do then would be to implement a new interface that covers the part -// of log.Logger we use (e.g. Print and Printf), and then have an exported -// conn struct that implements those, so that we could pass a log.Logger -// or the new conn struct everywhere (we wouldn't be passing a log.Logger, -// it's just good to be able to keep the compatibility) -func getlogs() (string, error) { - cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { - logs, err := getlogs() - if err != nil { - return fmt.Errorf("Error getting logs, error: %v", err) +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) } - key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) - path := filepath.Join(os.TempDir(), key) - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("Error creating log file", err) - } - defer f.Close() - _, err = f.WriteString(logs) - if err != nil { - return fmt.Errorf("Error saving log file", err) - } - _ = f.Close() - err = conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - return fmt.Errorf("Error uploading log", err) - } - conn.Log("Log saved to", key) - return nil } func main() { @@ -770,7 +94,8 @@ func main() { nowipe := flag.Bool("nw", false, "disable wipeonly") noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") - autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") + autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { @@ -801,17 +126,20 @@ func main() { log.Fatalln("Unknown connection type") } - _, err := getMailSettings() - if err != nil { - conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) + var err error + if *conntype != "local" { + _, err = pipeline.GetMailSettings() + if err != nil { + conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) + } } - conn.Log("Setting up AWS session") + conn.Log("Setting up session") err = conn.Init() if err != nil { - log.Fatalln("Error setting up cloud connection:", err) + log.Fatalln("Error setting up connection:", err) } - conn.Log("Finished setting up AWS session") + conn.Log("Finished setting up session") starttime := time.Now().Unix() hostname, err := os.Hostname() @@ -820,7 +148,7 @@ func main() { var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time - var shutdownIfQuiet *time.Timer + var stopIfQuiet *time.Timer var savelognow *time.Ticker if !*nopreproc { checkPreQueue = time.After(0) @@ -834,13 +162,21 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) + var quietTime = time.Duration(*autostop) * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + savelognow = time.NewTicker(LogSaveTime) + if *conntype == "local" { + savelognow.Stop() + } for { select { case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking preprocess queue", err) @@ -851,14 +187,14 @@ func main() { continue } conn.Log("Message received on preprocess queue, processing", msg.Body) - stopTimer(shutdownIfQuiet) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + stopTimer(stopIfQuiet) + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } case <-checkWipeQueue: - msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) checkWipeQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking wipeonly queue", err) @@ -868,15 +204,15 @@ func main() { conn.Log("No message received on wipeonly queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } case <-checkOCRPageQueue: - msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) checkOCRPageQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking OCR Page queue", err) @@ -888,15 +224,15 @@ func main() { // Have OCRPageQueue checked immediately after completion, as chances are high that // there will be more pages that should be done without delay checkOCRPageQueue = time.After(0) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) - err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { conn.Log("Error checking analyse queue", err) @@ -906,25 +242,30 @@ func main() { conn.Log("No message received on analyse queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) - err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") - shutdownIfQuiet.Reset(TimeBeforeShutdown) + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } case <-savelognow.C: conn.Log("Saving logs") - err = savelogs(conn, starttime, hostname) + err = pipeline.SaveLogs(conn, starttime, hostname) if err != nil { conn.Log("Error saving logs", err) } - case <-shutdownIfQuiet.C: - if !*autoshutdown { + case <-stopIfQuiet.C: + if quietTime == 0 { continue } + if !*autoshutdown { + conn.Log("Stopping pipeline") + _ = pipeline.SaveLogs(conn, starttime, hostname) + return + } conn.Log("Shutting down") - _ = savelogs(conn, starttime, hostname) + _ = pipeline.SaveLogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") var stdout, stderr bytes.Buffer cmd.Stdout = &stdout diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 60d1f81..b4f4d99 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -9,14 +9,13 @@ package main import ( "flag" "fmt" - "image" - _ "image/png" - _ "image/jpeg" "log" "os" "path/filepath" "rescribe.xyz/bookpipeline" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] @@ -32,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or If bookname is omitted the last part of the bookdir is used. ` -type Pipeliner interface { - Init() error - PreQueueId() string - WipeQueueId() string - WIPStorageId() string - AddToQueue(url string, msg string) error - Upload(bucket string, key string, path string) error -} - // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -50,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { var verboselog *log.Logger -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - f <- path - } - return nil -} - func main() { verbose := flag.Bool("v", false, "Verbose") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -94,7 +72,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.Pipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -108,18 +86,7 @@ func main() { log.Fatalln("Failed to set up cloud connection:", err) } - qid := conn.PreQueueId() - - // Auto detect type of queue to send to based on file extension - pngdirs, _ := filepath.Glob(bookdir + "/*.png") - jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg") - pngcount := len(pngdirs) - jpgcount := len(jpgdirs) - if pngcount > jpgcount { - qid = conn.WipeQueueId() - } else { - qid = conn.PreQueueId() - } + qid := pipeline.DetectQueueType(bookdir, conn) // Flags set override the queue selection if *wipeonly { @@ -130,43 +97,24 @@ func main() { } verboselog.Println("Checking that all images are valid in", bookdir) - checker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, checker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(checker) - }() - - for path := range checker { - f, err := os.Open(path) - if err != nil { - log.Fatalln("Opening image %s failed, bailing: %v", path, err) - } - _, _, err = image.Decode(f) - if err != nil { - log.Fatalf("Decoding image %s failed, bailing: %v", path, err) - } + err = pipeline.CheckImages(bookdir) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Walking", bookdir) - walker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, walker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(walker) - }() - - for path := range walker { - verboselog.Println("Uploading", path) - name := filepath.Base(path) - err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) - if err != nil { - log.Fatalln("Failed to upload", path, err) - } + verboselog.Println("Checking that a book hasn't already been uploaded with that name") + list, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln(err) + } + if len(list) > 0 { + log.Fatalf("Error: There is already a book in S3 named %s", bookname) + } + + verboselog.Println("Uploading all images are valid in", bookdir) + err = pipeline.UploadImages(bookdir, bookname, conn) + if err != nil { + log.Fatalln(err) } if *training != "" { diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go index 9eca0d8..c1ee50d 100644 --- a/cmd/getbests/main.go +++ b/cmd/getbests/main.go @@ -62,8 +62,8 @@ func main() { log.Println("Downloading all best files found") for _, i := range objs { parts := strings.Split(i, "/") - if parts[len(parts) - 1] == "best" { - err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best") + if parts[len(parts)-1] == "best" { + err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best") if err != nil { log.Fatalln("Failed to download file", i, err) } diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 03e709b..ccedd72 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -6,15 +6,15 @@ package main import ( - "bufio" "flag" "fmt" "log" "os" "path/filepath" - "strings" "rescribe.xyz/bookpipeline" + + "rescribe.xyz/bookpipeline/internal/pipeline" ) const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname @@ -33,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } -type Pipeliner interface { - MinimalInit() error - ListObjects(bucket string, prefix string) ([]string, error) - Download(bucket string, key string, fn string) error - Upload(bucket string, key string, path string) error - CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - WIPStorageId() string -} - -func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { - for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { - fn := filepath.Join(bookname, bookname+suffix) - l.Println("Downloading PDF", fn) - err := conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Printf("Failed to download %s: %s\n", fn, err) - } - } -} - func main() { all := flag.Bool("a", false, "Get all files for book") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -83,7 +61,7 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - var conn Pipeliner + var conn pipeline.MinPipeliner switch *conntype { case "aws": conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -109,18 +87,10 @@ func main() { if *all { verboselog.Println("Downloading all files for", bookname) - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + err = pipeline.DownloadAll(bookname, bookname, conn) if err != nil { - log.Fatalln("Failed to get list of files for book", bookname, err) - } - for _, i := range objs { - verboselog.Println("Downloading", i) - err = conn.Download(conn.WIPStorageId(), i, i) - if err != nil { - log.Fatalln("Failed to download file", i, err) - } + log.Fatalln(err) } - return } if *binarisedpdf { @@ -151,61 +121,29 @@ func main() { } if *pdf { - getpdfs(conn, verboselog, bookname) + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, bookname, conn) } if *binarisedpdf || *colourpdf || *graph || *pdf { return } - verboselog.Println("Downloading best file") - fn := filepath.Join(bookname, "best") - err = conn.Download(conn.WIPStorageId(), fn, fn) + verboselog.Println("Downloading best pages") + err = pipeline.DownloadBestPages(bookname, bookname, conn, *png) if err != nil { - log.Fatalln("Failed to download 'best' file", err) - } - f, err := os.Open(fn) - if err != nil { - log.Fatalln("Failed to open best file", err) - } - defer f.Close() - - if *png { - verboselog.Println("Downloading png files") - s := bufio.NewScanner(f) - for s.Scan() { - txtfn := filepath.Join(bookname, s.Text()) - fn = strings.Replace(txtfn, ".hocr", ".png", 1) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } - } - return + log.Fatalln(err) } - verboselog.Println("Downloading HOCR files") - s := bufio.NewScanner(f) - for s.Scan() { - fn = filepath.Join(bookname, s.Text()) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading PDFs") + pipeline.DownloadPdfs(bookname, bookname, conn) + if err != nil { + log.Fatalln(err) } - verboselog.Println("Downloading PDF files") - getpdfs(conn, verboselog, bookname) - - verboselog.Println("Downloading analysis files") - for _, a := range []string{"conf", "graph.png"} { - fn = filepath.Join(bookname, a) - verboselog.Println("Downloading file", fn) - err = conn.Download(conn.WIPStorageId(), fn, fn) - if err != nil { - log.Fatalln("Failed to download file", fn, err) - } + verboselog.Println("Downloading analyses") + err = pipeline.DownloadAnalyses(bookname, bookname, conn) + if err != nil { + log.Fatalln(err) } } diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + LogQueue(url string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 1 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.LogQueue(qid) + if err != nil { + log.Fatalln("Error getting queue", qname, ":", err) + } +} diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..131ff12 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,7 @@ import ( "os/exec" "sort" "strings" + "time" "rescribe.xyz/bookpipeline" ) @@ -35,7 +36,7 @@ type LsPipeliner interface { AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) - ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error) + ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error) ListObjectPrefixes(bucket string) ([]string, error) WIPStorageId() string } @@ -100,43 +101,88 @@ func (o ObjMetas) Less(i, j int) bool { return o[i].Date.Before(o[j].Date) } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. +func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { + // First try to get the graph.png file from the book, which marks + // it as done + obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil { + return obj.Date, true, nil + } + + // Otherwise get any file from the book to get a date to sort by + obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) + if err != nil { + return time.Time{}, false, err + } + return obj.Date, false, nil +} + +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { + date, isdone, err := getBookDetails(conn, key) + if err != nil { + errc <- err + return + } + meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} + if isdone { + done <- meta + } else { + inprogress <- meta + } +} + // getBookStatus returns a list of in progress and done books. // It determines this by finding all prefixes, and splitting them // into two lists, those which have a 'graph.png' file (the done // list), and those which do not (the inprogress list). They are // sorted according to the date of the graph.png file, or the date // of a random file with the prefix if no graph.png was found. +// It spins up many goroutines to do query the book status and +// dates, as it is far faster to do concurrently. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) - var inprogressmeta, donemeta ObjMetas if err != nil { log.Println("Error getting object prefixes:", err) return } - // Search for graph.png to determine done books (and save the date of it to sort with) + + donec := make(chan bookpipeline.ObjMeta, 100) + inprogressc := make(chan bookpipeline.ObjMeta, 100) + errc := make(chan error) + for _, p := range prefixes { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") - if err != nil || len(objs) == 0 { - inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) - } else { - donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) - } + go getBookDetailsChan(conn, p, donec, inprogressc, errc) } - // Get a random file from the inprogress list to get a date to sort by - for _, i := range inprogressmeta { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) - if err != nil || len(objs) == 0 { - continue + + var inprogressmeta, donemeta ObjMetas + + // there will be exactly as many sends to donec or inprogressc + // as there are prefixes + for range prefixes { + select { + case i := <-donec: + donemeta = append(donemeta, i) + case i := <-inprogressc: + inprogressmeta = append(inprogressmeta, i) + case err = <-errc: + return inprogress, done, err } - i.Date = objs[0].Date } + sort.Sort(donemeta) + sort.Sort(inprogressmeta) + for _, i := range donemeta { - done = append(done, strings.TrimSuffix(i.Name, "/")) + done = append(done, i.Name) } - sort.Sort(inprogressmeta) for _, i := range inprogressmeta { - inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) + inprogress = append(inprogress, i.Name) } return diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go index 37b77e7..5bdb839 100644 --- a/cmd/postprocess-bythresh/main.go +++ b/cmd/postprocess-bythresh/main.go @@ -19,7 +19,6 @@ import ( //TO DO: make writetofile return an error and handle that accordingly // potential TO DO: add text versions where footer is cropped on odd/even pages only - // the trimblanks function trims the blank lines from a text input func trimblanks(hocrfile string) string { @@ -50,7 +49,7 @@ func dehyphenateString(in string) string { words := strings.Split(line, " ") last := words[len(words)-1] // the - 2 here is to account for a trailing newline and counting from zero - if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 { + if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 { nextwords := strings.Split(lines[i+1], " ") if len(nextwords) > 0 { line = line[0:len(line)-1] + nextwords[0] @@ -66,17 +65,15 @@ func dehyphenateString(in string) string { return strings.Join(newlines, " ") } - // the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long) func fullcrop(noblanks string) string { - alllines := strings.Split(noblanks, "\n") - + if len(alllines) <= 2 { - return "" - } else { - return strings.Join(alllines[1:len(alllines)-2], "\n") + return "" + } else { + return strings.Join(alllines[1:len(alllines)-2], "\n") } } @@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, var killheadtxt string var footkilltxt string - hocrfilepath := filepath.Join(bookdirectory, hocrfilename) confpath := filepath.Join(bookdirectory, "conf") @@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, if err != nil { log.Fatal(err) } - - + trimbest := trimblanks(hocrfiletext) - + alltxt = dehyphenateString(trimbest) - + croptxt = dehyphenateString(fullcrop(trimbest)) - + killheadtxt = dehyphenateString(headcrop(trimbest)) - + footkilltxt = dehyphenateString(footcrop(trimbest)) - } return alltxt, croptxt, killheadtxt, footkilltxt @@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string, // the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them. func writetofile(bookdirectory, textfilebase, txt string) error { alltxtfile := filepath.Join(bookdirectory, textfilebase) - + file, err := os.Create(alltxtfile) if err != nil { return fmt.Errorf("Error opening file %s: %v", alltxtfile, err) @@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error { if _, err := file.WriteString(txt); err != nil { log.Println(err) } -return err + return err } @@ -215,7 +209,7 @@ func main() { bookdirectory := flag.Arg(0) confthreshstring := strconv.Itoa(*confthresh) - + fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh) bestpath := filepath.Join(bookdirectory, "best") @@ -239,32 +233,31 @@ func main() { crop = crop + " " + croptxt killhead = killhead + " " + killheadtxt killfoot = killfoot + " " + footkilltxt - + } } - - - bookname:= filepath.Base(bookdirectory) - b := bookname + "_" + confthreshstring - err1 := writetofile(bookdirectory, b + "_all.txt", all) - if err1 != nil { + bookname := filepath.Base(bookdirectory) + b := bookname + "_" + confthreshstring + + err1 := writetofile(bookdirectory, b+"_all.txt", all) + if err1 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1) - } - - err2 := writetofile(bookdirectory, b + "_crop.txt", crop) - if err2 != nil { + } + + err2 := writetofile(bookdirectory, b+"_crop.txt", crop) + if err2 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2) - } - - err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead) - if err3 != nil { + } + + err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead) + if err3 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3) - } - - err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot) - if err4 != nil { + } + + err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot) + if err4 != nil { log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4) - } + } } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go new file mode 100644 index 0000000..07eeaf0 --- /dev/null +++ b/cmd/rescribe/main.go @@ -0,0 +1,395 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rescribe is a modification of bookpipeline designed for local-only +// operation, which rolls uploading, processing, and downloading of +// a single book by the pipeline into one command. +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "os/exec" + "path/filepath" + "regexp" + "runtime" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/utils/pkg/hocr" + + "rescribe.xyz/bookpipeline/internal/pipeline" +) + +const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] + +Process and OCR a book using the Rescribe pipeline on a local machine. + +OCR results are saved into the bookdir directory unless savedir is +specified. +` + +const QueueTimeoutSecs = 2 * 60 +const PauseBetweenChecks = 1 * time.Second +const LogSaveTime = 1 * time.Minute +var thresholds = []float64{0.1, 0.2, 0.3} + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger + Log(v ...interface{}) +} + +func stopTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + +func main() { + deftesscmd := "tesseract" + if runtime.GOOS == "windows" { + deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe" + } + + verbose := flag.Bool("v", false, "verbose") + training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") + tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 || flag.NArg() > 2 { + flag.Usage() + return + } + + bookdir := flag.Arg(0) + bookname := filepath.Base(bookdir) + savedir := bookdir + if flag.NArg() > 1 { + savedir = flag.Arg(1) + } + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", 0) + } else { + var n NullWriter + verboselog = log.New(n, "", 0) + } + + f, err := os.Open(*training) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training) + fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n") + os.Exit(1) + } + f.Close() + + abstraining, err := filepath.Abs(*training) + if err != nil { + log.Fatalf("Error getting absolute path of training %s: %v", err) + } + tessPrefix, trainingName := filepath.Split(abstraining) + trainingName = strings.TrimSuffix(trainingName, ".traineddata") + err = os.Setenv("TESSDATA_PREFIX", tessPrefix) + if err != nil { + log.Fatalln("Error setting TESSDATA_PREFIX:", err) + } + + _, err = exec.Command(*tesscmd, "--help").Output() + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") + fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") + fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") + fmt.Fprintf(os.Stderr, " rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n") + os.Exit(1) + } + + tempdir, err := ioutil.TempDir("", "bookpipeline") + if err != nil { + log.Fatalln("Error setting up temporary directory:", err) + } + + var conn Pipeliner + conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir} + + conn.Log("Setting up session") + err = conn.Init() + if err != nil { + log.Fatalln("Error setting up connection:", err) + } + conn.Log("Finished setting up session") + + fmt.Printf("Copying book to pipeline\n") + + err = uploadbook(bookdir, bookname, conn) + if err != nil { + _ = os.RemoveAll(tempdir) + log.Fatalln(err) + } + + fmt.Printf("Processing book\n") + err = processbook(trainingName, *tesscmd, conn) + if err != nil { + _ = os.RemoveAll(tempdir) + log.Fatalln(err) + } + + fmt.Printf("Saving finished book to %s\n", savedir) + err = os.MkdirAll(savedir, 0755) + if err != nil { + log.Fatalf("Error creating save directory %s: %v", savedir, err) + } + err = downloadbook(savedir, bookname, conn) + if err != nil { + _ = os.RemoveAll(tempdir) + log.Fatalln(err) + } + + err = os.RemoveAll(tempdir) + if err != nil { + log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) + } + + hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*.hocr", savedir, string(filepath.Separator))) + if err != nil { + log.Fatalf("Error looking for .hocr files: %v", err) + } + + for _, v := range hocrs { + err = addTxtVersion(v) + if err != nil { + log.Fatalf("Error creating txt version of %s: %v", v, err) + } + + err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755) + if err != nil { + log.Fatalf("Error creating hocr directory: %v", err) + } + + err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v))) + if err != nil { + log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) + } + } + + // For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf + _ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf")) + _ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf")) +} + +func addTxtVersion(hocrfn string) error { + dir := filepath.Dir(hocrfn) + err := os.MkdirAll(filepath.Join(dir, "text"), 0755) + if err != nil { + log.Fatalf("Error creating text directory: %v", err) + } + + t, err := hocr.GetText(hocrfn) + if err != nil { + return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) + } + + basefn := filepath.Base(hocrfn) + for _, v := range thresholds { + basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v)) + } + fn := filepath.Join(dir, "text", basefn + ".txt") + + err = ioutil.WriteFile(fn, []byte(t), 0644) + if err != nil { + return fmt.Errorf("Error creating text file %s: %v", fn, err) + } + + return nil +} + +func uploadbook(dir string, name string, conn Pipeliner) error { + err := pipeline.CheckImages(dir) + if err != nil { + return fmt.Errorf("Error with images in %s: %v", dir, err) + } + err = pipeline.UploadImages(dir, name, conn) + if err != nil { + return fmt.Errorf("Error saving images to process from %s: %v", dir, err) + } + + qid := pipeline.DetectQueueType(dir, conn) + + err = conn.AddToQueue(qid, name) + if err != nil { + return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) + } + + return nil +} + +func downloadbook(dir string, name string, conn Pipeliner) error { + err := os.MkdirAll(name, 0755) + if err != nil { + log.Fatalln("Failed to create directory", name, err) + } + + err = pipeline.DownloadBestPages(dir, name, conn, false) + if err != nil { + return fmt.Errorf("Error downloading best pages: %v", err) + } + + err = pipeline.DownloadPdfs(dir, name, conn) + if err != nil { + return fmt.Errorf("Error downloading PDFs: %v", err) + } + + err = pipeline.DownloadAnalyses(dir, name, conn) + if err != nil { + return fmt.Errorf("Error downloading analyses: %v", err) + } + + return nil +} + +func processbook(training string, tesscmd string, conn Pipeliner) error { + origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) + wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) + ocredPattern := regexp.MustCompile(`.hocr$`) + + var checkPreQueue <-chan time.Time + var checkWipeQueue <-chan time.Time + var checkOCRPageQueue <-chan time.Time + var checkAnalyseQueue <-chan time.Time + var stopIfQuiet *time.Timer + checkPreQueue = time.After(0) + checkWipeQueue = time.After(0) + checkOCRPageQueue = time.After(0) + checkAnalyseQueue = time.After(0) + var quietTime = 1 * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + + for { + select { + case <-checkPreQueue: + msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + return fmt.Errorf("Error checking preprocess queue: %v", err) + } + if msg.Handle == "" { + conn.Log("No message received on preprocess queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on preprocess queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (binarising and wiping)\n") + err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output + resetTimer(stopIfQuiet, quietTime) + if err != nil { + return fmt.Errorf("Error during preprocess: %v", err) + } + case <-checkWipeQueue: + msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) + checkWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + return fmt.Errorf("Error checking wipeonly queue, %v", err) + } + if msg.Handle == "" { + conn.Log("No message received on wipeonly queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on wipeonly queue, processing", msg.Body) + fmt.Printf(" Preprocessing book (wiping only)\n") + err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) + fmt.Printf(" OCRing pages ") // this is expected to be added to with dots by OCRPage output + resetTimer(stopIfQuiet, quietTime) + if err != nil { + return fmt.Errorf("Error during wipe: %v", err) + } + case <-checkOCRPageQueue: + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) + checkOCRPageQueue = time.After(PauseBetweenChecks) + if err != nil { + return fmt.Errorf("Error checking OCR Page queue: %v", err) + } + if msg.Handle == "" { + continue + } + // Have OCRPageQueue checked immediately after completion, as chances are high that + // there will be more pages that should be done without delay + checkOCRPageQueue = time.After(0) + stopTimer(stopIfQuiet) + conn.Log("Message received on OCR Page queue, processing", msg.Body) + fmt.Printf(".") + err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + resetTimer(stopIfQuiet, quietTime) + if err != nil { + return fmt.Errorf("\nError during OCR Page process: %v", err) + } + case <-checkAnalyseQueue: + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) + checkAnalyseQueue = time.After(PauseBetweenChecks) + if err != nil { + return fmt.Errorf("Error checking analyse queue: %v", err) + } + if msg.Handle == "" { + conn.Log("No message received on analyse queue, sleeping") + continue + } + stopTimer(stopIfQuiet) + conn.Log("Message received on analyse queue, processing", msg.Body) + fmt.Printf("\n Analysing OCR and compiling PDFs\n") + err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") + resetTimer(stopIfQuiet, quietTime) + if err != nil { + return fmt.Errorf("Error during analysis: %v", err) + } + case <-stopIfQuiet.C: + conn.Log("Processing finished") + return nil + } + } + + return fmt.Errorf("Ended unexpectedly") // should never be reached +} diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go new file mode 100644 index 0000000..fcacc2e --- /dev/null +++ b/cmd/rmbook/main.go @@ -0,0 +1,87 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rmbook removes a book from cloud storage. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: rmbook [-dryrun] bookname + +Removes a book from cloud storage. +` + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type RmPipeliner interface { + MinimalInit() error + WIPStorageId() string + DeleteObjects(bucket string, keys []string) error + ListObjects(bucket string, prefix string) ([]string, error) +} + +func main() { + dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete") + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 { + flag.Usage() + return + } + + var n NullWriter + verboselog := log.New(n, "", log.LstdFlags) + + var conn RmPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + fmt.Println("Setting up cloud connection") + err := conn.MinimalInit() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + bookname := flag.Arg(0) + "/" + + fmt.Println("Getting list of files for book") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln("Error in listing book items:", err) + } + + if len(objs) == 0 { + log.Fatalln("No files found for book:", bookname) + } + + if *dryrun { + fmt.Printf("I would delete these files:\n") + for _, v := range objs { + fmt.Println(v) + } + return + } + + fmt.Println("Deleting all files for book") + err = conn.DeleteObjects(conn.WIPStorageId(), objs) + if err != nil { + log.Fatalln("Error deleting book files:", err) + } + + fmt.Println("Finished deleting files") +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + RemovePrefixesFromQueue(url string, prefix string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 2 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) + if err != nil { + log.Fatalln("Error removing prefixes from queue", qname, ":", err) + } +} |