summaryrefslogtreecommitdiff
path: root/bookpipeline/cmd/bookpipeline/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/cmd/bookpipeline/main.go')
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go38
1 files changed, 24 insertions, 14 deletions
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index c7dde5b..913ccc7 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -52,10 +52,10 @@ type Clouder interface {
ListObjects(bucket string, prefix string) ([]string, error)
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
- CheckQueue(url string) (bookpipeline.Qmsg, error)
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
- QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
+ QueueHeartbeat(msgHandle string, qurl string, duration int64) error
}
type Pipeliner interface {
@@ -225,24 +225,34 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
close(up)
}
+func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+ for range t.C {
+ err := conn.QueueHeartbeat(msg, queue, HeartbeatTime * 2)
+ if err != nil {
+ errc <- err
+ t.Stop()
+ return
+ }
+ }
+}
+
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
- bookname := msg.Body
+ dl := make(chan string)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
- t := time.NewTicker(HeartbeatTime * time.Second)
- go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
+ bookname := msg.Body
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
if err != nil {
- t.Stop()
return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
}
- dl := make(chan string)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go heartbeat(conn, t, msg.Handle, fromQueue, errc)
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -346,7 +356,7 @@ func main() {
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckQueue(conn.PreQueueId())
+ msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime * 2)
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking preprocess queue", err)
@@ -362,7 +372,7 @@ func main() {
log.Println("Error during preprocess", err)
}
case <-checkOCRQueue:
- msg, err := conn.CheckQueue(conn.OCRQueueId())
+ msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime * 2)
checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking OCR queue", err)
@@ -378,7 +388,7 @@ func main() {
log.Println("Error during OCR process", err)
}
case <-checkAnalyseQueue:
- msg, err := conn.CheckQueue(conn.AnalyseQueueId())
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime * 2)
checkAnalyseQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking analyse queue", err)