summaryrefslogtreecommitdiff
path: root/bookpipeline/aws.go
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-23 16:37:29 +0100
committerNick White <git@njw.name>2019-08-23 16:37:29 +0100
commite5d5f4c270ae48022f2fc87cc5d65d8276de4d71 (patch)
treece813918b0d693cfd38962fd9f0c0ca57655bf88 /bookpipeline/aws.go
parent8d47a1baed954cf46cd57afcb5b4ef54c774bff7 (diff)
Fix gaping bugs by using correct queues and downloads
This has involved refactoring to make the interface simpler, and just use the URLs / IDs for the necessary queues and storage locations, rather than wrap these in functions.
Diffstat (limited to 'bookpipeline/aws.go')
-rw-r--r--bookpipeline/aws.go99
1 files changed, 15 insertions, 84 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 2322ea2..761031d 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
- "regexp"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -30,6 +29,7 @@ type awsConn struct {
downloader *s3manager.Downloader
uploader *s3manager.Uploader
prequrl, ocrqurl, analysequrl string
+ wipstorageid string
}
func (a *awsConn) Init() error {
@@ -79,6 +79,8 @@ func (a *awsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
+ a.wipstorageid = "rescribeinprogress"
+
return nil
}
@@ -102,21 +104,6 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
}
}
-func (a *awsConn) CheckPreQueue() (Qmsg, error) {
- a.logger.Println("Checking preprocessing queue for new messages")
- return a.CheckQueue(a.prequrl)
-}
-
-func (a *awsConn) CheckOCRQueue() (Qmsg, error) {
- a.logger.Println("Checking OCR queue for new messages")
- return a.CheckQueue(a.ocrqurl)
-}
-
-func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) {
- a.logger.Println("Checking analyse queue for new messages")
- return a.CheckQueue(a.ocrqurl)
-}
-
func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
@@ -132,14 +119,20 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
return nil
}
-func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
- a.logger.Println("Starting preprocess queue heartbeat")
- return a.QueueHeartbeat(t, msgHandle, a.prequrl)
+func (a *awsConn) PreQueueId() string {
+ return a.prequrl
}
-func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {
- a.logger.Println("Starting ocr queue heartbeat")
- return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
+func (a *awsConn) OCRQueueId() string {
+ return a.ocrqurl
+}
+
+func (a *awsConn) AnalyseQueueId() string {
+ return a.analysequrl
+}
+
+func (a *awsConn) WIPStorageId() string {
+ return a.wipstorageid
}
func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
@@ -156,42 +149,6 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
return names, err
}
-func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) {
- var names []string
- preprocessed := regexp.MustCompile(PreprocPattern)
- objs, err := a.ListObjects("rescribeinprogress", bookname)
- if err != nil {
- return names, err
- }
- // Filter out any object that looks like it's already been preprocessed
- for _, n := range objs {
- if preprocessed.MatchString(n) {
- a.logger.Println("Skipping item that looks like it has already been processed", n)
- continue
- }
- names = append(names, n)
- }
- return names, nil
-}
-
-func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
- var names []string
- preprocessed := regexp.MustCompile(PreprocPattern)
- objs, err := a.ListObjects("rescribeinprogress", bookname)
- if err != nil {
- return names, err
- }
- // Filter out any object that looks like it hasn't already been preprocessed
- for _, n := range objs {
- if !preprocessed.MatchString(n) {
- a.logger.Println("Skipping item that looks like it is not preprocessed", n)
- continue
- }
- names = append(names, n)
- }
- return names, nil
-}
-
func (a *awsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
@@ -200,14 +157,6 @@ func (a *awsConn) AddToQueue(url string, msg string) error {
return err
}
-func (a *awsConn) AddToOCRQueue(msg string) error {
- return a.AddToQueue(a.ocrqurl, msg)
-}
-
-func (a *awsConn) AddToAnalyseQueue(msg string) error {
- return a.AddToQueue(a.analysequrl, msg)
-}
-
func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
@@ -216,14 +165,6 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {
return err
}
-func (a *awsConn) DelFromPreQueue(handle string) error {
- return a.DelFromQueue(a.prequrl, handle)
-}
-
-func (a *awsConn) DelFromOCRQueue(handle string) error {
- return a.DelFromQueue(a.ocrqurl, handle)
-}
-
func (a *awsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
@@ -239,11 +180,6 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) DownloadFromInProgress(key string, path string) error {
- a.logger.Println("Downloading", key)
- return a.Download("rescribeinprogress", key, path)
-}
-
func (a *awsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
@@ -259,11 +195,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) UploadToInProgress(key string, path string) error {
- a.logger.Println("Uploading", path)
- return a.Upload("rescribeinprogress", key, path)
-}
-
func (a *awsConn) Logger() *log.Logger {
return a.logger
}