diff options
author | Nick White <git@njw.name> | 2019-10-28 15:51:44 +0000 |
---|---|---|
committer | Nick White <git@njw.name> | 2019-10-28 16:04:03 +0000 |
commit | ffbda7e158e21b2889b1726ae398f3822a3f51df (patch) | |
tree | 85941ec0676be2215136312a18edfa13a07f4349 | |
parent | 01c99da2c2c7960d0cf6e0a49ede919948898a2c (diff) |
Try to fix heartbeat renew issue more fully
This approach first sets the remaining visibility timeout to zero.
This should ensure that the message is available to re-find as soon
as the process looks for it.
Correspondingly the delay between checks is much shorter, as there
shouldn't be a reason for much delay.
-rw-r--r-- | aws.go | 17 | ||||
-rw-r--r-- | cmd/bookpipeline/main.go | 4 |
2 files changed, 17 insertions, 4 deletions
@@ -17,7 +17,7 @@ import ( ) const PreprocPattern = `_bin[0-9].[0-9].png` -const heartbeatRetry = 20 +const heartbeatRetry = 100 type Qmsg struct { Id, Handle, Body string @@ -155,8 +155,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e // Check if the visibility timeout has exceeded the maximum allowed, // and if so try to find the message again to get a new handle. if ok && aerr.Code() == "InvalidParameterValue" { + // First try to set the visibilitytimeout to zero to immediately + // make the message available to receive + z := int64(0) + _, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + ReceiptHandle: &msg.Handle, + QueueUrl: &qurl, + VisibilityTimeout: &z, + }) + // Try heartbeatRetry times to find the message - for range [heartbeatRetry]bool{} { + for i := 0; i < heartbeatRetry; i++ { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(10), VisibilityTimeout: &duration, @@ -175,8 +184,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e }, nil } } - // Wait a little in case existing visibilitytimeout needs to expire - time.Sleep((2 * time.Duration(duration) / heartbeatRetry) * time.Second) + // Wait a second before trying again + time.Sleep(time.Second) } return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat") } else { diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 2179549..f7da588 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -255,6 +255,10 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri for range t.C { m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2) if err != nil { + // TODO: would be better to ensure this error stops any running + // processes, as they will ultimately fail in the case of + // it. 
Could do this by setting a global variable that + processes check each time they loop. errc <- err t.Stop() return