summary | refs | log | tree | commit | diff
path: root/bookpipeline
diff options
context:
space:
mode:
author Nick White <git@njw.name> 2019-09-04 20:40:07 +0100
committer Nick White <git@njw.name> 2019-09-04 20:40:07 +0100
commit c47e27ee5146b8c2dcf058ed58270ce691b43ff7 (patch)
tree 4f70cec07b0b3e8945bfe443fb836110b4eff61b /bookpipeline
parent 026ebd62c0deec8da03ee22959f433db82bfda4e (diff)
Rewrite heartbeat so errors during it will be reported, and the aws api doesn't rely on channels
Diffstat (limited to 'bookpipeline')
-rw-r--r-- bookpipeline/aws.go 25
-rw-r--r-- bookpipeline/cmd/bookpipeline/main.go 38
2 files changed, 34 insertions, 29 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 7409434..a111ebf 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
- "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -15,7 +14,6 @@ import (
)
const PreprocPattern = `_bin[0-9].[0-9].png`
-const HeartbeatTime = 60
type Qmsg struct {
Handle, Body string
@@ -88,10 +86,10 @@ func (a *AwsConn) Init() error {
return nil
}
-func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
- VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
+ VisibilityTimeout: &timeout,
WaitTimeSeconds: aws.Int64(20),
QueueUrl: &url,
})
@@ -108,17 +106,14 @@ func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
}
}
-func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
- for _ = range t.C {
- duration := int64(HeartbeatTime * 2)
- _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
- QueueUrl: &qurl,
- VisibilityTimeout: &duration,
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
- }
+func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error {
+ _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
+ ReceiptHandle: &msgHandle,
+ QueueUrl: &qurl,
+ VisibilityTimeout: &duration,
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))
}
return nil
}
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index c7dde5b..913ccc7 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -52,10 +52,10 @@ type Clouder interface {
ListObjects(bucket string, prefix string) ([]string, error)
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
- CheckQueue(url string) (bookpipeline.Qmsg, error)
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
- QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
+ QueueHeartbeat(msgHandle string, qurl string, duration int64) error
}
type Pipeliner interface {
@@ -225,24 +225,34 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
close(up)
}
+func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+ for range t.C {
+ err := conn.QueueHeartbeat(msg, queue, HeartbeatTime * 2)
+ if err != nil {
+ errc <- err
+ t.Stop()
+ return
+ }
+ }
+}
+
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
- bookname := msg.Body
+ dl := make(chan string)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
- t := time.NewTicker(HeartbeatTime * time.Second)
- go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
+ bookname := msg.Body
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
if err != nil {
- t.Stop()
return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
}
- dl := make(chan string)
- processc := make(chan string)
- upc := make(chan string)
- done := make(chan bool)
- errc := make(chan error)
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go heartbeat(conn, t, msg.Handle, fromQueue, errc)
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -346,7 +356,7 @@ func main() {
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckQueue(conn.PreQueueId())
+ msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime * 2)
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking preprocess queue", err)
@@ -362,7 +372,7 @@ func main() {
log.Println("Error during preprocess", err)
}
case <-checkOCRQueue:
- msg, err := conn.CheckQueue(conn.OCRQueueId())
+ msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime * 2)
checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking OCR queue", err)
@@ -378,7 +388,7 @@ func main() {
log.Println("Error during OCR process", err)
}
case <-checkAnalyseQueue:
- msg, err := conn.CheckQueue(conn.AnalyseQueueId())
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime * 2)
checkAnalyseQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking analyse queue", err)