summaryrefslogtreecommitdiff
path: root/bookpipeline/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r--bookpipeline/aws.go84
1 files changed, 42 insertions, 42 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 1ac06de..2322ea2 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -24,11 +24,11 @@ type awsConn struct {
logger *log.Logger
// these are used internally
- sess *session.Session
- s3svc *s3.S3
- sqssvc *sqs.SQS
- downloader *s3manager.Downloader
- uploader *s3manager.Uploader
+ sess *session.Session
+ s3svc *s3.S3
+ sqssvc *sqs.SQS
+ downloader *s3manager.Downloader
+ uploader *s3manager.Uploader
prequrl, ocrqurl, analysequrl string
}
@@ -52,32 +52,32 @@ func (a *awsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
- a.logger.Println("Getting preprocess queue URL")
- result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribepreprocess"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
- }
- a.prequrl = *result.QueueUrl
-
- a.logger.Println("Getting OCR queue URL")
- result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeocr"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
- }
- a.ocrqurl = *result.QueueUrl
-
- a.logger.Println("Getting analyse queue URL")
- result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeanalyse"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
- }
- a.analysequrl = *result.QueueUrl
+ a.logger.Println("Getting preprocess queue URL")
+ result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribepreprocess"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
+ }
+ a.prequrl = *result.QueueUrl
+
+ a.logger.Println("Getting OCR queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeocr"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
+ }
+ a.ocrqurl = *result.QueueUrl
+
+ a.logger.Println("Getting analyse queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeanalyse"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
+ }
+ a.analysequrl = *result.QueueUrl
return nil
}
@@ -85,16 +85,16 @@ func (a *awsConn) Init() error {
func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
- VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
- WaitTimeSeconds: aws.Int64(20),
- QueueUrl: &url,
+ VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
+ WaitTimeSeconds: aws.Int64(20),
+ QueueUrl: &url,
})
if err != nil {
return Qmsg{}, err
}
if len(msgResult.Messages) > 0 {
- msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body }
+ msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
a.logger.Println("Message received:", msg.Body)
return msg, nil
} else {
@@ -121,8 +121,8 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
- QueueUrl: &qurl,
+ ReceiptHandle: &msgHandle,
+ QueueUrl: &qurl,
VisibilityTimeout: &duration,
})
if err != nil {
@@ -183,7 +183,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
}
// Filter out any object that looks like it hasn't already been preprocessed
for _, n := range objs {
- if ! preprocessed.MatchString(n) {
+ if !preprocessed.MatchString(n) {
a.logger.Println("Skipping item that looks like it is not preprocessed", n)
continue
}
@@ -195,7 +195,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
func (a *awsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
- QueueUrl: &url,
+ QueueUrl: &url,
})
return err
}
@@ -210,7 +210,7 @@ func (a *awsConn) AddToAnalyseQueue(msg string) error {
func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
- QueueUrl: &url,
+ QueueUrl: &url,
ReceiptHandle: &handle,
})
return err
@@ -234,8 +234,8 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
_, err = a.downloader.Download(f,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
- Key: &key,
- })
+ Key: &key,
+ })
return err
}