From c47e27ee5146b8c2dcf058ed58270ce691b43ff7 Mon Sep 17 00:00:00 2001
From: Nick White
Date: Wed, 4 Sep 2019 20:40:07 +0100
Subject: Rewrite heartbeat so errors during it will be reported, and the aws api doesn't rely on channels

---
 bookpipeline/aws.go | 25 +++++++++--------------
 bookpipeline/cmd/bookpipeline/main.go | 38 ++++++++++++++++++++++-------------
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 7409434..a111ebf 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"log"
 	"os"
-	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -15,7 +14,6 @@ import (
 )
 
 const PreprocPattern = `_bin[0-9].[0-9].png`
-const HeartbeatTime = 60
 
 type Qmsg struct {
 	Handle, Body string
@@ -88,10 +86,10 @@ func (a *AwsConn) Init() error {
 	return nil
 }
 
-func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
 	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
 		MaxNumberOfMessages: aws.Int64(1),
-		VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
+		VisibilityTimeout: &timeout,
 		WaitTimeSeconds: aws.Int64(20),
 		QueueUrl: &url,
 	})
@@ -108,17 +106,14 @@ func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
 	}
 }
 
-func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
-	for _ = range t.C {
-		duration := int64(HeartbeatTime * 2)
-		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
-			ReceiptHandle: &msgHandle,
-			QueueUrl: &qurl,
-			VisibilityTimeout: &duration,
-		})
-		if err != nil {
-			return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
-		}
+func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+	_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+		ReceiptHandle: &msgHandle,
+		QueueUrl: &qurl,
+		VisibilityTimeout: &duration,
+	})
+	if err != nil {
+		return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
 	}
 	return nil
 }
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index c7dde5b..913ccc7 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -52,10 +52,10 @@ type Clouder interface {
 	ListObjects(bucket string, prefix string) ([]string, error)
 	Download(bucket string, key string, fn string) error
 	Upload(bucket string, key string, path string) error
-	CheckQueue(url string) (bookpipeline.Qmsg, error)
+	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
 	AddToQueue(url string, msg string) error
 	DelFromQueue(url string, handle string) error
-	QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
+	QueueHeartbeat(msgHandle string, qurl string, duration int64) error
 }
 
 type Pipeliner interface {
@@ -225,24 +225,34 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
 	close(up)
 }
 
+func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+	for range t.C {
+		err := conn.QueueHeartbeat(msg, queue, HeartbeatTime * 2)
+		if err != nil {
+			errc <- err
+			t.Stop()
+			return
+		}
+	}
+}
+
 func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
-	bookname := msg.Body
+	dl := make(chan string)
+	processc := make(chan string)
+	upc := make(chan string)
+	done := make(chan bool)
+	errc := make(chan error)
 
-	t := time.NewTicker(HeartbeatTime * time.Second)
-	go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
+	bookname := msg.Body
 
 	d := filepath.Join(os.TempDir(), bookname)
 	err := os.MkdirAll(d, 0755)
 	if err != nil {
-		t.Stop()
 		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
 	}
 
-	dl := make(chan string)
-	processc := make(chan string)
-	upc := make(chan string)
-	done := make(chan bool)
-	errc := make(chan error)
+	t := time.NewTicker(HeartbeatTime * time.Second)
+	go heartbeat(conn, t, msg.Handle, fromQueue, errc)
 
 	// these functions will do their jobs when their channels have data
 	go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -346,7 +356,7 @@ func main() {
 	for {
 		select {
 		case <-checkPreQueue:
-			msg, err := conn.CheckQueue(conn.PreQueueId())
+			msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime * 2)
 			checkPreQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				log.Println("Error checking preprocess queue", err)
@@ -362,7 +372,7 @@ func main() {
 				log.Println("Error during preprocess", err)
 			}
 		case <-checkOCRQueue:
-			msg, err := conn.CheckQueue(conn.OCRQueueId())
+			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime * 2)
 			checkOCRQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				log.Println("Error checking OCR queue", err)
@@ -378,7 +388,7 @@ func main() {
 				log.Println("Error during OCR process", err)
 			}
 		case <-checkAnalyseQueue:
-			msg, err := conn.CheckQueue(conn.AnalyseQueueId())
+			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime * 2)
 			checkAnalyseQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				log.Println("Error checking analyse queue", err)
--
cgit v1.2.1-24-ge1ad