summaryrefslogtreecommitdiff
path: root/bookpipeline/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r--bookpipeline/aws.go54
1 files changed, 48 insertions, 6 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index a956cf1..e7ecd07 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -7,6 +7,7 @@ import (
"os"
"github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -16,7 +17,7 @@ import (
const PreprocPattern = `_bin[0-9].[0-9].png`
type Qmsg struct {
- Handle, Body string
+ Id, Handle, Body string
}
type AwsConn struct {
@@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
if len(msgResult.Messages) > 0 {
- msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
+ msg := Qmsg{Id: *msgResult.Messages[0].MessageId,
+ Handle: *msgResult.Messages[0].ReceiptHandle,
+ Body: *msgResult.Messages[0].Body}
a.Logger.Println("Message received:", msg.Body)
return msg, nil
} else {
@@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
}
-func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+// QueueHeartbeat updates the visibility timeout of a message. This
+// ensures that the message remains "in flight", meaning that it
+// cannot be seen by other processes, but if this process fails the
+// timeout will expire and it will go back to being available for
+// any other process to retrieve and process.
+//
+// SQS only allows messages to be "in flight" for up to 12 hours, so
+// this will detect if the request for an update to visibility timeout
+// fails, and if so will attempt to find the message on the queue, and
+// return it, as the handle will have changed.
+func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) {
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
+ ReceiptHandle: &msg.Handle,
QueueUrl: &qurl,
VisibilityTimeout: &duration,
})
if err != nil {
- return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ aerr, ok := err.(awserr.Error)
+
+ // Check if the visibility timeout has exceeded the maximum allowed,
+ // and if so try to find the message again to get a new handle.
+ if ok && aerr.Code() == "InvalidParameterValue" {
+ // Try 3 times to find the message
+ for range [3]bool{} {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: &duration,
+ WaitTimeSeconds: aws.Int64(20),
+ QueueUrl: &qurl,
+ })
+ if err != nil {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err))
+ }
+ for _, m := range msgResult.Messages {
+ if *m.MessageId == msg.Id {
+ return Qmsg{
+ Id: *m.MessageId,
+ Handle: *m.ReceiptHandle,
+ Body: *m.Body,
+ }, nil
+ }
+ }
+ }
+ return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
+ } else {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ }
}
- return nil
+ return Qmsg{}, nil
}
func (a *AwsConn) PreQueueId() string {