diff options
-rw-r--r-- | aws.go | 18 |
1 files changed, 10 insertions, 8 deletions
@@ -17,7 +17,6 @@ import ( ) const PreprocPattern = `_bin[0-9].[0-9].png` -const heartbeatRetry = 100 type Qmsg struct { Id, Handle, Body string @@ -157,19 +156,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e if ok && aerr.Code() == "InvalidParameterValue" { // First try to set the visibilitytimeout to zero to immediately // make the message available to receive - z := int64(0) _, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ ReceiptHandle: &msg.Handle, QueueUrl: &qurl, - VisibilityTimeout: &z, + VisibilityTimeout: aws.Int64(0), }) - // Try heartbeatRetry times to find the message - for i := 0; i < heartbeatRetry; i++ { + for i := 0; i < int(duration) * 5; i++ { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(10), VisibilityTimeout: &duration, - WaitTimeSeconds: aws.Int64(20), + WaitTimeSeconds: aws.Int64(1), QueueUrl: &qurl, }) if err != nil { @@ -184,8 +181,13 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e }, nil } } - // Wait a second before trying again - time.Sleep(time.Second) + // Wait a second before trying again if the ReceiveMessage + // call succeeded but didn't contain our message (otherwise + // the WaitTimeSeconds will have applied and we will already + // have waited a second) + if len(msgResult.Messages) > 0 { + time.Sleep(time.Second) + } } return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat") } else { |