summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go17
1 files changed, 13 insertions, 4 deletions
diff --git a/aws.go b/aws.go
index 5d66178..e3381ef 100644
--- a/aws.go
+++ b/aws.go
@@ -17,7 +17,7 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
-const heartbeatRetry = 20
+const heartbeatRetry = 100
type Qmsg struct {
Id, Handle, Body string
@@ -155,8 +155,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
// Check if the visibility timeout has exceeded the maximum allowed,
// and if so try to find the message again to get a new handle.
if ok && aerr.Code() == "InvalidParameterValue" {
+ // First try to set the visibilitytimeout to zero to immediately
+ // make the message available to receive
+ z := int64(0)
+ _, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+ ReceiptHandle: &msg.Handle,
+ QueueUrl: &qurl,
+ VisibilityTimeout: &z,
+ })
+
// Try heartbeatRetry times to find the message
- for range [heartbeatRetry]bool{} {
+ for i := 0; i < heartbeatRetry; i++ {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: &duration,
@@ -175,8 +184,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
}, nil
}
}
- // Wait a little in case existing visibilitytimeout needs to expire
- time.Sleep((2 * time.Duration(duration) / heartbeatRetry) * time.Second)
+ // Wait a second before trying again
+ time.Sleep(time.Second)
}
return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
} else {