diff options
Diffstat (limited to 'pipelinepreprocess')
-rw-r--r-- | pipelinepreprocess/main.go | 70 |
1 files changed, 41 insertions, 29 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index 20a682b..197dd9c 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -1,8 +1,5 @@ package main // TODO: have logs go somewhere useful, like email -// TODO: handle errors more smartly than just always fatal erroring -// - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary -// - cancel the current book processing rather than killing the program in the case of a nonrecoverable error // TODO: check if images are prebinarised and if so skip multiple binarisation import ( @@ -63,40 +60,42 @@ type Qmsg struct { Handle, Body string } -func download(dl chan string, pre chan string, conn Pipeliner, dir string) { +func download(dl chan string, pre chan string, conn Pipeliner, dir string, errc chan error) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) err := conn.DownloadFromInProgress(key, fn) if err != nil { - log.Fatalln("Failed to download", key, err) + close(pre) + errc <- err + return } pre <- fn } close(pre) } -func up(c chan string, done chan bool, conn Pipeliner, bookname string) { +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) { for path := range c { name := filepath.Base(path) key := filepath.Join(bookname, name) err := conn.UploadToInProgress(key, path) if err != nil { - log.Fatalln("Failed to upload", path, err) + errc <- err + return } } done <- true } -func preprocess(pre chan string, up chan string, logger *log.Logger) { +func preprocess(pre chan string, up chan string, logger *log.Logger, errc chan error) { for path := range pre { logger.Println("Preprocessing", path) done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) if err != nil { - // TODO: have error channel to signal that things are screwy, which - // can close channels and stop the heartbeat, rather than just kill - // the whole program - log.Fatalln("Error preprocessing", path, err) + close(up) + errc <- err + return } for _, p := range done { up <- p @@ -106,17 +105,16 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) { } // TODO: use Tesseract API rather than calling the executable -func ocr(toocr chan string, up chan string, logger *log.Logger) { +func ocr(toocr chan string, up chan string, logger *log.Logger, errc chan error) { for path := range toocr { logger.Println("OCRing", path) name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") err := cmd.Run() if err != nil { - // TODO: have error channel to signal that things are screwy, which - // can close channels and stop the heartbeat, rather than just kill - // the whole program - log.Fatalln("Error ocring", path, err) + close(up) + errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err)) + return } up <- name + ".hocr" } @@ -138,13 +136,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error { dl := make(chan string) pre := make(chan string) - upc := make(chan string) // TODO: rename - done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) // these functions will do their jobs when their channels have data - go download(dl, pre, conn, d) - go preprocess(pre, upc, conn.Logger()) - go up(upc, done, conn, bookname) + go download(dl, pre, conn, d, errc) + go preprocess(pre, upc, conn.Logger(), errc) + go up(upc, done, conn, bookname, errc) conn.Logger().Println("Getting list of objects to download") todl, err := conn.ListToPreprocess(bookname) @@ -157,8 +156,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error { dl <- d } - // wait for the done channel to be posted to - <-done + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <- done: + } conn.Logger().Println("Sending", bookname, "to OCR queue") err = conn.AddToOCRQueue(bookname) @@ -202,11 +207,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) error { ocrc := make(chan string) upc := make(chan string) // TODO: rename done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + errc := make(chan error) // these functions will do their jobs when their channels have data - go download(dl, ocrc, conn, d) - go ocr(ocrc, upc, conn.Logger()) - go up(upc, done, conn, bookname) + go download(dl, ocrc, conn, d, errc) + go ocr(ocrc, upc, conn.Logger(), errc) + go up(upc, done, conn, bookname, errc) conn.Logger().Println("Getting list of objects to download") todl, err := conn.ListToOCR(bookname) @@ -219,8 +225,14 @@ func ocrBook(msg Qmsg, conn Pipeliner) error { dl <- a } - // wait for the done channel to be posted to - <-done + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <- done: + } conn.Logger().Println("Sending", bookname, "to analyse queue") err = conn.AddToAnalyseQueue(bookname) |