summaryrefslogtreecommitdiff
path: root/bookpipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-09-11 15:41:23 +0100
committerNick White <git@njw.name>2019-09-11 15:41:23 +0100
commit7d627f29e76225c99e5ab062a59cfddfbe8fa43c (patch)
treea81db1a129c3405d3a2f2d9cf3af13684bb37faf /bookpipeline
parentcde53a94fd0eeb65209e549d3e13486dd2681b7e (diff)
Work around the SQS limit of 12 hours of visibility timeout
This is done by checking for the error that is emitted in such a case; if it is found, we try several times to find the message again in the queue, and return the message with its updated handle to the caller to use in future.
Diffstat (limited to 'bookpipeline')
-rw-r--r--bookpipeline/aws.go54
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go26
2 files changed, 70 insertions, 10 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index a956cf1..e7ecd07 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -7,6 +7,7 @@ import (
"os"
"github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -16,7 +17,7 @@ import (
const PreprocPattern = `_bin[0-9].[0-9].png`
type Qmsg struct {
- Handle, Body string
+ Id, Handle, Body string
}
type AwsConn struct {
@@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
if len(msgResult.Messages) > 0 {
- msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
+ msg := Qmsg{Id: *msgResult.Messages[0].MessageId,
+ Handle: *msgResult.Messages[0].ReceiptHandle,
+ Body: *msgResult.Messages[0].Body}
a.Logger.Println("Message received:", msg.Body)
return msg, nil
} else {
@@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
}
-func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+// QueueHeartbeat updates the visibility timeout of a message. This
+// ensures that the message remains "in flight", meaning that it
+// cannot be seen by other processes, but if this process fails the
+// timeout will expire and it will go back to being available for
+// any other process to retrieve and process.
+//
+// SQS only allows messages to be "in flight" for up to 12 hours, so
+// this will detect if the request for an update to visibility timeout
+// fails, and if so will attempt to find the message on the queue, and
+// return it, as the handle will have changed.
+func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) {
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
+ ReceiptHandle: &msg.Handle,
QueueUrl: &qurl,
VisibilityTimeout: &duration,
})
if err != nil {
- return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ aerr, ok := err.(awserr.Error)
+
+ // Check if the visibility timeout has exceeded the maximum allowed,
+ // and if so try to find the message again to get a new handle.
+ if ok && aerr.Code() == "InvalidParameterValue" {
+ // Try 3 times to find the message
+ for range [3]bool{} {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: &duration,
+ WaitTimeSeconds: aws.Int64(20),
+ QueueUrl: &qurl,
+ })
+ if err != nil {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err))
+ }
+ for _, m := range msgResult.Messages {
+ if *m.MessageId == msg.Id {
+ return Qmsg{
+ Id: *m.MessageId,
+ Handle: *m.ReceiptHandle,
+ Body: *m.Body,
+ }, nil
+ }
+ }
+ }
+ return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
+ } else {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ }
}
- return nil
+ return Qmsg{}, nil
}
func (a *AwsConn) PreQueueId() string {
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index 7ffacf8..1c2592b 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -54,7 +54,7 @@ type Clouder interface {
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
- QueueHeartbeat(msgHandle string, qurl string, duration int64) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
}
type Pipeliner interface {
@@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
close(up)
}
-func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+ currentmsg := msg
for range t.C {
- err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2)
+ m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
errc <- err
t.Stop()
return
}
+ if m.Id != "" {
+ currentmsg = m
+ // TODO: maybe handle communicating new msg more gracefully than this
+ for range msgc {
+ } // throw away any old msgc
+ msgc <- m
+ }
}
}
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
processc := make(chan string)
upc := make(chan string)
done := make(chan bool)
@@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
}
t := time.NewTicker(HeartbeatTime * time.Second)
- go heartbeat(conn, t, msg.Handle, fromQueue, errc)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
t.Stop()
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc :
+ if ok {
+ msg = m
+ }
+ default:
+ }
+
conn.GetLogger().Println("Deleting original message from queue", fromQueue)
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {