-rw-r--r--  aws.go                    17
-rw-r--r--  cmd/bookpipeline/main.go   4
2 files changed, 17 insertions, 4 deletions
@@ -17,7 +17,7 @@ import (
 )
 
 const PreprocPattern = `_bin[0-9].[0-9].png`
-const heartbeatRetry = 20
+const heartbeatRetry = 100
 
 type Qmsg struct {
 	Id, Handle, Body string
@@ -155,8 +155,17 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
 		// Check if the visibility timeout has exceeded the maximum allowed,
 		// and if so try to find the message again to get a new handle.
 		if ok && aerr.Code() == "InvalidParameterValue" {
+			// First try to set the visibilitytimeout to zero to immediately
+			// make the message available to receive
+			z := int64(0)
+			_, _ = a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+				ReceiptHandle:     &msg.Handle,
+				QueueUrl:          &qurl,
+				VisibilityTimeout: &z,
+			})
+
 			// Try heartbeatRetry times to find the message
-			for range [heartbeatRetry]bool{} {
+			for i := 0; i < heartbeatRetry; i++ {
 				msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
 					MaxNumberOfMessages: aws.Int64(10),
 					VisibilityTimeout:   &duration,
@@ -175,8 +184,8 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
 						}, nil
 					}
 				}
-				// Wait a little in case existing visibilitytimeout needs to expire
-				time.Sleep((2 * time.Duration(duration) / heartbeatRetry) * time.Second)
+				// Wait a second before trying again
+				time.Sleep(time.Second)
 			}
 			return Qmsg{}, errors.New("Heartbeat error failed to find message to update heartbeat")
 		} else {
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 2179549..f7da588 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -255,6 +255,10 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
 	for range t.C {
 		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
 		if err != nil {
+			// TODO: would be better to ensure this error stops any running
+			//       processes, as they will ultimately fail in the case of
+			//       it. could do this by setting a global variable that
+			//       processes check each time they loop.
 			errc <- err
 			t.Stop()
 			return
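
The recovery path added to QueueHeartbeat above works in two steps: reset the message's visibility timeout to zero so it becomes receivable again straight away, then repeatedly call ReceiveMessage until the message turns up and a fresh receipt handle can be taken from it. Below is a condensed, standalone sketch of that flow using the aws-sdk-go sqs package; it assumes the message is re-identified by comparing bodies (that part of the loop falls outside the hunks shown), and the function name, queue URL and other values are placeholders rather than part of bookpipeline.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// refreshHandle resets the visibility timeout of a message to zero so it
// becomes receivable again, then polls the queue until a message with the
// same body turns up, returning its new receipt handle.
func refreshHandle(svc *sqs.SQS, qurl, handle, body string, duration int64) (string, error) {
	// Errors are deliberately ignored here: the old handle may already be
	// invalid, which is exactly the situation being recovered from.
	_, _ = svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(qurl),
		ReceiptHandle:     aws.String(handle),
		VisibilityTimeout: aws.Int64(0),
	})

	for i := 0; i < 100; i++ {
		out, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:            aws.String(qurl),
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   aws.Int64(duration),
		})
		if err != nil {
			return "", err
		}
		for _, m := range out.Messages {
			if m.Body != nil && *m.Body == body {
				// Found it again; hand back the new receipt handle.
				return *m.ReceiptHandle, nil
			}
		}
		// Wait a second before trying again, as in the patch above.
		time.Sleep(time.Second)
	}
	return "", errors.New("failed to find message to update heartbeat")
}

func main() {
	svc := sqs.New(session.Must(session.NewSession()))
	handle, err := refreshHandle(svc, "https://sqs.example/queue", "old-handle", "bookname", 120)
	fmt.Println(handle, err)
}

Resetting the visibility timeout first means the retry loop does not have to wait out whatever remained of the old timeout, which is why the fixed one-second sleep can replace the earlier duration-based wait.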

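The TODO added in cmd/bookpipeline/main.go suggests stopping any in-flight processing once the heartbeat has failed, since that work will ultimately fail anyway. The following is a minimal sketch of the global-variable idea it describes, assuming a package-level atomic flag that processing loops poll once per iteration; every name here (stopProcessing, onHeartbeatError, processPages) is illustrative and not part of the pipeline code.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// stopProcessing is set by the heartbeat goroutine when QueueHeartbeat
// fails, and checked by long-running processing loops so they can abort
// early rather than continuing work that will be discarded.
var stopProcessing atomic.Bool

// onHeartbeatError would be called from the error branch shown in the
// main.go hunk above, alongside sending on errc and stopping the ticker.
func onHeartbeatError() {
	stopProcessing.Store(true)
}

// processPages stands in for one of the pipeline's per-page loops; it
// checks the flag on each iteration and bails out if it has been set.
func processPages(pages []string) error {
	for _, p := range pages {
		if stopProcessing.Load() {
			return errors.New("aborting: queue heartbeat failed")
		}
		_ = p // real per-page work would happen here
	}
	return nil
}

func main() {
	onHeartbeatError()
	fmt.Println(processPages([]string{"0001.png", "0002.png"}))
}

An atomic flag keeps the per-iteration check cheap; a context.Context cancelled by the heartbeat goroutine would be an equivalent alternative where the processing functions already accept one.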