diff options
Diffstat (limited to 'bookpipeline')
-rw-r--r-- | bookpipeline/aws.go | 84 | ||||
-rw-r--r-- | bookpipeline/main.go | 28 |
2 files changed, 57 insertions, 55 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 1ac06de..2322ea2 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -24,11 +24,11 @@ type awsConn struct { logger *log.Logger // these are used internally - sess *session.Session - s3svc *s3.S3 - sqssvc *sqs.SQS - downloader *s3manager.Downloader - uploader *s3manager.Uploader + sess *session.Session + s3svc *s3.S3 + sqssvc *sqs.SQS + downloader *s3manager.Downloader + uploader *s3manager.Uploader prequrl, ocrqurl, analysequrl string } @@ -52,32 +52,32 @@ func (a *awsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.logger.Println("Getting preprocess queue URL") - result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribepreprocess"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) - } - a.prequrl = *result.QueueUrl - - a.logger.Println("Getting OCR queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeocr"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) - } - a.ocrqurl = *result.QueueUrl - - a.logger.Println("Getting analyse queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeanalyse"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) - } - a.analysequrl = *result.QueueUrl + a.logger.Println("Getting preprocess queue URL") + result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribepreprocess"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) + } + a.prequrl = *result.QueueUrl + + a.logger.Println("Getting OCR queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeocr"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) + } + a.ocrqurl = *result.QueueUrl + + a.logger.Println("Getting analyse queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeanalyse"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) + } + a.analysequrl = *result.QueueUrl return nil } @@ -85,16 +85,16 @@ func (a *awsConn) Init() error { func (a *awsConn) CheckQueue(url string) (Qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(HeartbeatTime * 2), - WaitTimeSeconds: aws.Int64(20), - QueueUrl: &url, + VisibilityTimeout: aws.Int64(HeartbeatTime * 2), + WaitTimeSeconds: aws.Int64(20), + QueueUrl: &url, }) if err != nil { return Qmsg{}, err } if len(msgResult.Messages) > 0 { - msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body } + msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} a.logger.Println("Message received:", msg.Body) return msg, nil } else { @@ -121,8 +121,8 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, - QueueUrl: &qurl, + ReceiptHandle: &msgHandle, + QueueUrl: &qurl, VisibilityTimeout: &duration, }) if err != nil { @@ -183,7 +183,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) { } // Filter out any object that looks like it hasn't already been preprocessed for _, n := range objs { - if ! preprocessed.MatchString(n) { + if !preprocessed.MatchString(n) { a.logger.Println("Skipping item that looks like it is not preprocessed", n) continue } @@ -195,7 +195,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) { func (a *awsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, - QueueUrl: &url, + QueueUrl: &url, }) return err } @@ -210,7 +210,7 @@ func (a *awsConn) AddToAnalyseQueue(msg string) error { func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: &url, + QueueUrl: &url, ReceiptHandle: &handle, }) return err @@ -234,8 +234,8 @@ func (a *awsConn) Download(bucket string, key string, path string) error { _, err = a.downloader.Download(f, &s3.GetObjectInput{ Bucket: aws.String(bucket), - Key: &key, - }) + Key: &key, + }) return err } diff --git a/bookpipeline/main.go b/bookpipeline/main.go index c1fb547..24e4e74 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/main.go @@ -1,4 +1,5 @@ package main + // TODO: have logs go somewhere useful, like email // TODO: check if images are prebinarised and if so skip multiple binarisation @@ -37,6 +38,7 @@ const training = "rescribealphav5" // TODO: allow to set on cmdline // null writer to enable non-verbose logging to be discarded type NullWriter bool + func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } @@ -175,11 +177,11 @@ func preprocBook(msg Qmsg, conn Pipeliner) error { // wait for either the done or errc channel to be sent to select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: } conn.Logger().Println("Sending", bookname, "to OCR queue") @@ -246,11 +248,11 @@ func ocrBook(msg Qmsg, conn Pipeliner) error { // wait for either the done or errc channel to be sent to select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: } conn.Logger().Println("Sending", bookname, "to analyse queue") @@ -292,7 +294,7 @@ func main() { } var conn Pipeliner - conn = &awsConn{ region: "eu-west-2", logger: verboselog } + conn = &awsConn{region: "eu-west-2", logger: verboselog} verboselog.Println("Setting up AWS session") err := conn.Init() @@ -308,7 +310,7 @@ func main() { for { select { - case <- checkPreQueue: + case <-checkPreQueue: msg, err := conn.CheckPreQueue() checkPreQueue = time.After(PauseBetweenChecks) if err != nil { @@ -323,7 +325,7 @@ func main() { if err != nil { log.Println("Error during preprocess", err) } - case <- checkOCRQueue: + case <-checkOCRQueue: msg, err := conn.CheckOCRQueue() checkOCRQueue = time.After(PauseBetweenChecks) if err != nil { |