summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'cmd')
-rw-r--r--cmd/bookpipeline/main.go50
1 files changed, 40 insertions, 10 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index d70dbc7..3a539c1 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -35,6 +35,7 @@ one is found this general process is followed:
`
const PauseBetweenChecks = 3 * time.Minute
+const PauseBetweenOCRPageChecks = 1 * time.Second
const HeartbeatTime = 60
// null writer to enable non-verbose logging to be discarded
@@ -100,6 +101,31 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
done <- true
}
+func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+ for path := range c {
+ name := filepath.Base(path)
+ key := filepath.Join(bookname, name)
+ logger.Println("Uploading", key)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ conn.GetLogger().Println("Adding", key, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, key)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ }
+
+ done <- true
+}
+
func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
for path := range pre {
logger.Println("Preprocessing", path)
@@ -373,10 +399,10 @@ func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch
case m, ok := <-msgc:
if ok {
msg = m
- conn.GetLogger().Println("Using new message handle to delete message from old queue")
+ conn.GetLogger().Println("Using new message handle to delete message from queue")
}
default:
- conn.GetLogger().Println("Using original message handle to delete message from old queue")
+ conn.GetLogger().Println("Using original message handle to delete message from queue")
}
conn.GetLogger().Println("Deleting original message from queue", fromQueue)
@@ -416,7 +442,11 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
go process(processc, upc, errc, conn.GetLogger())
- go up(upc, done, conn, bookname, errc, conn.GetLogger())
+ if toQueue == conn.OCRPageQueueId() {
+ go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger())
+ } else {
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+ }
conn.GetLogger().Println("Getting list of objects to download")
objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
@@ -447,7 +477,8 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
case <-done:
}
- if toQueue != "" {
+ if toQueue != "" && toQueue != conn.OCRPageQueueId() {
+ go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger())
conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
err = conn.AddToQueue(toQueue, bookname)
if err != nil {
@@ -464,10 +495,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
case m, ok := <-msgc:
if ok {
msg = m
- conn.GetLogger().Println("Using new message handle to delete message from old queue")
+ conn.GetLogger().Println("Using new message handle to delete message from queue")
}
default:
- conn.GetLogger().Println("Using original message handle to delete message from old queue")
+ conn.GetLogger().Println("Using original message handle to delete message from queue")
}
conn.GetLogger().Println("Deleting original message from queue", fromQueue)
@@ -558,7 +589,7 @@ func main() {
continue
}
verboselog.Println("Message received on preprocess queue, processing", msg.Body)
- err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
+ err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
if err != nil {
log.Println("Error during preprocess", err)
}
@@ -574,19 +605,18 @@ func main() {
continue
}
verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
- err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId())
+ err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
if err != nil {
log.Println("Error during wipe", err)
}
case <-checkOCRPageQueue:
msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2)
- checkOCRPageQueue = time.After(PauseBetweenChecks)
+ checkOCRPageQueue = time.After(PauseBetweenOCRPageChecks)
if err != nil {
log.Println("Error checking OCR Page queue", err)
continue
}
if msg.Handle == "" {
- verboselog.Println("No message received on OCR Page queue, sleeping")
continue
}
verboselog.Println("Message received on OCR Page queue, processing", msg.Body)