diff options
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r-- | bookpipeline/aws.go | 84 |
1 files changed, 42 insertions, 42 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 1ac06de..2322ea2 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -24,11 +24,11 @@ type awsConn struct { logger *log.Logger // these are used internally - sess *session.Session - s3svc *s3.S3 - sqssvc *sqs.SQS - downloader *s3manager.Downloader - uploader *s3manager.Uploader + sess *session.Session + s3svc *s3.S3 + sqssvc *sqs.SQS + downloader *s3manager.Downloader + uploader *s3manager.Uploader prequrl, ocrqurl, analysequrl string } @@ -52,32 +52,32 @@ func (a *awsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.logger.Println("Getting preprocess queue URL") - result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribepreprocess"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) - } - a.prequrl = *result.QueueUrl - - a.logger.Println("Getting OCR queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeocr"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) - } - a.ocrqurl = *result.QueueUrl - - a.logger.Println("Getting analyse queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeanalyse"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) - } - a.analysequrl = *result.QueueUrl + a.logger.Println("Getting preprocess queue URL") + result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribepreprocess"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) + } + a.prequrl = *result.QueueUrl + + a.logger.Println("Getting OCR queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeocr"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) + } + a.ocrqurl = *result.QueueUrl + + a.logger.Println("Getting analyse queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeanalyse"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) + } + a.analysequrl = *result.QueueUrl return nil } @@ -85,16 +85,16 @@ func (a *awsConn) Init() error { func (a *awsConn) CheckQueue(url string) (Qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(HeartbeatTime * 2), - WaitTimeSeconds: aws.Int64(20), - QueueUrl: &url, + VisibilityTimeout: aws.Int64(HeartbeatTime * 2), + WaitTimeSeconds: aws.Int64(20), + QueueUrl: &url, }) if err != nil { return Qmsg{}, err } if len(msgResult.Messages) > 0 { - msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body } + msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} a.logger.Println("Message received:", msg.Body) return msg, nil } else { @@ -121,8 +121,8 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, - QueueUrl: &qurl, + ReceiptHandle: &msgHandle, + QueueUrl: &qurl, VisibilityTimeout: &duration, }) if err != nil { @@ -183,7 +183,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) { } // Filter out any object that looks like it hasn't already been preprocessed for _, n := range objs { - if ! preprocessed.MatchString(n) { + if !preprocessed.MatchString(n) { a.logger.Println("Skipping item that looks like it is not preprocessed", n) continue } @@ -195,7 +195,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) { func (a *awsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, - QueueUrl: &url, + QueueUrl: &url, }) return err } @@ -210,7 +210,7 @@ func (a *awsConn) AddToAnalyseQueue(msg string) error { func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: &url, + QueueUrl: &url, ReceiptHandle: &handle, }) return err @@ -234,8 +234,8 @@ func (a *awsConn) Download(bucket string, key string, path string) error { _, err = a.downloader.Download(f, &s3.GetObjectInput{ Bucket: aws.String(bucket), - Key: &key, - }) + Key: &key, + }) return err } |