summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-09-16 14:57:14 +0100
committerNick White <git@njw.name>2019-09-16 14:57:14 +0100
commit5fe9dc3a403cdfe0b41830aac16ef8b02f7e0e4e (patch)
tree0218985d518e23c7ea5a718dbbf1fad37a9ac6b8
parent7b79288b55077c5f378b5f5e45f2ac8fd3be76ee (diff)
Be more careful to try to grab the message after a heartbeat failure more quickly
Rather than waiting for the whole length of a visibility timeout, in which time another process may grab the message, instead wait a short amount of time, each time the message is searched for. Also add a bit more logging.
-rw-r--r--bookpipeline/aws.go10
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go2
2 files changed, 8 insertions, 4 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index d51e4ea..4d32f3d 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -16,6 +16,7 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
+const heartbeatRetry = 10
type Qmsg struct {
Id, Handle, Body string
@@ -132,10 +133,11 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
// Check if the visibility timeout has exceeded the maximum allowed,
// and if so try to find the message again to get a new handle.
if ok && aerr.Code() == "InvalidParameterValue" {
- // Wait for existing visibilitytimeout to expire
- time.Sleep(time.Duration(duration) * time.Second)
- // Try 3 times to find the message
- for range [3]bool{} {
+ // Try heartbeatRetry times to find the message
+ for range [heartbeatRetry]bool{} {
+ // Wait a little in case existing visibilitytimeout needs to expire
+ time.Sleep((time.Duration(duration) / heartbeatRetry) * time.Second)
+
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: &duration,
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index f1309ca..f8b0b68 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -322,8 +322,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
case m, ok := <-msgc :
if ok {
msg = m
+ conn.GetLogger().Println("Using new message handle to delete message from old queue")
}
default:
+ conn.GetLogger().Println("Using original message handle to delete message from old queue")
}
conn.GetLogger().Println("Deleting original message from queue", fromQueue)