From 7ba8858782bf0b2f998ccc15f91d839074df94fc Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Tue, 20 Aug 2019 17:21:56 +0100
Subject: Handle errors properly with goroutines

---
 pipelinepreprocess/main.go | 70 +++++++++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 29 deletions(-)

(limited to 'pipelinepreprocess')

diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index 20a682b..197dd9c 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -1,8 +1,5 @@
 package main
 // TODO: have logs go somewhere useful, like email
-// TODO: handle errors more smartly than just always fatal erroring
-//       - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary
-//       - cancel the current book processing rather than killing the program in the case of a nonrecoverable error 
 // TODO: check if images are prebinarised and if so skip multiple binarisation
 
 import (
@@ -63,40 +60,42 @@ type Qmsg struct {
 	Handle, Body string
 }
 
-func download(dl chan string, pre chan string, conn Pipeliner, dir string) {
+func download(dl chan string, pre chan string, conn Pipeliner, dir string, errc chan error) {
 	for key := range dl {
 		fn := filepath.Join(dir, filepath.Base(key))
 		err := conn.DownloadFromInProgress(key, fn)
 		if err != nil {
-			log.Fatalln("Failed to download", key, err)
+			close(pre)
+			errc <- err
+			return
 		}
 		pre <- fn
 	}
 	close(pre)
 }
 
-func up(c chan string, done chan bool, conn Pipeliner, bookname string) {
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) {
 	for path := range c {
 		name := filepath.Base(path)
 		key := filepath.Join(bookname, name)
 		err := conn.UploadToInProgress(key, path)
 		if err != nil {
-			log.Fatalln("Failed to upload", path, err)
+			errc <- err
+			return
 		}
 	}
 
 	done <- true
 }
 
-func preprocess(pre chan string, up chan string, logger *log.Logger) {
+func preprocess(pre chan string, up chan string, logger *log.Logger, errc chan error) {
 	for path := range pre {
 		logger.Println("Preprocessing", path)
 		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
 		if err != nil {
-			// TODO: have error channel to signal that things are screwy, which
-			// can close channels and stop the heartbeat, rather than just kill
-			// the whole program
-			log.Fatalln("Error preprocessing", path, err)
+			close(up)
+			errc <- err
+			return
 		}
 		for _, p := range done {
 			up <- p
@@ -106,17 +105,16 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) {
 }
 
 // TODO: use Tesseract API rather than calling the executable
-func ocr(toocr chan string, up chan string, logger *log.Logger) {
+func ocr(toocr chan string, up chan string, logger *log.Logger, errc chan error) {
 	for path := range toocr {
 		logger.Println("OCRing", path)
 		name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension
 		cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
 		err := cmd.Run()
 		if err != nil {
-			// TODO: have error channel to signal that things are screwy, which
-			// can close channels and stop the heartbeat, rather than just kill
-			// the whole program
-			log.Fatalln("Error ocring", path, err)
+			close(up)
+			errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err))
+			return
 		}
 		up <- name + ".hocr"
 	}
@@ -138,13 +136,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {
 
 	dl := make(chan string)
 	pre := make(chan string)
-	upc := make(chan string) // TODO: rename
-	done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+	upc := make(chan string)
+	done := make(chan bool)
+	errc := make(chan error)
 
 	// these functions will do their jobs when their channels have data
-	go download(dl, pre, conn, d)
-	go preprocess(pre, upc, conn.Logger())
-	go up(upc, done, conn, bookname)
+	go download(dl, pre, conn, d, errc)
+	go preprocess(pre, upc, conn.Logger(), errc)
+	go up(upc, done, conn, bookname, errc)
 
 	conn.Logger().Println("Getting list of objects to download")
 	todl, err := conn.ListToPreprocess(bookname)
@@ -157,8 +156,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {
 		dl <- d
 	}
 
-	// wait for the done channel to be posted to
-	<-done
+	// wait for either the done or errc channel to be sent to
+	select {
+		case err = <-errc:
+			t.Stop()
+			_ = os.RemoveAll(d)
+			return err
+		case <- done:
+	}
 
 	conn.Logger().Println("Sending", bookname, "to OCR queue")
 	err = conn.AddToOCRQueue(bookname)
@@ -202,11 +207,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {
 	ocrc := make(chan string)
 	upc := make(chan string) // TODO: rename
 	done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+	errc := make(chan error)
 
 	// these functions will do their jobs when their channels have data
-	go download(dl, ocrc, conn, d)
-	go ocr(ocrc, upc, conn.Logger())
-	go up(upc, done, conn, bookname)
+	go download(dl, ocrc, conn, d, errc)
+	go ocr(ocrc, upc, conn.Logger(), errc)
+	go up(upc, done, conn, bookname, errc)
 
 	conn.Logger().Println("Getting list of objects to download")
 	todl, err := conn.ListToOCR(bookname)
@@ -219,8 +225,14 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {
 		dl <- a
 	}
 
-	// wait for the done channel to be posted to
-	<-done
+	// wait for either the done or errc channel to be sent to
+	select {
+		case err = <-errc:
+			t.Stop()
+			_ = os.RemoveAll(d)
+			return err
+		case <- done:
+	}
 
 	conn.Logger().Println("Sending", bookname, "to analyse queue")
 	err = conn.AddToAnalyseQueue(bookname)
-- 
cgit v1.2.1-24-ge1ad