summaryrefslogtreecommitdiff
path: root/bookpipeline/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r--bookpipeline/aws.go25
1 files changed, 10 insertions, 15 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 7409434..a111ebf 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
- "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -15,7 +14,6 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
-const HeartbeatTime = 60
type Qmsg struct {
Handle, Body string
@@ -88,10 +86,10 @@ func (a *AwsConn) Init() error {
return nil
}
-func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
- VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
+ VisibilityTimeout: &timeout,
WaitTimeSeconds: aws.Int64(20),
QueueUrl: &url,
})
@@ -108,17 +106,14 @@ func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
}
}
-func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
- for _ = range t.C {
- duration := int64(HeartbeatTime * 2)
- _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
- QueueUrl: &qurl,
- VisibilityTimeout: &duration,
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
- }
+func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+ _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+ ReceiptHandle: &msgHandle,
+ QueueUrl: &qurl,
+ VisibilityTimeout: &duration,
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
}
return nil
}