diff options
Diffstat (limited to 'pipelinepreprocess')
| -rw-r--r-- | pipelinepreprocess/main.go | 70 | 
1 files changed, 40 insertions, 30 deletions
| diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index b513f92..20a682b 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -6,6 +6,8 @@ package main  // TODO: check if images are prebinarised and if so skip multiple binarisation  import ( +	"errors" +	"fmt"  	"log"  	"os"  	"os/exec" @@ -28,8 +30,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  const PauseBetweenChecks = 60 * time.Second -// TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk -  type Clouder interface {  	Init() error  	ListObjects(bucket string, prefix string) ([]string, error) @@ -123,7 +123,7 @@ func ocr(toocr chan string, up chan string, logger *log.Logger) {  	close(up)  } -func preProcBook(msg Qmsg, conn Pipeliner) { +func preProcBook(msg Qmsg, conn Pipeliner) error {  	bookname := msg.Body  	t := time.NewTicker(HeartbeatTime * time.Second) @@ -132,9 +132,8 @@ func preProcBook(msg Qmsg, conn Pipeliner) {  	d := filepath.Join(os.TempDir(), bookname)  	err := os.MkdirAll(d, 0755)  	if err != nil { -		log.Println("Failed to create directory", d, err)  		t.Stop() -		return +		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))  	}  	dl := make(chan string) @@ -150,9 +149,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Getting list of objects to download")  	todl, err := conn.ListToPreprocess(bookname)  	if err != nil { -		log.Println("Failed to get list of files for book", bookname, err)  		t.Stop() -		return +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))  	}  	for _, d := range todl {  		dl <- d @@ -164,9 +163,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Sending", bookname, "to OCR queue")  	err = conn.AddToOCRQueue(bookname)  	if err != nil { -		log.Println("Error adding to ocr queue", bookname, err)  		t.Stop() -		return +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Error adding to ocr queue %s: %s", bookname, err))  	}  	t.Stop() @@ -174,16 +173,19 @@ func preProcBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Deleting original message from preprocessing queue")  	err = conn.DelFromPreQueue(msg.Handle)  	if err != nil { -		log.Println("Error deleting message from preprocessing queue", err) +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Error deleting message from preprocessing queue: %s", err))  	}  	err = os.RemoveAll(d)  	if err != nil { -		log.Println("Failed to remove directory", d, err) +		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))  	} + +	return nil  } -func ocrBook(msg Qmsg, conn Pipeliner) { +func ocrBook(msg Qmsg, conn Pipeliner) error {  	bookname := msg.Body  	t := time.NewTicker(HeartbeatTime * time.Second) @@ -192,9 +194,8 @@ func ocrBook(msg Qmsg, conn Pipeliner) {  	d := filepath.Join(os.TempDir(), bookname)  	err := os.MkdirAll(d, 0755)  	if err != nil { -		log.Println("Failed to create directory", d, err)  		t.Stop() -		return +		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))  	}  	dl := make(chan string) @@ -210,12 +211,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Getting list of objects to download")  	todl, err := conn.ListToOCR(bookname)  	if err != nil { -		log.Println("Failed to get list of files for book", bookname, err)  		t.Stop() -		return +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))  	} -	for _, d := range todl { -		dl <- d +	for _, a := range todl { +		dl <- a  	}  	// wait for the done channel to be posted to @@ -224,9 +225,9 @@ func ocrBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Sending", bookname, "to analyse queue")  	err = conn.AddToAnalyseQueue(bookname)  	if err != nil { -		log.Println("Error adding to analyse queue", bookname, err)  		t.Stop() -		return +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err))  	}  	t.Stop() @@ -234,13 +235,16 @@ func ocrBook(msg Qmsg, conn Pipeliner) {  	conn.Logger().Println("Deleting original message from OCR queue")  	err = conn.DelFromOCRQueue(msg.Handle)  	if err != nil { -		log.Println("Error deleting message from OCR queue", err) +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err))  	}  	err = os.RemoveAll(d)  	if err != nil { -		log.Println("Failed to remove directory", d, err) +		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))  	} + +	return nil  }  func main() { @@ -271,36 +275,42 @@ func main() {  	checkPreQueue = time.After(0)  	checkOCRQueue = time.After(0) -	// TODO: use a buffer or something to limit number of running processes -	//       could start preprocbook / ocrbook and just have them listen on -	//       channels for stuff to do, that way they'd do things one at a time -	// TODO: don't trigger the checkOCRQueue until a running thing has finished  	for {  		select {  		case <- checkPreQueue:  			msg, err := conn.CheckPreQueue() -			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking preprocess queue", err) +				checkPreQueue = time.After(PauseBetweenChecks)  				continue  			}  			if msg.Handle == "" {  				verboselog.Println("No message received on preprocess queue, sleeping") +				checkPreQueue = time.After(PauseBetweenChecks)  				continue  			} -			go preProcBook(msg, conn) +			err = preProcBook(msg, conn) +			if err != nil { +				log.Println("Error during preprocess", err) +			} +			checkPreQueue = time.After(0)  		case <- checkOCRQueue:  			msg, err := conn.CheckOCRQueue() -			//checkOCRQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking OCR queue", err) +				checkOCRQueue = time.After(PauseBetweenChecks)  				continue  			}  			if msg.Handle == "" {  				verboselog.Println("No message received on OCR queue, sleeping") +				checkOCRQueue = time.After(PauseBetweenChecks)  				continue  			} -			go ocrBook(msg, conn) +			err = ocrBook(msg, conn) +			if err != nil { +				log.Println("Error during OCR process", err) +			} +			checkOCRQueue = time.After(0)  		}  	}  } | 
