summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go18
1 files changed, 10 insertions, 8 deletions
diff --git a/aws.go b/aws.go
index e3381ef..4aea082 100644
--- a/aws.go
+++ b/aws.go
@@ -17,7 +17,6 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
-const heartbeatRetry = 100
type Qmsg struct {
Id, Handle, Body string
@@ -157,19 +156,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
if ok && aerr.Code() == "InvalidParameterValue" {
// First try to set the visibilitytimeout to zero to immediately
// make the message available to receive
- z := int64(0)
_, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
ReceiptHandle: &msg.Handle,
QueueUrl: &qurl,
- VisibilityTimeout: &z,
+ VisibilityTimeout: aws.Int64(0),
})
- // Try heartbeatRetry times to find the message
- for i := 0; i < heartbeatRetry; i++ {
+ for i := 0; i < int(duration) * 5; i++ {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: &duration,
- WaitTimeSeconds: aws.Int64(20),
+ WaitTimeSeconds: aws.Int64(1),
QueueUrl: &qurl,
})
if err != nil {
@@ -184,8 +181,13 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
}, nil
}
}
- // Wait a second before trying again
- time.Sleep(time.Second)
+ // Wait a second before trying again if the ReceiveMessage
+ // call succeeded but didn't contain our message (otherwise
+ // the WaitTimeSeconds will have applied and we will already
+ // have waited a second)
+ if len(msgResult.Messages) > 0 {
+ time.Sleep(time.Second)
+ }
}
return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
} else {