diff options
| -rw-r--r-- | bookpipeline/aws.go | 25 | ||||
| -rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 38 | 
2 files changed, 34 insertions, 29 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 7409434..a111ebf 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -5,7 +5,6 @@ import (  	"fmt"  	"log"  	"os" -	"time"  	"github.com/aws/aws-sdk-go/aws"  	"github.com/aws/aws-sdk-go/aws/session" @@ -15,7 +14,6 @@ import (  )  const PreprocPattern = `_bin[0-9].[0-9].png` -const HeartbeatTime = 60  type Qmsg struct {          Handle, Body string @@ -88,10 +86,10 @@ func (a *AwsConn) Init() error {  	return nil  } -func (a *AwsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) {  	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{  		MaxNumberOfMessages: aws.Int64(1), -		VisibilityTimeout:   aws.Int64(HeartbeatTime * 2), +		VisibilityTimeout:   &timeout,  		WaitTimeSeconds:     aws.Int64(20),  		QueueUrl:            &url,  	}) @@ -108,17 +106,14 @@ func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {  	}  } -func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { -	for _ = range t.C { -		duration := int64(HeartbeatTime * 2) -		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ -			ReceiptHandle:     &msgHandle, -			QueueUrl:          &qurl, -			VisibilityTimeout: &duration, -		}) -		if err != nil { -			return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) -		} +func (a *AwsConn) QueueHeartbeat(msgHandle string, qurl string, duration int64) error { +	_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ +		ReceiptHandle:     &msgHandle, +		QueueUrl:          &qurl, +		VisibilityTimeout: &duration, +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err))  	}  	return nil  } diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index c7dde5b..913ccc7 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -52,10 +52,10 @@ type Clouder interface {  	ListObjects(bucket string, prefix string) ([]string, error)  	Download(bucket string, key string, fn string) error  	Upload(bucket string, key string, path string) error -	CheckQueue(url string) (bookpipeline.Qmsg, error) +	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)  	AddToQueue(url string, msg string) error  	DelFromQueue(url string, handle string) error -	QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error +	QueueHeartbeat(msgHandle string, qurl string, duration int64) error  }  type Pipeliner interface { @@ -225,24 +225,34 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	close(up)  } +func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +	for range t.C { +		err := conn.QueueHeartbeat(msg, queue, HeartbeatTime * 2) +		if err != nil { +			errc <- err +			t.Stop() +			return +		} +	} +} +  func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { -	bookname := msg.Body +	dl := make(chan string) +	processc := make(chan string) +	upc := make(chan string) +	done := make(chan bool) +	errc := make(chan error) -	t := time.NewTicker(HeartbeatTime * time.Second) -	go conn.QueueHeartbeat(t, msg.Handle, fromQueue) +	bookname := msg.Body  	d := filepath.Join(os.TempDir(), bookname)  	err := os.MkdirAll(d, 0755)  	if err != nil { -		t.Stop()  		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))  	} -	dl := make(chan string) -	processc := make(chan string) -	upc := make(chan string) -	done := make(chan bool) -	errc := make(chan error) +	t := time.NewTicker(HeartbeatTime * time.Second) +	go heartbeat(conn, t, msg.Handle, fromQueue, errc)  	// these functions will do their jobs when their channels have data  	go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -346,7 +356,7 @@ func main() {  	for {  		select {  		case <-checkPreQueue: -			msg, err := conn.CheckQueue(conn.PreQueueId()) +			msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime * 2)  			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking preprocess queue", err) @@ -362,7 +372,7 @@ func main() {  				log.Println("Error during preprocess", err)  			}  		case <-checkOCRQueue: -			msg, err := conn.CheckQueue(conn.OCRQueueId()) +			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime * 2)  			checkOCRQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking OCR queue", err) @@ -378,7 +388,7 @@ func main() {  				log.Println("Error during OCR process", err)  			}  		case <-checkAnalyseQueue: -			msg, err := conn.CheckQueue(conn.AnalyseQueueId()) +			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime * 2)  			checkAnalyseQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking analyse queue", err) | 
