diff options
-rw-r--r-- | pipelinepreprocess/main.go | 70 |
1 files changed, 40 insertions, 30 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index b513f92..20a682b 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -6,6 +6,8 @@ package main // TODO: check if images are prebinarised and if so skip multiple binarisation import ( + "errors" + "fmt" "log" "os" "os/exec" @@ -28,8 +30,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) { const PauseBetweenChecks = 60 * time.Second -// TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk - type Clouder interface { Init() error ListObjects(bucket string, prefix string) ([]string, error) @@ -123,7 +123,7 @@ func ocr(toocr chan string, up chan string, logger *log.Logger) { close(up) } -func preProcBook(msg Qmsg, conn Pipeliner) { +func preProcBook(msg Qmsg, conn Pipeliner) error { bookname := msg.Body t := time.NewTicker(HeartbeatTime * time.Second) @@ -132,9 +132,8 @@ func preProcBook(msg Qmsg, conn Pipeliner) { d := filepath.Join(os.TempDir(), bookname) err := os.MkdirAll(d, 0755) if err != nil { - log.Println("Failed to create directory", d, err) t.Stop() - return + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) } dl := make(chan string) @@ -150,9 +149,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Getting list of objects to download") todl, err := conn.ListToPreprocess(bookname) if err != nil { - log.Println("Failed to get list of files for book", bookname, err) t.Stop() - return + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) } for _, d := range todl { dl <- d @@ -164,9 +163,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Sending", bookname, "to OCR queue") err = conn.AddToOCRQueue(bookname) if err != nil { - log.Println("Error adding to ocr queue", bookname, err) t.Stop() - return + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to ocr queue %s: %s", bookname, err)) } t.Stop() @@ -174,16 +173,19 @@ func preProcBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Deleting original message from preprocessing queue") err = conn.DelFromPreQueue(msg.Handle) if err != nil { - log.Println("Error deleting message from preprocessing queue", err) + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from preprocessing queue: %s", err)) } err = os.RemoveAll(d) if err != nil { - log.Println("Failed to remove directory", d, err) + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) } + + return nil } -func ocrBook(msg Qmsg, conn Pipeliner) { +func ocrBook(msg Qmsg, conn Pipeliner) error { bookname := msg.Body t := time.NewTicker(HeartbeatTime * time.Second) @@ -192,9 +194,8 @@ func ocrBook(msg Qmsg, conn Pipeliner) { d := filepath.Join(os.TempDir(), bookname) err := os.MkdirAll(d, 0755) if err != nil { - log.Println("Failed to create directory", d, err) t.Stop() - return + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) } dl := make(chan string) @@ -210,12 +211,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Getting list of objects to download") todl, err := conn.ListToOCR(bookname) if err != nil { - log.Println("Failed to get list of files for book", bookname, err) t.Stop() - return + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) } - for _, d := range todl { - dl <- d + for _, a := range todl { + dl <- a } // wait for the done channel to be posted to @@ -224,9 +225,9 @@ func ocrBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Sending", bookname, "to analyse queue") err = conn.AddToAnalyseQueue(bookname) if err != nil { - log.Println("Error adding to analyse queue", bookname, err) t.Stop() - return + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err)) } t.Stop() @@ -234,13 +235,16 @@ func ocrBook(msg Qmsg, conn Pipeliner) { conn.Logger().Println("Deleting original message from OCR queue") err = conn.DelFromOCRQueue(msg.Handle) if err != nil { - log.Println("Error deleting message from OCR queue", err) + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err)) } err = os.RemoveAll(d) if err != nil { - log.Println("Failed to remove directory", d, err) + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) } + + return nil } func main() { @@ -271,36 +275,42 @@ func main() { checkPreQueue = time.After(0) checkOCRQueue = time.After(0) - // TODO: use a buffer or something to limit number of running processes - // could start preprocbook / ocrbook and just have them listen on - // channels for stuff to do, that way they'd do things one at a time - // TODO: don't trigger the checkOCRQueue until a running thing has finished for { select { case <- checkPreQueue: msg, err := conn.CheckPreQueue() - checkPreQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking preprocess queue", err) + checkPreQueue = time.After(PauseBetweenChecks) continue } if msg.Handle == "" { verboselog.Println("No message received on preprocess queue, sleeping") + checkPreQueue = time.After(PauseBetweenChecks) continue } - go preProcBook(msg, conn) + err = preProcBook(msg, conn) + if err != nil { + log.Println("Error during preprocess", err) + } + checkPreQueue = time.After(0) case <- checkOCRQueue: msg, err := conn.CheckOCRQueue() - //checkOCRQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking OCR queue", err) + checkOCRQueue = time.After(PauseBetweenChecks) continue } if msg.Handle == "" { verboselog.Println("No message received on OCR queue, sleeping") + checkOCRQueue = time.After(PauseBetweenChecks) continue } - go ocrBook(msg, conn) + err = ocrBook(msg, conn) + if err != nil { + log.Println("Error during OCR process", err) + } + checkOCRQueue = time.After(0) } } } |