diff options
Diffstat (limited to 'bookpipeline')
-rw-r--r-- | bookpipeline/aws.go | 54 | ||||
-rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 26 |
2 files changed, 70 insertions, 10 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index a956cf1..e7ecd07 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -7,6 +7,7 @@ import ( "os" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -16,7 +17,7 @@ import ( const PreprocPattern = `_bin[0-9].[0-9].png` type Qmsg struct { - Handle, Body string + Id, Handle, Body string } type AwsConn struct { @@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) { } if len(msgResult.Messages) > 0 { - msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} + msg := Qmsg{Id: *msgResult.Messages[0].MessageId, + Handle: *msgResult.Messages[0].ReceiptHandle, + Body: *msgResult.Messages[0].Body} a.Logger.Println("Message received:", msg.Body) return msg, nil } else { @@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) { } } -func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error { +// QueueHeartbeat updates the visibility timeout of a message. This +// ensures that the message remains "in flight", meaning that it +// cannot be seen by other processes, but if this process fails the +// timeout will expire and it will go back to being available for +// any other process to retrieve and process. +// +// SQS only allows messages to be "in flight" for up to 12 hours, so +// this will detect if the request for an update to visibility timeout +// fails, and if so will attempt to find the message on the queue, and +// return it, as the handle will have changed. +func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) { _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, + ReceiptHandle: &msg.Handle, QueueUrl: &qurl, VisibilityTimeout: &duration, }) if err != nil { - return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) + aerr, ok := err.(awserr.Error) + + // Check if the visibility timeout has exceeded the maximum allowed, + // and if so try to find the message again to get a new handle. + if ok && aerr.Code() == "InvalidParameterValue" { + // Try 3 times to find the message + for range [3]bool{} { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: &duration, + WaitTimeSeconds: aws.Int64(20), + QueueUrl: &qurl, + }) + if err != nil { + return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err)) + } + for _, m := range msgResult.Messages { + if *m.MessageId == msg.Id { + return Qmsg{ + Id: *m.MessageId, + Handle: *m.ReceiptHandle, + Body: *m.Body, + }, nil + } + } + } + return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat") + } else { + return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) + } } - return nil + return Qmsg{}, nil } func (a *AwsConn) PreQueueId() string { diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index 7ffacf8..1c2592b 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -54,7 +54,7 @@ type Clouder interface { CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error - QueueHeartbeat(msgHandle string, qurl string, duration int64) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) } type Pipeliner interface { @@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log close(up) } -func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg for range t.C { - err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2) + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2) if err != nil { errc <- err t.Stop() return } + if m.Id != "" { + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } } } func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) upc := make(chan string) done := make(chan bool) @@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string } t := time.NewTicker(HeartbeatTime * time.Second) - go heartbeat(conn, t, msg.Handle, fromQueue, errc) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string t.Stop() + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc : + if ok { + msg = m + } + default: + } + conn.GetLogger().Println("Deleting original message from queue", fromQueue) err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { |