author     Nick White <git@njw.name>    2019-10-28 15:51:44 +0000
committer  Nick White <git@njw.name>    2019-10-28 16:04:03 +0000
commit     ffbda7e158e21b2889b1726ae398f3822a3f51df (patch)
tree       85941ec0676be2215136312a18edfa13a07f4349
parent     01c99da2c2c7960d0cf6e0a49ede919948898a2c (diff)
Try to fix heartbeat renew issue more fully
This approach first sets the remaining visibility timeout to zero, which should ensure that the message is available to re-find as soon as the process looks for it. Correspondingly, the delay between retries is now much shorter, as there should no longer be much reason to wait.
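As a standalone sketch of that approach (illustrative only: refindMessage, queueURL, staleHandle and msgID are placeholder names, not part of the bookpipeline API), the renewal fallback looks roughly like this:

package heartbeat

import (
	"errors"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// refindMessage zeroes the remaining visibility timeout of a message whose
// receipt handle has gone stale, then polls the queue with short delays
// until the same message reappears, returning the fresh copy.
func refindMessage(svc *sqs.SQS, queueURL, staleHandle, msgID string, visibility int64) (*sqs.Message, error) {
	zero := int64(0)
	// Make the message available to receive again immediately.
	_, _ = svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          &queueURL,
		ReceiptHandle:     &staleHandle,
		VisibilityTimeout: &zero,
	})

	for i := 0; i < 100; i++ {
		out, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:            &queueURL,
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   &visibility,
		})
		if err != nil {
			return nil, err
		}
		for _, m := range out.Messages {
			if m.MessageId != nil && *m.MessageId == msgID {
				return m, nil
			}
		}
		// A short wait is enough, as the visibility timeout was zeroed above.
		time.Sleep(time.Second)
	}
	return nil, errors.New("failed to re-find message to update heartbeat")
}

The error from ChangeMessageVisibility is deliberately ignored, as the stale receipt handle is exactly the problem being recovered from; the receive loop that follows is the real recovery path.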
-rw-r--r--  aws.go                    17
-rw-r--r--  cmd/bookpipeline/main.go   4
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/aws.go b/aws.go
index 5d66178..e3381ef 100644
--- a/aws.go
+++ b/aws.go
@@ -17,7 +17,7 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
-const heartbeatRetry = 20
+const heartbeatRetry = 100
type Qmsg struct {
Id, Handle, Body string
@@ -155,8 +155,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
// Check if the visibility timeout has exceeded the maximum allowed,
// and if so try to find the message again to get a new handle.
if ok && aerr.Code() == "InvalidParameterValue" {
+ // First try to set the visibilitytimeout to zero to immediately
+ // make the message available to receive
+ z := int64(0)
+ _, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+ ReceiptHandle: &msg.Handle,
+ QueueUrl: &qurl,
+ VisibilityTimeout: &z,
+ })
+
// Try heartbeatRetry times to find the message
- for range [heartbeatRetry]bool{} {
+ for i := 0; i < heartbeatRetry; i++ {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: &duration,
@@ -175,8 +184,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
}, nil
}
}
- // Wait a little in case existing visibilitytimeout needs to expire
- time.Sleep((2 * time.Duration(duration) / heartbeatRetry) * time.Second)
+ // Wait a second before trying again
+ time.Sleep(time.Second)
}
return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
} else {
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 2179549..f7da588 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -255,6 +255,10 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
for range t.C {
m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
+ // TODO: would be better to ensure this error stops any running
+ // processes, as they will ultimately fail anyway in that case.
+ // This could be done by setting a global variable that
+ // processes check each time they loop.
errc <- err
t.Stop()
return
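A sketch of how that TODO could be done, using a package-level atomic flag (signalStop and shouldStop are hypothetical names, not part of this commit or of bookpipeline; shown as a standalone package for illustration):

package pipeline

import "sync/atomic"

// stopProcessing is set when the heartbeat fails, so long-running
// processes can notice and stop early rather than failing later.
var stopProcessing int32

// signalStop would be called from heartbeat() when QueueHeartbeat fails,
// alongside sending the error on errc.
func signalStop() {
	atomic.StoreInt32(&stopProcessing, 1)
}

// shouldStop would be checked by each processing loop on every iteration,
// returning early when it reports true.
func shouldStop() bool {
	return atomic.LoadInt32(&stopProcessing) == 1
}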