summaryrefslogtreecommitdiff
path: root/bookpipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-09-14 08:24:27 +0100
committerNick White <git@njw.name>2019-09-14 08:24:27 +0100
commit7b79288b55077c5f378b5f5e45f2ac8fd3be76ee (patch)
treec3d86dc52b1a6ef23b307c91cd263a020bd499bf /bookpipeline
parentf0b9f708496bb5335102846b65c1880596ee2ad3 (diff)
Ensure enough time has elapsed before looking for the message to reget in the case of heartbeat running out
Diffstat (limited to 'bookpipeline')
-rw-r--r--bookpipeline/aws.go3
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go1
2 files changed, 4 insertions, 0 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index e7ecd07..d51e4ea 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
+ "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -131,6 +132,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
// Check if the visibility timeout has exceeded the maximum allowed,
// and if so try to find the message again to get a new handle.
if ok && aerr.Code() == "InvalidParameterValue" {
+ // Wait for existing visibilitytimeout to expire
+ time.Sleep(time.Duration(duration) * time.Second)
// Try 3 times to find the message
for range [3]bool{} {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index 610b969..f1309ca 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -242,6 +242,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
return
}
if m.Id != "" {
+ conn.GetLogger().Println("Replaced message handle as visibilitytimeout limit was reached")
currentmsg = m
// TODO: maybe handle communicating new msg more gracefully than this
for range msgc {