From c47e27ee5146b8c2dcf058ed58270ce691b43ff7 Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 4 Sep 2019 20:40:07 +0100 Subject: Rewrite heartbeat so errors during it will be reported, and the aws api doesn't rely on channels --- bookpipeline/cmd/bookpipeline/main.go | 38 ++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 14 deletions(-) (limited to 'bookpipeline/cmd/bookpipeline/main.go') diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index c7dde5b..913ccc7 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -52,10 +52,10 @@ type Clouder interface { ListObjects(bucket string, prefix string) ([]string, error) Download(bucket string, key string, fn string) error Upload(bucket string, key string, path string) error - CheckQueue(url string) (bookpipeline.Qmsg, error) + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error - QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error + QueueHeartbeat(msgHandle string, qurl string, duration int64) error } type Pipeliner interface { @@ -225,24 +225,34 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log close(up) } +func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { + for range t.C { + err := conn.QueueHeartbeat(msg, queue, HeartbeatTime * 2) + if err != nil { + errc <- err + t.Stop() + return + } + } +} + func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - bookname := msg.Body + dl := make(chan string) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) - t := time.NewTicker(HeartbeatTime * time.Second) - go conn.QueueHeartbeat(t, msg.Handle, fromQueue) + bookname := msg.Body d := filepath.Join(os.TempDir(), bookname) err := os.MkdirAll(d, 0755) if err != nil { - t.Stop() return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) } - dl := make(chan string) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) + t := time.NewTicker(HeartbeatTime * time.Second) + go heartbeat(conn, t, msg.Handle, fromQueue, errc) // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -346,7 +356,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId()) + msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime * 2) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking preprocess queue", err) @@ -362,7 +372,7 @@ func main() { log.Println("Error during preprocess", err) } case <-checkOCRQueue: - msg, err := conn.CheckQueue(conn.OCRQueueId()) + msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime * 2) checkOCRQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking OCR queue", err) @@ -378,7 +388,7 @@ func main() { log.Println("Error during OCR process", err) } case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId()) + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime * 2) checkAnalyseQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking analyse queue", err) -- cgit v1.2.1-24-ge1ad