summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bookpipeline/aws.go54
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go26
2 files changed, 70 insertions, 10 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index a956cf1..e7ecd07 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -7,6 +7,7 @@ import (
"os"
"github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -16,7 +17,7 @@ import (
const PreprocPattern = `_bin[0-9].[0-9].png`
type Qmsg struct {
- Handle, Body string
+ Id, Handle, Body string
}
type AwsConn struct {
@@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
if len(msgResult.Messages) > 0 {
- msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
+ msg := Qmsg{Id: *msgResult.Messages[0].MessageId,
+ Handle: *msgResult.Messages[0].ReceiptHandle,
+ Body: *msgResult.Messages[0].Body}
a.Logger.Println("Message received:", msg.Body)
return msg, nil
} else {
@@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
}
}
-func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+// QueueHeartbeat updates the visibility timeout of a message. This
+// ensures that the message remains "in flight", meaning that it
+// cannot be seen by other processes, but if this process fails the
+// timeout will expire and it will go back to being available for
+// any other process to retrieve and process.
+//
+// SQS only allows messages to be "in flight" for up to 12 hours, so
+// this will detect if the request for an update to visibility timeout
+// fails, and if so will attempt to find the message on the queue, and
+// return it, as the handle will have changed.
+func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) {
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
+ ReceiptHandle: &msg.Handle,
QueueUrl: &qurl,
VisibilityTimeout: &duration,
})
if err != nil {
- return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ aerr, ok := err.(awserr.Error)
+
+ // Check if the visibility timeout has exceeded the maximum allowed,
+ // and if so try to find the message again to get a new handle.
+ if ok && aerr.Code() == "InvalidParameterValue" {
+ // Try 3 times to find the message
+ for range [3]bool{} {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: &duration,
+ WaitTimeSeconds: aws.Int64(20),
+ QueueUrl: &qurl,
+ })
+ if err != nil {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err))
+ }
+ for _, m := range msgResult.Messages {
+ if *m.MessageId == msg.Id {
+ return Qmsg{
+ Id: *m.MessageId,
+ Handle: *m.ReceiptHandle,
+ Body: *m.Body,
+ }, nil
+ }
+ }
+ }
+ return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
+ } else {
+ return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
+ }
}
- return nil
+ return Qmsg{}, nil
}
func (a *AwsConn) PreQueueId() string {
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index 7ffacf8..1c2592b 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -54,7 +54,7 @@ type Clouder interface {
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
- QueueHeartbeat(msgHandle string, qurl string, duration int64) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
}
type Pipeliner interface {
@@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
close(up)
}
-func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+ currentmsg := msg
for range t.C {
- err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2)
+ m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
errc <- err
t.Stop()
return
}
+ if m.Id != "" {
+ currentmsg = m
+ // TODO: maybe handle communicating new msg more gracefully than this
+ for range msgc {
+ } // throw away any old msgc
+ msgc <- m
+ }
}
}
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
processc := make(chan string)
upc := make(chan string)
done := make(chan bool)
@@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
}
t := time.NewTicker(HeartbeatTime * time.Second)
- go heartbeat(conn, t, msg.Handle, fromQueue, errc)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
t.Stop()
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc :
+ if ok {
+ msg = m
+ }
+ default:
+ }
+
conn.GetLogger().Println("Deleting original message from queue", fromQueue)
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {