diff options
| -rw-r--r-- | pipelinepreprocess/main.go | 70 | 
1 files changed, 41 insertions, 29 deletions
| diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index 20a682b..197dd9c 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -1,8 +1,5 @@  package main  // TODO: have logs go somewhere useful, like email -// TODO: handle errors more smartly than just always fatal erroring -//       - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary -//       - cancel the current book processing rather than killing the program in the case of a nonrecoverable error   // TODO: check if images are prebinarised and if so skip multiple binarisation  import ( @@ -63,40 +60,42 @@ type Qmsg struct {  	Handle, Body string  } -func download(dl chan string, pre chan string, conn Pipeliner, dir string) { +func download(dl chan string, pre chan string, conn Pipeliner, dir string, errc chan error) {  	for key := range dl {  		fn := filepath.Join(dir, filepath.Base(key))  		err := conn.DownloadFromInProgress(key, fn)  		if err != nil { -			log.Fatalln("Failed to download", key, err) +			close(pre) +			errc <- err +			return  		}  		pre <- fn  	}  	close(pre)  } -func up(c chan string, done chan bool, conn Pipeliner, bookname string) { +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) {  	for path := range c {  		name := filepath.Base(path)  		key := filepath.Join(bookname, name)  		err := conn.UploadToInProgress(key, path)  		if err != nil { -			log.Fatalln("Failed to upload", path, err) +			errc <- err +			return  		}  	}  	done <- true  } -func preprocess(pre chan string, up chan string, logger *log.Logger) { +func preprocess(pre chan string, up chan string, logger *log.Logger, errc chan error) {  	for path := range pre {  		logger.Println("Preprocessing", path)  		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)  		if err != nil { -			// TODO: have error channel to signal that things are screwy, which -			// can close channels and stop the heartbeat, rather than just kill -			// the whole program -			log.Fatalln("Error preprocessing", path, err) +			close(up) +			errc <- err +			return  		}  		for _, p := range done {  			up <- p @@ -106,17 +105,16 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) {  }  // TODO: use Tesseract API rather than calling the executable -func ocr(toocr chan string, up chan string, logger *log.Logger) { +func ocr(toocr chan string, up chan string, logger *log.Logger, errc chan error) {  	for path := range toocr {  		logger.Println("OCRing", path)  		name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension  		cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")  		err := cmd.Run()  		if err != nil { -			// TODO: have error channel to signal that things are screwy, which -			// can close channels and stop the heartbeat, rather than just kill -			// the whole program -			log.Fatalln("Error ocring", path, err) +			close(up) +			errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err)) +			return  		}  		up <- name + ".hocr"  	} @@ -138,13 +136,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {  	dl := make(chan string)  	pre := make(chan string) -	upc := make(chan string) // TODO: rename -	done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated +	upc := make(chan string) +	done := make(chan bool) +	errc := make(chan error)  	// these functions will do their jobs when their channels have data -	go download(dl, pre, conn, d) -	go preprocess(pre, upc, conn.Logger()) -	go up(upc, done, conn, bookname) +	go download(dl, pre, conn, d, errc) +	go preprocess(pre, upc, conn.Logger(), errc) +	go up(upc, done, conn, bookname, errc)  	conn.Logger().Println("Getting list of objects to download")  	todl, err := conn.ListToPreprocess(bookname) @@ -157,8 +156,14 @@ func preProcBook(msg Qmsg, conn Pipeliner) error {  		dl <- d  	} -	// wait for the done channel to be posted to -	<-done +	// wait for either the done or errc channel to be sent to +	select { +		case err = <-errc: +			t.Stop() +			_ = os.RemoveAll(d) +			return err +		case <- done: +	}  	conn.Logger().Println("Sending", bookname, "to OCR queue")  	err = conn.AddToOCRQueue(bookname) @@ -202,11 +207,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {  	ocrc := make(chan string)  	upc := make(chan string) // TODO: rename  	done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated +	errc := make(chan error)  	// these functions will do their jobs when their channels have data -	go download(dl, ocrc, conn, d) -	go ocr(ocrc, upc, conn.Logger()) -	go up(upc, done, conn, bookname) +	go download(dl, ocrc, conn, d, errc) +	go ocr(ocrc, upc, conn.Logger(), errc) +	go up(upc, done, conn, bookname, errc)  	conn.Logger().Println("Getting list of objects to download")  	todl, err := conn.ListToOCR(bookname) @@ -219,8 +225,14 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {  		dl <- a  	} -	// wait for the done channel to be posted to -	<-done +	// wait for either the done or errc channel to be sent to +	select { +		case err = <-errc: +			t.Stop() +			_ = os.RemoveAll(d) +			return err +		case <- done: +	}  	conn.Logger().Println("Sending", bookname, "to analyse queue")  	err = conn.AddToAnalyseQueue(bookname) | 
