summaryrefslogtreecommitdiff
path: root/pipelinepreprocess
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-20 17:21:56 +0100
committerNick White <git@njw.name>2019-08-20 17:22:32 +0100
commit7ba8858782bf0b2f998ccc15f91d839074df94fc (patch)
tree90ae5afafbe4ea374ff73fb36b1bc959c9ce9bf8 /pipelinepreprocess
parent45de180dec57496b965c04285659a5a199cea603 (diff)
Handle errors properly with goroutines
Diffstat (limited to 'pipelinepreprocess')
-rw-r--r--pipelinepreprocess/main.go70
1 files changed, 41 insertions, 29 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index 20a682b..197dd9c 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -1,8 +1,5 @@
package main
// TODO: have logs go somewhere useful, like email
-// TODO: handle errors more smartly than just always fatal erroring
-// - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary
-// - cancel the current book processing rather than killing the program in the case of a nonrecoverable error
// TODO: check if images are prebinarised and if so skip multiple binarisation
import (
@@ -63,40 +60,42 @@ type Qmsg struct {
Handle, Body string
}
-func download(dl chan string, pre chan string, conn Pipeliner, dir string) {
+func download(dl chan string, pre chan string, conn Pipeliner, dir string, errc chan error) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
err := conn.DownloadFromInProgress(key, fn)
if err != nil {
- log.Fatalln("Failed to download", key, err)
+ close(pre)
+ errc <- err
+ return
}
pre <- fn
}
close(pre)
}
-func up(c chan string, done chan bool, conn Pipeliner, bookname string) {
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) {
for path := range c {
name := filepath.Base(path)
key := filepath.Join(bookname, name)
err := conn.UploadToInProgress(key, path)
if err != nil {
- log.Fatalln("Failed to upload", path, err)
+ errc <- err
+ return
}
}
done <- true
}
-func preprocess(pre chan string, up chan string, logger *log.Logger) {
+func preprocess(pre chan string, up chan string, logger *log.Logger, errc chan error) {
for path := range pre {
logger.Println("Preprocessing", path)
done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
if err != nil {
- // TODO: have error channel to signal that things are screwy, which
- // can close channels and stop the heartbeat, rather than just kill
- // the whole program
- log.Fatalln("Error preprocessing", path, err)
+ close(up)
+ errc <- err
+ return
}
for _, p := range done {
up <- p
@@ -106,17 +105,16 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) {
}
// TODO: use Tesseract API rather than calling the executable
-func ocr(toocr chan string, up chan string, logger *log.Logger) {
+func ocr(toocr chan string, up chan string, logger *log.Logger, errc chan error) {
for path := range toocr {
logger.Println("OCRing", path)
name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension
cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
err := cmd.Run()
if err != nil {
- // TODO: have error channel to signal that things are screwy, which
- // can close channels and stop the heartbeat, rather than just kill
- // the whole program
- log.Fatalln("Error ocring", path, err)
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err))
+ return
}
up <- name + ".hocr"
}
@@ -138,13 +136,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {
dl := make(chan string)
pre := make(chan string)
- upc := make(chan string) // TODO: rename
- done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
// these functions will do their jobs when their channels have data
- go download(dl, pre, conn, d)
- go preprocess(pre, upc, conn.Logger())
- go up(upc, done, conn, bookname)
+ go download(dl, pre, conn, d, errc)
+ go preprocess(pre, upc, conn.Logger(), errc)
+ go up(upc, done, conn, bookname, errc)
conn.Logger().Println("Getting list of objects to download")
todl, err := conn.ListToPreprocess(bookname)
@@ -157,8 +156,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {
dl <- d
}
- // wait for the done channel to be posted to
- <-done
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <- done:
+ }
conn.Logger().Println("Sending", bookname, "to OCR queue")
err = conn.AddToOCRQueue(bookname)
@@ -202,11 +207,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {
ocrc := make(chan string)
upc := make(chan string) // TODO: rename
done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+ errc := make(chan error)
// these functions will do their jobs when their channels have data
- go download(dl, ocrc, conn, d)
- go ocr(ocrc, upc, conn.Logger())
- go up(upc, done, conn, bookname)
+ go download(dl, ocrc, conn, d, errc)
+ go ocr(ocrc, upc, conn.Logger(), errc)
+ go up(upc, done, conn, bookname, errc)
conn.Logger().Println("Getting list of objects to download")
todl, err := conn.ListToOCR(bookname)
@@ -219,8 +225,14 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {
dl <- a
}
- // wait for the done channel to be posted to
- <-done
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <- done:
+ }
conn.Logger().Println("Sending", bookname, "to analyse queue")
err = conn.AddToAnalyseQueue(bookname)