diff options
| -rw-r--r-- | bookpipeline/aws.go | 54 | ||||
| -rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 26 | 
2 files changed, 70 insertions, 10 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index a956cf1..e7ecd07 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -7,6 +7,7 @@ import (  	"os"  	"github.com/aws/aws-sdk-go/aws" +	"github.com/aws/aws-sdk-go/aws/awserr"  	"github.com/aws/aws-sdk-go/aws/session"  	"github.com/aws/aws-sdk-go/service/s3"  	"github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -16,7 +17,7 @@ import (  const PreprocPattern = `_bin[0-9].[0-9].png`  type Qmsg struct { -	Handle, Body string +	Id, Handle, Body string  }  type AwsConn struct { @@ -98,7 +99,9 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {  	}  	if len(msgResult.Messages) > 0 { -		msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} +		msg := Qmsg{Id: *msgResult.Messages[0].MessageId, +		            Handle: *msgResult.Messages[0].ReceiptHandle, +		            Body: *msgResult.Messages[0].Body}  		a.Logger.Println("Message received:", msg.Body)  		return msg, nil  	} else { @@ -106,16 +109,55 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {  	}  } -func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error { +// QueueHeartbeat updates the visibility timeout of a message. This +// ensures that the message remains "in flight", meaning that it +// cannot be seen by other processes, but if this process fails the +// timeout will expire and it will go back to being available for +// any other process to retrieve and process. +// +// SQS only allows messages to be "in flight" for up to 12 hours, so +// this will detect if the request for an update to visibility timeout +// fails, and if so will attempt to find the message on the queue, and +// return it, as the handle will have changed. +func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) {  	_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ -		ReceiptHandle:     &msgHandle, +		ReceiptHandle:     &msg.Handle,  		QueueUrl:          &qurl,  		VisibilityTimeout: &duration,  	})  	if err != nil { -		return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) +		aerr, ok := err.(awserr.Error) + +		// Check if the visibility timeout has exceeded the maximum allowed, +		// and if so try to find the message again to get a new handle. +		if ok && aerr.Code() == "InvalidParameterValue" { +			// Try 3 times to find the message +			for range [3]bool{} { +				msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +					MaxNumberOfMessages: aws.Int64(10), +					VisibilityTimeout:   &duration, +					WaitTimeSeconds:     aws.Int64(20), +					QueueUrl:            &qurl, +				}) +				if err != nil { +					return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error looking for message to update heartbeat: %s", err)) +				} +				for _, m := range msgResult.Messages { +					if *m.MessageId == msg.Id { +						return Qmsg{ +							Id: *m.MessageId, +							Handle: *m.ReceiptHandle, +							Body: *m.Body, +						}, nil +					} +				} +			} +			return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat") +		} else { +			return Qmsg{}, errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) +		}  	} -	return nil +	return Qmsg{}, nil  }  func (a *AwsConn) PreQueueId() string { diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index 7ffacf8..1c2592b 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -54,7 +54,7 @@ type Clouder interface {  	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)  	AddToQueue(url string, msg string) error  	DelFromQueue(url string, handle string) error -	QueueHeartbeat(msgHandle string, qurl string, duration int64) error +	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)  }  type Pipeliner interface { @@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	close(up)  } -func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { +	currentmsg := msg  	for range t.C { -		err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2) +		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)  		if err != nil {  			errc <- err  			t.Stop()  			return  		} +		if m.Id != "" { +			currentmsg = m +			// TODO: maybe handle communicating new msg more gracefully than this +			for range msgc { +			} // throw away any old msgc +			msgc <- m +		}  	}  }  func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {  	dl := make(chan string) +	msgc := make(chan bookpipeline.Qmsg)  	processc := make(chan string)  	upc := make(chan string)  	done := make(chan bool) @@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	}  	t := time.NewTicker(HeartbeatTime * time.Second) -	go heartbeat(conn, t, msg.Handle, fromQueue, errc) +	go heartbeat(conn, t, msg, fromQueue, msgc, errc)  	// these functions will do their jobs when their channels have data  	go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	t.Stop() +	// check whether we're using a newer msg handle +	select { +	case m, ok := <-msgc : +		if ok { +			msg = m +		} +	default: +	} +  	conn.GetLogger().Println("Deleting original message from queue", fromQueue)  	err = conn.DelFromQueue(fromQueue, msg.Handle)  	if err != nil { | 
