diff options
Diffstat (limited to 'internal/pipeline')
-rw-r--r-- | internal/pipeline/get.go | 34 | ||||
-rw-r--r-- | internal/pipeline/pipeline.go | 251 | ||||
-rw-r--r-- | internal/pipeline/put.go | 35 | ||||
-rw-r--r-- | internal/pipeline/util.go | 16 | ||||
-rw-r--r-- | internal/pipeline/util_windows.go | 16 |
5 files changed, 318 insertions, 34 deletions
diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go index 960c8f7..8fac060 100644 --- a/internal/pipeline/get.go +++ b/internal/pipeline/get.go @@ -12,7 +12,7 @@ import ( "strings" ) -func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) error { +func DownloadBestPages(dir string, name string, conn Downloader) error { key := filepath.Join(name, "best") fn := filepath.Join(dir, "best") err := conn.Download(conn.WIPStorageId(), key, fn) @@ -35,12 +35,23 @@ func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) return fmt.Errorf("Failed to download file %s: %v", key, err) } } + return nil +} - if !pluspngs { - return nil +func DownloadBestPngs(dir string, name string, conn Downloader) error { + key := filepath.Join(name, "best") + fn := filepath.Join(dir, "best") + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + return fmt.Errorf("Failed to download 'best' file: %v", err) } + f, err := os.Open(fn) + if err != nil { + return fmt.Errorf("Failed to open best file: %v", err) + } + defer f.Close() - s = bufio.NewScanner(f) + s := bufio.NewScanner(f) for s.Scan() { imgname := strings.Replace(s.Text(), ".hocr", ".png", 1) key = filepath.Join(name, imgname) @@ -55,14 +66,22 @@ func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) } func DownloadPdfs(dir string, name string, conn Downloader) error { - for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { + anydone := false + errmsg := "" + for _, suffix := range []string{".colour.pdf", ".binarised.pdf", ".original.pdf"} { key := filepath.Join(name, name+suffix) fn := filepath.Join(dir, name+suffix) err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { - return fmt.Errorf("Failed to download PDF %s: %v", key, err) + _ = os.Remove(fn) + errmsg += fmt.Sprintf("Failed to download PDF %s: %v\n", key, err) + } else { + anydone = true } } + if anydone == false { + return fmt.Errorf("No PDFs could be downloaded, error(s): %v", errmsg) + } return nil } @@ -71,7 +90,8 @@ func DownloadAnalyses(dir string, name string, conn Downloader) error { key := filepath.Join(name, a) fn := filepath.Join(dir, a) err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { + // ignore errors with graph.png, as it will not exist in the case of a 1 page book + if err != nil && a != "graph.png" { return fmt.Errorf("Failed to download analysis file %s: %v", key, err) } } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index e1a2c40..d8beeb9 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -11,6 +11,7 @@ package pipeline import ( "bytes" + "context" "fmt" "io/ioutil" "log" @@ -62,6 +63,7 @@ type Queuer interface { DelFromQueue(url string, handle string) error Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) WipeQueueId() string @@ -71,6 +73,7 @@ type UploadQueuer interface { Log(v ...interface{}) Upload(bucket string, key string, path string) error WIPStorageId() string + PreNoWipeQueueId() string PreQueueId() string WipeQueueId() string OCRPageQueueId() string @@ -92,6 +95,7 @@ type Pipeliner interface { ListObjects(bucket string, prefix string) ([]string, error) Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) Upload(bucket string, key string, path string) error @@ -129,8 +133,17 @@ func GetMailSettings() (mailSettings, error) { // dir, putting each successfully downloaded file name into the // process channel. If an error occurs it is sent to the errc channel // and the function returns early. -func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { +func download(ctx context.Context, dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { for key := range dl { + select { + case <-ctx.Done(): + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + close(process) + return + default: + } fn := filepath.Join(dir, filepath.Base(key)) logger.Println("Downloading", key) err := conn.Download(conn.WIPStorageId(), key, fn) @@ -151,8 +164,16 @@ func download(dl chan string, process chan string, conn Downloader, dir string, // once it has been successfully uploaded. The done channel is // then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { +func up(ctx context.Context, c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { for path := range c { + select { + case <-ctx.Done(): + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } name := filepath.Base(path) key := bookname + "/" + name logger.Println("Uploading", key) @@ -181,8 +202,16 @@ func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan // added to the toQueue once it has been uploaded. The done channel // is then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { +func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { for path := range c { + select { + case <-ctx.Done(): + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } name := filepath.Base(path) key := bookname + "/" + name logger.Println("Uploading", key) @@ -213,11 +242,19 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer done <- true } -func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) { - return func(pre chan string, up chan string, errc chan error, logger *log.Logger) { +func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { + select { + case <-ctx.Done(): + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30) if err != nil { for range pre { } // consume the rest of the receiving channel so it isn't blocked @@ -233,8 +270,16 @@ func Preprocess(thresholds []float64) func(chan string, chan string, chan error, } } -func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { +func Wipe(ctx context.Context, towipe chan string, up chan string, errc chan error, logger *log.Logger) { for path := range towipe { + select { + case <-ctx.Done(): + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("Wiping", path) s := strings.Split(path, ".") base := strings.Join(s[:len(s)-1], "") @@ -251,15 +296,24 @@ func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logge close(up) } -func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) { - return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { +func Ocr(training string, tesscmd string) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, toocr chan string, up chan string, errc chan error, logger *log.Logger) { if tesscmd == "" { tesscmd = "tesseract" } for path := range toocr { + select { + case <-ctx.Done(): + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") + HideCmd(cmd) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr @@ -276,13 +330,21 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er } } -func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) { - return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { +func Analyse(conn Downloader, mkfullpdf bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { + return func(ctx context.Context, toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { confs := make(map[string][]*bookpipeline.Conf) bestconfs := make(map[string]*bookpipeline.Conf) savedir := "" for path := range toanalyse { + select { + case <-ctx.Done(): + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + errc <- ctx.Err() + return + default: + } if savedir == "" { savedir = filepath.Dir(path) } @@ -316,6 +378,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } defer f.Close() + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Finding best confidence for each page, and saving all confidences") for base, conf := range confs { var best float64 @@ -334,6 +403,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo f.Close() up <- fn + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Creating best file listing the best file for each page") fn = filepath.Join(savedir, "best") f, err = os.Create(fn) @@ -354,6 +430,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } sort.Strings(pgs) + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading binarised and original images to create PDFs") bookname, err := filepath.Rel(os.TempDir(), savedir) if err != nil { @@ -374,6 +457,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } binhascontent, colourhascontent := false, false + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + var colourimgs, binimgs []pageimg for _, pg := range pgs { @@ -393,6 +483,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } for _, pg := range binimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading binarised page to add to PDF", pg.img) err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) if err != nil { @@ -412,6 +509,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } } + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + if binhascontent { fn = filepath.Join(savedir, bookname+".binarised.pdf") err = binarisedpdf.Save(fn) @@ -423,6 +527,13 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } for _, pg := range colourimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Downloading colour page to add to PDF", pg.img) colourfn := pg.img err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) @@ -448,6 +559,14 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } } } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + if colourhascontent { fn = filepath.Join(savedir, bookname+".colour.pdf") err = colourpdf.Save(fn) @@ -458,6 +577,71 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo up <- fn } + if mkfullpdf { + fullsizepdf := new(bookpipeline.Fpdf) + err = fullsizepdf.Setup() + if err != nil { + errc <- fmt.Errorf("Failed to set up PDF: %s", err) + return + } + for _, pg := range colourimgs { + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + logger.Println("Downloading colour page to add to PDF", pg.img) + colourfn := pg.img + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) + logger.Println("Download failed; trying", colourfn) + err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) + if err != nil { + logger.Println("Download failed; skipping page", pg.img) + } + } + if err == nil { + err = fullsizepdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), false) + if err != nil { + errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) + return + } + err = os.Remove(filepath.Join(savedir, colourfn)) + if err != nil { + errc <- err + return + } + } + } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + if colourhascontent { + fn = filepath.Join(savedir, bookname+".original.pdf") + err = fullsizepdf.Save(fn) + if err != nil { + errc <- fmt.Errorf("Failed to save full size pdf: %s", err) + return + } + up <- fn + } + } + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + logger.Println("Creating graph") fn = filepath.Join(savedir, "graph.png") f, err = os.Create(fn) @@ -467,11 +651,24 @@ func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Lo } defer f.Close() err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil { + _ = os.Remove(fn) + } if err != nil && err.Error() != "Not enough valid confidences" { errc <- fmt.Errorf("Error rendering graph: %s", err) return } - up <- fn + + select { + case <-ctx.Done(): + errc <- ctx.Err() + return + default: + } + + if err == nil { + up <- fn + } close(up) } @@ -541,7 +738,7 @@ func allOCRed(bookname string, conn Lister) bool { // OcrPage OCRs a page based on a message. It may make sense to // roll this back into processBook (on which it is based) once // working well. -func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { +func OcrPage(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, process func(context.Context, chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) @@ -565,19 +762,23 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + go download(ctx, dl, processc, conn, d, errc, conn.GetLogger()) + go process(ctx, processc, upc, errc, conn.GetLogger()) + go up(ctx, upc, done, conn, bookname, errc, conn.GetLogger()) dl <- msgparts[0] close(dl) - // wait for either the done or errc channel to be sent to + // wait for either the done or errc channels to be sent to select { case err = <-errc: t.Stop() _ = os.RemoveAll(d) return err + case <-ctx.Done(): + t.Stop() + _ = os.RemoveAll(d) + return ctx.Err() case <-done: } @@ -619,7 +820,7 @@ func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch return nil } -func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { +func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, process func(context.Context, chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) @@ -645,12 +846,12 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc, conn.GetLogger()) - go process(processc, upc, errc, conn.GetLogger()) + go download(ctx, dl, processc, conn, d, errc, conn.GetLogger()) + go process(ctx, processc, upc, errc, conn.GetLogger()) if toQueue == conn.OCRPageQueueId() { - go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) + go upAndQueue(ctx, upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) } else { - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + go up(ctx, upc, done, conn, bookname, errc, conn.GetLogger()) } conn.Log("Getting list of objects to download") @@ -682,7 +883,7 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string // complete, and will fill the ocrpage queue with parts which succeeded // on each run, so in that case it's better to delete the message from // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() { conn.Log("Deleting message from queue due to a bad error", fromQueue) err2 := conn.DelFromQueue(fromQueue, msg.Handle) if err2 != nil { @@ -711,6 +912,10 @@ func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string } } return err + case <-ctx.Done(): + t.Stop() + _ = os.RemoveAll(d) + return ctx.Err() case <-done: } diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index d44f74f..fed04f8 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -5,6 +5,7 @@ package pipeline import ( + "context" "fmt" "image" _ "image/jpeg" @@ -43,16 +44,25 @@ func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { // CheckImages checks that all files with a ".jpg" or ".png" suffix // in a directory are images that can be decoded (skipping dotfiles) -func CheckImages(dir string) error { +func CheckImages(ctx context.Context, dir string) error { checker := make(fileWalk) go func() { _ = filepath.Walk(dir, checker.Walk) close(checker) }() + n := 0 for path := range checker { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } suffix := filepath.Ext(path) lsuffix := strings.ToLower(suffix) + if lsuffix == ".jpeg" { + lsuffix = ".jpg" + } if lsuffix != ".jpg" && lsuffix != ".png" { continue } @@ -64,6 +74,11 @@ func CheckImages(dir string) error { if err != nil { return fmt.Errorf("Decoding image %s failed: %v", path, err) } + n++ + } + + if n == 0 { + return fmt.Errorf("No images found") } return nil @@ -71,7 +86,10 @@ func CheckImages(dir string) error { // DetectQueueType detects which queue to use based on the preponderance // of files of a particular extension in a directory -func DetectQueueType(dir string, conn Queuer) string { +func DetectQueueType(dir string, conn Queuer, nowipe bool) string { + if nowipe { + return conn.PreNoWipeQueueId() + } pngdirs, _ := filepath.Glob(dir + "/*.png") jpgdirs, _ := filepath.Glob(dir + "/*.jpg") pngcount := len(pngdirs) @@ -89,7 +107,7 @@ func DetectQueueType(dir string, conn Queuer) string { // slash. It also appends all file names with sequential numbers, like // 0001, to ensure they are appropriately named for further processing // in the pipeline. -func UploadImages(dir string, bookname string, conn Uploader) error { +func UploadImages(ctx context.Context, dir string, bookname string, conn Uploader) error { files, err := ioutil.ReadDir(dir) if err != nil { fmt.Errorf("Failed to read directory %s: %v", dir, err) @@ -97,11 +115,19 @@ func UploadImages(dir string, bookname string, conn Uploader) error { filenum := 0 for _, file := range files { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if file.IsDir() { continue } origsuffix := filepath.Ext(file.Name()) lsuffix := strings.ToLower(origsuffix) + if lsuffix == ".jpeg" { + lsuffix = ".jpg" + } if lsuffix != ".jpg" && lsuffix != ".png" { continue } @@ -109,7 +135,8 @@ func UploadImages(dir string, bookname string, conn Uploader) error { origbase := strings.TrimSuffix(origname, origsuffix) origpath := filepath.Join(dir, origname) - newname := fmt.Sprintf("%s_%04d%s", origbase, filenum, origsuffix) + safebase := strings.ReplaceAll(origbase, " ", "_") + newname := fmt.Sprintf("%s_%04d%s", safebase, filenum, lsuffix) err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, newname), origpath) if err != nil { return fmt.Errorf("Failed to upload %s: %v", origpath, err) diff --git a/internal/pipeline/util.go b/internal/pipeline/util.go new file mode 100644 index 0000000..092a9ee --- /dev/null +++ b/internal/pipeline/util.go @@ -0,0 +1,16 @@ +// Copyright 2022 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// +build !windows + +package pipeline + +import ( + "os/exec" +) + +// HideCmd adds a flag to hide any console window from being +// displayed, if necessary for the platform +func HideCmd(cmd *exec.Cmd) { +} diff --git a/internal/pipeline/util_windows.go b/internal/pipeline/util_windows.go new file mode 100644 index 0000000..08c321e --- /dev/null +++ b/internal/pipeline/util_windows.go @@ -0,0 +1,16 @@ +// Copyright 2022 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( + "os/exec" + "syscall" +) + +// HideCmd adds a flag to hide any console window from being +// displayed, if necessary for the platform +func HideCmd(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true} +} |