diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/bookpipeline/main.go | 50 |
1 files changed, 40 insertions, 10 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index d70dbc7..3a539c1 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -35,6 +35,7 @@ one is found this general process is followed: ` const PauseBetweenChecks = 3 * time.Minute +const PauseBetweenOCRPageChecks = 1 * time.Second const HeartbeatTime = 60 // null writer to enable non-verbose logging to be discarded @@ -100,6 +101,31 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha done <- true } +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := filepath.Join(bookname, name) + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + conn.GetLogger().Println("Adding", key, "to queue", toQueue) + err = conn.AddToQueue(toQueue, key) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { logger.Println("Preprocessing", path) @@ -373,10 +399,10 @@ func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch case m, ok := <-msgc: if ok { msg = m - conn.GetLogger().Println("Using new message handle to delete message from old queue") + conn.GetLogger().Println("Using new message handle to delete message from queue") } default: - conn.GetLogger().Println("Using original message handle to delete message from old queue") + conn.GetLogger().Println("Using original message handle to delete message from queue") } conn.GetLogger().Println("Deleting original message from queue", fromQueue) @@ -416,7 +442,11 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) go process(processc, upc, errc, conn.GetLogger()) - go up(upc, done, conn, bookname, errc, conn.GetLogger()) + if toQueue == conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger()) + } else { + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + } conn.GetLogger().Println("Getting list of objects to download") objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) @@ -447,7 +477,8 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string case <-done: } - if toQueue != "" { + if toQueue != "" && toQueue != conn.OCRPageQueueId() { + go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger()) conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) err = conn.AddToQueue(toQueue, bookname) if err != nil { @@ -464,10 +495,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string case m, ok := <-msgc: if ok { msg = m - conn.GetLogger().Println("Using new message handle to delete message from old queue") + conn.GetLogger().Println("Using new message handle to delete message from queue") } default: - conn.GetLogger().Println("Using original message handle to delete message from old queue") + conn.GetLogger().Println("Using original message handle to delete message from queue") } conn.GetLogger().Println("Deleting original message from queue", fromQueue) @@ -558,7 +589,7 @@ func main() { continue } verboselog.Println("Message received on preprocess queue, processing", msg.Body) - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) if err != nil { log.Println("Error during preprocess", err) } @@ -574,19 +605,18 @@ func main() { continue } verboselog.Println("Message received on wipeonly queue, processing", msg.Body) - err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId()) + err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) if err != nil { log.Println("Error during wipe", err) } case <-checkOCRPageQueue: msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2) - checkOCRPageQueue = time.After(PauseBetweenChecks) + checkOCRPageQueue = time.After(PauseBetweenOCRPageChecks) if err != nil { log.Println("Error checking OCR Page queue", err) continue } if msg.Handle == "" { - verboselog.Println("No message received on OCR Page queue, sleeping") continue } verboselog.Println("Message received on OCR Page queue, processing", msg.Body) |