diff options
| author | Nick White <git@njw.name> | 2019-09-04 20:40:07 +0100 | 
|---|---|---|
| committer | Nick White <git@njw.name> | 2019-09-04 20:40:07 +0100 | 
| commit | c47e27ee5146b8c2dcf058ed58270ce691b43ff7 (patch) | |
| tree | 4f70cec07b0b3e8945bfe443fb836110b4eff61b /bookpipeline/aws.go | |
| parent | 026ebd62c0deec8da03ee22959f433db82bfda4e (diff) | |
Rewrite heartbeat so errors during it will be reported, and the aws api doesn't rely on channels
Diffstat (limited to 'bookpipeline/aws.go')
| -rw-r--r-- | bookpipeline/aws.go | 25 | 
1 files changed, 10 insertions, 15 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 7409434..a111ebf 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -5,7 +5,6 @@ import (  	"fmt"  	"log"  	"os" -	"time"  	"github.com/aws/aws-sdk-go/aws"  	"github.com/aws/aws-sdk-go/aws/session" @@ -15,7 +14,6 @@ import (  )  const PreprocPattern = `_bin[0-9].[0-9].png` -const HeartbeatTime = 60  type Qmsg struct {          Handle, Body string @@ -88,10 +86,10 @@ func (a *AwsConn) Init() error {  	return nil  } -func (a *AwsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {  	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{  		MaxNumberOfMessages: aws.Int64(1), -		VisibilityTimeout:   aws.Int64(HeartbeatTime * 2), +		VisibilityTimeout:   &timeout,  		WaitTimeSeconds:     aws.Int64(20),  		QueueUrl:            &url,  	}) @@ -108,17 +106,14 @@ func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {  	}  } -func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { -	for _ = range t.C { -		duration := int64(HeartbeatTime * 2) -		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ -			ReceiptHandle:     &msgHandle, -			QueueUrl:          &qurl, -			VisibilityTimeout: &duration, -		}) -		if err != nil { -			return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) -		} +func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error { +	_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ +		ReceiptHandle:     &msgHandle, +		QueueUrl:          &qurl, +		VisibilityTimeout: &duration, +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))  	}  	return nil  } | 
