diff options
Diffstat (limited to 'internal/pipeline/pipeline.go')
-rw-r--r-- | internal/pipeline/pipeline.go | 251 |
1 files changed, 228 insertions, 23 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index e1a2c40..d8beeb9 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -11,6 +11,7 @@ package pipeline import ( "bytes" + "context" "fmt" "io/ioutil" "log" @@ -62,6 +63,7 @@ type Queuer interface { DelFromQueue(url string, handle string) error Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) WipeQueueId() string @@ -71,6 +73,7 @@ type UploadQueuer interface { Log(v ...interface{}) Upload(bucket string, key string, path string) error WIPStorageId() string + PreNoWipeQueueId() string PreQueueId() string WipeQueueId() string OCRPageQueueId() string @@ -92,6 +95,7 @@ type Pipeliner interface { ListObjects(bucket string, prefix string) ([]string, error) Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) Upload(bucket string, key string, path string) error @@ -129,8 +133,17 @@ func GetMailSettings() (mailSettings, error) { // dir, putting each successfully downloaded file name into the // process channel. If an error occurs it is sent to the errc channel // and the function returns early. -func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { +func download(ctx context.Context, dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { for key := range dl { + select { + case <-ctx.Done(): + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + close(process) + return + default: + } fn := filepath.Join(dir, filepath.Base(key)) logger.Println("Downloading", key) err := conn.Download(conn.WIPStorageId(), key, fn) @@ -151,8 +164,16 @@ func download(dl chan string, process chan string, conn Downloader, dir string, // once it has been successfully uploaded. The done channel is // then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { +func up(ctx context.Context, c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { for path := range c { + select { + case <-ctx.Done(): + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } name := filepath.Base(path) key := bookname + "/" + name logger.Println("Uploading", key) @@ -181,8 +202,16 @@ func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan // added to the toQueue once it has been uploaded. The done channel // is then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { +func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { for path := range c { + select { + case <-ctx.Done(): + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } name := filepath.Base(path) key := bookname + "/" + name logger.Println("Uploading", key) @@ -213,11 +242,19 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer done <- true } -func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) { - return func(pre chan string, up chan string, errc chan error, logger *log.Logger) { +func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { + select { + case <-ctx.Done(): + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30) if err != nil { for range pre { } // consume the rest of the receiving channel so it isn't blocked @@ -233,8 +270,16 @@ func Preprocess(thresholds []float64) func(chan string, chan string, chan error, } } -func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { +func Wipe(ctx context.Context, towipe chan string, up chan string, errc chan error, logger *log.Logger) { for path := range towipe { + select { + case <-ctx.Done(): + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("Wiping", path) s := strings.Split(path, ".") base := strings.Join(s[:len(s)-1], "") @@ -251,15 +296,24 @@ func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logge close(up) } -func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { +func Ocr(training string, tesscmd string) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, toocr chan string, up chan string, errc chan error, logger *log.Logger) { if tesscmd == "" { tesscmd = "tesseract" } for path := range toocr { + select { + case <-ctx.Done(): + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + HideCmd(cmd) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr @@ -276,13 +330,21 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er } } -func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { +func Analyse(conn Downloader, mkfullpdf bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { confs := make(map[string][]*bookpipeline.Conf) bestconfs := make(map[string]*bookpipeline.Conf) savedir := "" for path := range toanalyse { + select { + case <-ctx.Done(): + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } if savedir == "" { savedir = filepath.Dir(path) } @@ -316,6 +378,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } defer f.Close() + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Finding best confidence for each page, and saving all confidences") for base, conf := range confs { var best float64 @@ -334,6 +403,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo f.Close() up <- fn + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Creating best file listing the best file for each page") fn = filepath.Join(savedir, "best") f, err = os.Create(fn) @@ -354,6 +430,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } sort.Strings(pgs) + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading binarised and original images to create PDFs") bookname, err := filepath.Rel(os.TempDir(), savedir) if err != nil { @@ -374,6 +457,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } binhascontent, colourhascontent := false, false + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + var colourimgs, binimgs []pageimg for _, pg := range pgs { @@ -393,6 +483,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } for _, pg := range binimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading binarised page to add to PDF", pg.img) err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) if err != nil { @@ -412,6 +509,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } } + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + if binhascontent { fn = filepath.Join(savedir, bookname+".binarised.pdf") err = binarisedpdf.Save(fn) @@ -423,6 +527,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } for _, pg := range colourimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading colour page to add to PDF", pg.img) colourfn := pg.img err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) @@ -448,6 +559,14 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } } } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + if colourhascontent { fn = filepath.Join(savedir, bookname+".colour.pdf") err = colourpdf.Save(fn) @@ -458,6 +577,71 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo up <- fn } + if mkfullpdf { + fullsizepdf := new(bookpipeline.Fpdf) + err = fullsizepdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + for _, pg := range colourimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + err = fullsizepdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), false) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + if colourhascontent { + fn = filepath.Join(savedir, bookname+".original.pdf") + err = fullsizepdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save full size pdf: %s", err) + return + } + up <- fn + } + } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Creating graph") fn = filepath.Join(savedir, "graph.png") f, err = os.Create(fn) @@ -467,11 +651,24 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } defer f.Close() err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil { + _ = os.Remove(fn) + } if err != nil && err.Error() != "Not enough valid confidences" { errc <- fmt.Errorf("Error rendering graph: %s", err) return } - up <- fn + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + if err == nil { + up <- fn + } close(up) } @@ -541,7 +738,7 @@ func allOCRed(bookname string, conn Lister) bool { // OcrPage OCRs a page based on a message. It may make sense to // roll this back into processBook (on which it is based) once // working well. -func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { +func OcrPage(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, process func(context.Context, chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) @@ -565,19 +762,23 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + go download(ctx, dl, processc, conn, d, errc, conn.GetLogger()) + go process(ctx, processc, upc, errc, conn.GetLogger()) + go up(ctx, upc, done, conn, bookname, errc, conn.GetLogger()) dl <- msgparts[0] close(dl) - // wait for either the done or errc channel to be sent to + // wait for either the done or errc channels to be sent to select { case err = <-errc: t.Stop() _ = os.RemoveAll(d) return err + case <-ctx.Done(): + t.Stop() + _ = os.RemoveAll(d) + return ctx.Err() case <-done: } @@ -619,7 +820,7 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch return nil } -func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { +func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, process func(context.Context, chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) @@ -645,12 +846,12 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) + go download(ctx, dl, processc, conn, d, errc, conn.GetLogger()) + go process(ctx, processc, upc, errc, conn.GetLogger()) if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + go upAndQueue(ctx, upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + go up(ctx, upc, done, conn, bookname, errc, conn.GetLogger()) } conn.Log("Getting list of objects to download") @@ -682,7 +883,7 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string // complete, and will fill the ocrpage queue with parts which succeeded // on each run, so in that case it's better to delete the message from // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() { conn.Log("Deleting message from queue due to a bad error", fromQueue) err2 := conn.DelFromQueue(fromQueue, msg.Handle) if err2 != nil { @@ -711,6 +912,10 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string } } return err + case <-ctx.Done(): + t.Stop() + _ = os.RemoveAll(d) + return ctx.Err() case <-done: } |