diff options
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r-- | bookpipeline/aws.go | 54 |
1 files changed, 48 insertions, 6 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index a956cf1..e7ecd07 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -7,6 +7,7 @@ import ( "os" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -16,7 +17,7 @@ import ( const PreprocPattern = `_bin[0-9].[0-9].png` type Qmsg struct { - Handle, Body string + Id, Handle, Body string } type AwsConn struct { @@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) { } if len(msgResult.Messages) > 0 { - msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} + msg := Qmsg{Id: *msgResult.Messages[0].MessageId, + Handle: *msgResult.Messages[0].ReceiptHandle, + Body: *msgResult.Messages[0].Body} a.Logger.Println("Message received:", msg.Body) return msg, nil } else { @@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) { } } -func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error { +// QueueHeartbeat updates the visibility timeout of a message. This +// ensures that the message remains "in flight", meaning that it +// cannot be seen by other processes, but if this process fails the +// timeout will expire and it will go back to being available for +// any other process to retrieve and process. +// +// SQS only allows messages to be "in flight" for up to 12 hours, so +// this will detect if the request for an update to visibility timeout +// fails, and if so will attempt to find the message on the queue, and +// return it, as the handle will have changed. +func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) { _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, + ReceiptHandle: &msg.Handle, QueueUrl: &qurl, VisibilityTimeout: &duration, }) if err != nil { - return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) + aerr, ok := err.(awserr.Error) + + // Check if the visibility timeout has exceeded the maximum allowed, + // and if so try to find the message again to get a new handle. + if ok && aerr.Code() == "InvalidParameterValue" { + // Try 3 times to find the message + for range [3]bool{} { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: &duration, + WaitTimeSeconds: aws.Int64(20), + QueueUrl: &qurl, + }) + if err != nil { + return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err)) + } + for _, m := range msgResult.Messages { + if *m.MessageId == msg.Id { + return Qmsg{ + Id: *m.MessageId, + Handle: *m.ReceiptHandle, + Body: *m.Body, + }, nil + } + } + } + return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat") + } else { + return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) + } } - return nil + return Qmsg{}, nil } func (a *AwsConn) PreQueueId() string { |