From e5d5f4c270ae48022f2fc87cc5d65d8276de4d71 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 23 Aug 2019 16:37:29 +0100 Subject: Fix gaping bugs by using correct queues and downloads This has involved refactoring to make the interface simpler, and just use the URLs / IDs for the necessary queues and storage locations, rather than wrap these in functions. --- bookpipeline/aws.go | 99 ++++++++--------------------------------------------- 1 file changed, 15 insertions(+), 84 deletions(-) (limited to 'bookpipeline/aws.go') diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 2322ea2..761031d 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "os" - "regexp" "time" "github.com/aws/aws-sdk-go/aws" @@ -30,6 +29,7 @@ type awsConn struct { downloader *s3manager.Downloader uploader *s3manager.Uploader prequrl, ocrqurl, analysequrl string + wipstorageid string } func (a *awsConn) Init() error { @@ -79,6 +79,8 @@ func (a *awsConn) Init() error { } a.analysequrl = *result.QueueUrl + a.wipstorageid = "rescribeinprogress" + return nil } @@ -102,21 +104,6 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) { } } -func (a *awsConn) CheckPreQueue() (Qmsg, error) { - a.logger.Println("Checking preprocessing queue for new messages") - return a.CheckQueue(a.prequrl) -} - -func (a *awsConn) CheckOCRQueue() (Qmsg, error) { - a.logger.Println("Checking OCR queue for new messages") - return a.CheckQueue(a.ocrqurl) -} - -func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) { - a.logger.Println("Checking analyse queue for new messages") - return a.CheckQueue(a.ocrqurl) -} - func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) @@ -132,14 +119,20 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) return nil } -func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting preprocess queue heartbeat") - return a.QueueHeartbeat(t, msgHandle, a.prequrl) +func (a *awsConn) PreQueueId() string { + return a.prequrl } -func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting ocr queue heartbeat") - return a.QueueHeartbeat(t, msgHandle, a.ocrqurl) +func (a *awsConn) OCRQueueId() string { + return a.ocrqurl +} + +func (a *awsConn) AnalyseQueueId() string { + return a.analysequrl +} + +func (a *awsConn) WIPStorageId() string { + return a.wipstorageid } func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { @@ -156,42 +149,6 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { return names, err } -func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) { - var names []string - preprocessed := regexp.MustCompile(PreprocPattern) - objs, err := a.ListObjects("rescribeinprogress", bookname) - if err != nil { - return names, err - } - // Filter out any object that looks like it's already been preprocessed - for _, n := range objs { - if preprocessed.MatchString(n) { - a.logger.Println("Skipping item that looks like it has already been processed", n) - continue - } - names = append(names, n) - } - return names, nil -} - -func (a *awsConn) ListToOCR(bookname string) ([]string, error) { - var names []string - preprocessed := regexp.MustCompile(PreprocPattern) - objs, err := a.ListObjects("rescribeinprogress", bookname) - if err != nil { - return names, err - } - // Filter out any object that looks like it hasn't already been preprocessed - for _, n := range objs { - if !preprocessed.MatchString(n) { - a.logger.Println("Skipping item that looks like it is not preprocessed", n) - continue - } - names = append(names, n) - } - return names, nil -} - func (a *awsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, @@ -200,14 +157,6 @@ func (a *awsConn) AddToQueue(url string, msg string) error { return err } -func (a *awsConn) AddToOCRQueue(msg string) error { - return a.AddToQueue(a.ocrqurl, msg) -} - -func (a *awsConn) AddToAnalyseQueue(msg string) error { - return a.AddToQueue(a.analysequrl, msg) -} - func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, @@ -216,14 +165,6 @@ func (a *awsConn) DelFromQueue(url string, handle string) error { return err } -func (a *awsConn) DelFromPreQueue(handle string) error { - return a.DelFromQueue(a.prequrl, handle) -} - -func (a *awsConn) DelFromOCRQueue(handle string) error { - return a.DelFromQueue(a.ocrqurl, handle) -} - func (a *awsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { @@ -239,11 +180,6 @@ func (a *awsConn) Download(bucket string, key string, path string) error { return err } -func (a *awsConn) DownloadFromInProgress(key string, path string) error { - a.logger.Println("Downloading", key) - return a.Download("rescribeinprogress", key, path) -} - func (a *awsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { @@ -259,11 +195,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error { return err } -func (a *awsConn) UploadToInProgress(key string, path string) error { - a.logger.Println("Uploading", path) - return a.Upload("rescribeinprogress", key, path) -} - func (a *awsConn) Logger() *log.Logger { return a.logger } -- cgit v1.2.1-24-ge1ad