summaryrefslogtreecommitdiff
path: root/bookpipeline/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r--bookpipeline/aws.go56
1 files changed, 30 insertions, 26 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 761031d..7409434 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -1,4 +1,4 @@
-package main
+package bookpipeline
import (
"errors"
@@ -17,10 +17,14 @@ import (
const PreprocPattern = `_bin[0-9].[0-9].png`
const HeartbeatTime = 60
-type awsConn struct {
+type Qmsg struct {
+ Handle, Body string
+}
+
+type AwsConn struct {
// these need to be set before running Init()
- region string
- logger *log.Logger
+ Region string
+ Logger *log.Logger
// these are used internally
sess *session.Session
@@ -32,17 +36,17 @@ type awsConn struct {
wipstorageid string
}
-func (a *awsConn) Init() error {
- if a.region == "" {
- return errors.New("No region set")
+func (a *AwsConn) Init() error {
+ if a.Region == "" {
+ return errors.New("No Region set")
}
- if a.logger == nil {
+ if a.Logger == nil {
return errors.New("No logger set")
}
var err error
a.sess, err = session.NewSession(&aws.Config{
- Region: aws.String(a.region),
+ Region: aws.String(a.Region),
})
if err != nil {
return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err))
@@ -52,7 +56,7 @@ func (a *awsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
- a.logger.Println("Getting preprocess queue URL")
+ a.Logger.Println("Getting preprocess queue URL")
result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribepreprocess"),
})
@@ -61,7 +65,7 @@ func (a *awsConn) Init() error {
}
a.prequrl = *result.QueueUrl
- a.logger.Println("Getting OCR queue URL")
+ a.Logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeocr"),
})
@@ -70,7 +74,7 @@ func (a *awsConn) Init() error {
}
a.ocrqurl = *result.QueueUrl
- a.logger.Println("Getting analyse queue URL")
+ a.Logger.Println("Getting analyse queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeanalyse"),
})
@@ -84,7 +88,7 @@ func (a *awsConn) Init() error {
return nil
}
-func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
@@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
if len(msgResult.Messages) > 0 {
msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
- a.logger.Println("Message received:", msg.Body)
+ a.Logger.Println("Message received:", msg.Body)
return msg, nil
} else {
return Qmsg{}, nil
}
}
-func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
+func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
@@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
return nil
}
-func (a *awsConn) PreQueueId() string {
+func (a *AwsConn) PreQueueId() string {
return a.prequrl
}
-func (a *awsConn) OCRQueueId() string {
+func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
-func (a *awsConn) AnalyseQueueId() string {
+func (a *AwsConn) AnalyseQueueId() string {
return a.analysequrl
}
-func (a *awsConn) WIPStorageId() string {
+func (a *AwsConn) WIPStorageId() string {
return a.wipstorageid
}
-func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
+func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) {
var names []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
@@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
return names, err
}
-func (a *awsConn) AddToQueue(url string, msg string) error {
+func (a *AwsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
QueueUrl: &url,
@@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error {
return err
}
-func (a *awsConn) DelFromQueue(url string, handle string) error {
+func (a *AwsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
ReceiptHandle: &handle,
@@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {
return err
}
-func (a *awsConn) Download(bucket string, key string, path string) error {
+func (a *AwsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
return err
@@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) Upload(bucket string, key string, path string) error {
+func (a *AwsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
log.Fatalln("Failed to open file", path, err)
@@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) Logger() *log.Logger {
- return a.logger
+func (a *AwsConn) GetLogger() *log.Logger {
+ return a.Logger
}