-rw-r--r--  aws.go                    17
-rw-r--r--  cmd/bookpipeline/main.go   4
2 files changed, 17 insertions, 4 deletions
diff --git a/aws.go b/aws.go
index 5d66178..e3381ef 100644
--- a/aws.go
+++ b/aws.go
@@ -17,7 +17,7 @@ import (
)

const PreprocPattern = `_bin[0-9].[0-9].png`
-const heartbeatRetry = 20
+const heartbeatRetry = 100

type Qmsg struct {
Id, Handle, Body string
@@ -155,8 +155,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
// Check if the visibility timeout has exceeded the maximum allowed,
// and if so try to find the message again to get a new handle.
if ok && aerr.Code() == "InvalidParameterValue" {
+ // First try to set the visibilitytimeout to zero to immediately
+ // make the message available to receive
+ z := int64(0)
+ _, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+ ReceiptHandle: &msg.Handle,
+ QueueUrl: &qurl,
+ VisibilityTimeout: &z,
+ })
+
// Try heartbeatRetry times to find the message
- for range [heartbeatRetry]bool{} {
+ for i := 0; i < heartbeatRetry; i++ {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: &duration,
@@ -175,8 +184,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
}, nil
}
}
- // Wait a little in case existing visibilitytimeout needs to expire
- time.Sleep((2 * time.Duration(duration) / heartbeatRetry) * time.Second)
+ // Wait a second before trying again
+ time.Sleep(time.Second)
}
return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
} else {
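The aws.go hunks above can be read as a single recovery routine: zero the visibility timeout so the message becomes receivable again immediately, then poll up to heartbeatRetry times for a fresh handle, sleeping a second between attempts. The following is a rough, self-contained sketch of that flow, not part of the commit; the name recoverMessageHandle, the QueueUrl field on ReceiveMessageInput, and matching messages by Body are assumptions filled in from context, and Qmsg and heartbeatRetry are taken from aws.go.

package bookpipeline

import (
	"errors"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// recoverMessageHandle sketches the recovery flow added in aws.go: make the
// message immediately receivable again, then poll for it to get a new
// receipt handle.
func recoverMessageHandle(svc *sqs.SQS, msg Qmsg, qurl string, duration int64) (Qmsg, error) {
	// Set the visibility timeout to zero so the message can be received
	// again straight away; ignore errors, as the old handle may be stale.
	z := int64(0)
	_, _ = svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		ReceiptHandle:     &msg.Handle,
		QueueUrl:          &qurl,
		VisibilityTimeout: &z,
	})

	// Try heartbeatRetry times to find the message again.
	for i := 0; i < heartbeatRetry; i++ {
		msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   &duration,
			QueueUrl:            &qurl,
		})
		if err != nil {
			return Qmsg{}, err
		}
		for _, m := range msgResult.Messages {
			// Matching by Body is an assumption; the commit elides how
			// the original message is identified.
			if *m.Body == msg.Body {
				return Qmsg{Id: *m.MessageId, Handle: *m.ReceiptHandle, Body: *m.Body}, nil
			}
		}
		// Wait a second before trying again.
		time.Sleep(time.Second)
	}
	return Qmsg{}, errors.New("failed to find message to update heartbeat")
}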
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 2179549..f7da588 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -255,6 +255,10 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
for range t.C {
m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
+ // TODO: would be better to ensure this error stops any running
+ // processes, as they will ultimately fail once the heartbeat is
+ // lost. This could be done by setting a global variable that
+ // processes check each time they loop.
errc <- err
t.Stop()
return
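The TODO added above describes stopping in-flight work once the heartbeat fails, by having processing loops poll a shared flag. Below is a minimal sketch of that idea, which this commit does not implement; the names heartbeatFailed, markHeartbeatFailed, shouldStop and processPages are illustrative only.

package main

import (
	"errors"
	"sync/atomic"
)

// heartbeatFailed is set once QueueHeartbeat returns an error, signalling
// long-running processes to stop rather than do work that will be discarded.
var heartbeatFailed int32

func markHeartbeatFailed() {
	atomic.StoreInt32(&heartbeatFailed, 1)
}

func shouldStop() bool {
	return atomic.LoadInt32(&heartbeatFailed) == 1
}

// processPages checks the flag between units of work and aborts early if
// the heartbeat has been lost.
func processPages(pages []string) error {
	for _, p := range pages {
		if shouldStop() {
			return errors.New("aborting: heartbeat failed, queue message ownership lost")
		}
		_ = p // ... process page p ...
	}
	return nil
}

In the heartbeat goroutine above, markHeartbeatFailed() would be called just before sending the error on errc.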