diff options
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r-- | bookpipeline/aws.go | 56 |
1 files changed, 30 insertions, 26 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 761031d..7409434 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -1,4 +1,4 @@ -package main +package bookpipeline import ( "errors" @@ -17,10 +17,14 @@ import ( const PreprocPattern = `_bin[0-9].[0-9].png` const HeartbeatTime = 60 -type awsConn struct { +type Qmsg struct { + Handle, Body string +} + +type AwsConn struct { // these need to be set before running Init() - region string - logger *log.Logger + Region string + Logger *log.Logger // these are used internally sess *session.Session @@ -32,17 +36,17 @@ type awsConn struct { wipstorageid string } -func (a *awsConn) Init() error { - if a.region == "" { - return errors.New("No region set") +func (a *AwsConn) Init() error { + if a.Region == "" { + return errors.New("No Region set") } - if a.logger == nil { + if a.Logger == nil { return errors.New("No logger set") } var err error a.sess, err = session.NewSession(&aws.Config{ - Region: aws.String(a.region), + Region: aws.String(a.Region), }) if err != nil { return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) @@ -52,7 +56,7 @@ func (a *awsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.logger.Println("Getting preprocess queue URL") + a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribepreprocess"), }) @@ -61,7 +65,7 @@ func (a *awsConn) Init() error { } a.prequrl = *result.QueueUrl - a.logger.Println("Getting OCR queue URL") + a.Logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeocr"), }) @@ -70,7 +74,7 @@ func (a *awsConn) Init() error { } a.ocrqurl = *result.QueueUrl - a.logger.Println("Getting analyse queue URL") + a.Logger.Println("Getting analyse queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeanalyse"), }) @@ -84,7 +88,7 @@ func (a *awsConn) Init() error { return nil } -func (a *awsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string) (Qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), VisibilityTimeout: aws.Int64(HeartbeatTime * 2), @@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) { if len(msgResult.Messages) > 0 { msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} - a.logger.Println("Message received:", msg.Body) + a.Logger.Println("Message received:", msg.Body) return msg, nil } else { return Qmsg{}, nil } } -func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { +func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ @@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) return nil } -func (a *awsConn) PreQueueId() string { +func (a *AwsConn) PreQueueId() string { return a.prequrl } -func (a *awsConn) OCRQueueId() string { +func (a *AwsConn) OCRQueueId() string { return a.ocrqurl } -func (a *awsConn) AnalyseQueueId() string { +func (a *AwsConn) AnalyseQueueId() string { return a.analysequrl } -func (a *awsConn) WIPStorageId() string { +func (a *AwsConn) WIPStorageId() string { return a.wipstorageid } -func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { +func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) { var names []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), @@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { return names, err } -func (a *awsConn) AddToQueue(url string, msg string) error { +func (a *AwsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, QueueUrl: &url, @@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error { return err } -func (a *awsConn) DelFromQueue(url string, handle string) error { +func (a *AwsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, ReceiptHandle: &handle, @@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error { return err } -func (a *awsConn) Download(bucket string, key string, path string) error { +func (a *AwsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { return err @@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error { return err } -func (a *awsConn) Upload(bucket string, key string, path string) error { +func (a *AwsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { log.Fatalln("Failed to open file", path, err) @@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error { return err } -func (a *awsConn) Logger() *log.Logger { - return a.logger +func (a *AwsConn) GetLogger() *log.Logger { + return a.Logger } |