diff options
author | Nick White <git@njw.name> | 2019-09-16 14:57:14 +0100 |
---|---|---|
committer | Nick White <git@njw.name> | 2019-09-16 14:57:14 +0100 |
commit | 5fe9dc3a403cdfe0b41830aac16ef8b02f7e0e4e (patch) | |
tree | 0218985d518e23c7ea5a718dbbf1fad37a9ac6b8 /bookpipeline | |
parent | 7b79288b55077c5f378b5f5e45f2ac8fd3be76ee (diff) |
Be more careful to try to grab the message after a heartbeat failure more quickly
Rather than waiting for the whole length of a visibility timeout,
in which time another process may grab the message, instead wait
a short amount of time, each time the message is searched for.
Also add a bit more logging.
Diffstat (limited to 'bookpipeline')
-rw-r--r-- | bookpipeline/aws.go | 10 | ||||
-rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 2 |
2 files changed, 8 insertions, 4 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index d51e4ea..4d32f3d 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -16,6 +16,7 @@ import ( ) const PreprocPattern = `_bin[0-9].[0-9].png` +const heartbeatRetry = 10 type Qmsg struct { Id, Handle, Body string @@ -132,10 +133,11 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e // Check if the visibility timeout has exceeded the maximum allowed, // and if so try to find the message again to get a new handle. if ok && aerr.Code() == "InvalidParameterValue" { - // Wait for existing visibilitytimeout to expire - time.Sleep(time.Duration(duration) * time.Second) - // Try 3 times to find the message - for range [3]bool{} { + // Try heartbeatRetry times to find the message + for range [heartbeatRetry]bool{} { + // Wait a little in case existing visibilitytimeout needs to expire + time.Sleep((time.Duration(duration) / heartbeatRetry) * time.Second) + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(10), VisibilityTimeout: &duration, diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index f1309ca..f8b0b68 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -322,8 +322,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string case m, ok := <-msgc : if ok { msg = m + conn.GetLogger().Println("Using new message handle to delete message from old queue") } default: + conn.GetLogger().Println("Using original message handle to delete message from old queue") } conn.GetLogger().Println("Deleting original message from queue", fromQueue) |