From e5d5f4c270ae48022f2fc87cc5d65d8276de4d71 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 23 Aug 2019 16:37:29 +0100 Subject: Fix gaping bugs by using correct queues and downloads This has involved refactoring to make the interface simpler, and just use the URLs / IDs for the necessary queues and storage locations, rather than wrap these in functions. --- bookpipeline/aws.go | 99 ++++++++-------------------------------------------- bookpipeline/main.go | 61 +++++++++++++++++--------------- 2 files changed, 48 insertions(+), 112 deletions(-) diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 2322ea2..761031d 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "os" - "regexp" "time" "github.com/aws/aws-sdk-go/aws" @@ -30,6 +29,7 @@ type awsConn struct { downloader *s3manager.Downloader uploader *s3manager.Uploader prequrl, ocrqurl, analysequrl string + wipstorageid string } func (a *awsConn) Init() error { @@ -79,6 +79,8 @@ func (a *awsConn) Init() error { } a.analysequrl = *result.QueueUrl + a.wipstorageid = "rescribeinprogress" + return nil } @@ -102,21 +104,6 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) { } } -func (a *awsConn) CheckPreQueue() (Qmsg, error) { - a.logger.Println("Checking preprocessing queue for new messages") - return a.CheckQueue(a.prequrl) -} - -func (a *awsConn) CheckOCRQueue() (Qmsg, error) { - a.logger.Println("Checking OCR queue for new messages") - return a.CheckQueue(a.ocrqurl) -} - -func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) { - a.logger.Println("Checking analyse queue for new messages") - return a.CheckQueue(a.ocrqurl) -} - func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) @@ -132,14 +119,20 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) return nil } -func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting preprocess queue heartbeat") - return a.QueueHeartbeat(t, msgHandle, a.prequrl) +func (a *awsConn) PreQueueId() string { + return a.prequrl } -func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting ocr queue heartbeat") - return a.QueueHeartbeat(t, msgHandle, a.ocrqurl) +func (a *awsConn) OCRQueueId() string { + return a.ocrqurl +} + +func (a *awsConn) AnalyseQueueId() string { + return a.analysequrl +} + +func (a *awsConn) WIPStorageId() string { + return a.wipstorageid } func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { @@ -156,42 +149,6 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { return names, err } -func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) { - var names []string - preprocessed := regexp.MustCompile(PreprocPattern) - objs, err := a.ListObjects("rescribeinprogress", bookname) - if err != nil { - return names, err - } - // Filter out any object that looks like it's already been preprocessed - for _, n := range objs { - if preprocessed.MatchString(n) { - a.logger.Println("Skipping item that looks like it has already been processed", n) - continue - } - names = append(names, n) - } - return names, nil -} - -func (a *awsConn) ListToOCR(bookname string) ([]string, error) { - var names []string - preprocessed := regexp.MustCompile(PreprocPattern) - objs, err := a.ListObjects("rescribeinprogress", bookname) - if err != nil { - return names, err - } - // Filter out any object that looks like it hasn't already been preprocessed - for _, n := range objs { - if !preprocessed.MatchString(n) { - a.logger.Println("Skipping item that looks like it is not preprocessed", n) - continue - } - names = append(names, n) - } - return names, nil -} - func (a *awsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, @@ -200,14 +157,6 @@ func (a *awsConn) AddToQueue(url string, msg string) error { return err } -func (a *awsConn) AddToOCRQueue(msg string) error { - return a.AddToQueue(a.ocrqurl, msg) -} - -func (a *awsConn) AddToAnalyseQueue(msg string) error { - return a.AddToQueue(a.analysequrl, msg) -} - func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, @@ -216,14 +165,6 @@ func (a *awsConn) DelFromQueue(url string, handle string) error { return err } -func (a *awsConn) DelFromPreQueue(handle string) error { - return a.DelFromQueue(a.prequrl, handle) -} - -func (a *awsConn) DelFromOCRQueue(handle string) error { - return a.DelFromQueue(a.ocrqurl, handle) -} - func (a *awsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { @@ -239,11 +180,6 @@ func (a *awsConn) Download(bucket string, key string, path string) error { return err } -func (a *awsConn) DownloadFromInProgress(key string, path string) error { - a.logger.Println("Downloading", key) - return a.Download("rescribeinprogress", key, path) -} - func (a *awsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { @@ -259,11 +195,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error { return err } -func (a *awsConn) UploadToInProgress(key string, path string) error { - a.logger.Println("Uploading", path) - return a.Upload("rescribeinprogress", key, path) -} - func (a *awsConn) Logger() *log.Logger { return a.logger } diff --git a/bookpipeline/main.go b/bookpipeline/main.go index 6a68c57..30f9cfa 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/main.go @@ -11,6 +11,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "time" @@ -56,19 +57,10 @@ type Clouder interface { type Pipeliner interface { Clouder - ListToPreprocess(bookname string) ([]string, error) - ListToOCR(bookname string) ([]string, error) - DownloadFromInProgress(key string, fn string) error - UploadToInProgress(key string, path string) error - CheckPreQueue() (Qmsg, error) - CheckOCRQueue() (Qmsg, error) - CheckAnalyseQueue() (Qmsg, error) - AddToOCRQueue(msg string) error - AddToAnalyseQueue(msg string) error - DelFromPreQueue(handle string) error - DelFromOCRQueue(handle string) error - PreQueueHeartbeat(t *time.Ticker, msgHandle string) error - OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error + PreQueueId() string + OCRQueueId() string + AnalyseQueueId() string + WIPStorageId() string Logger() *log.Logger } @@ -79,7 +71,7 @@ type Qmsg struct { func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) - err := conn.DownloadFromInProgress(key, fn) + err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { close(process) errc <- err @@ -94,7 +86,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha for path := range c { name := filepath.Base(path) key := filepath.Join(bookname, name) - err := conn.UploadToInProgress(key, path) + err := conn.Upload(conn.WIPStorageId(), key, path) if err != nil { errc <- err return @@ -138,11 +130,11 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger } } -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger)) error { +func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { bookname := msg.Body t := time.NewTicker(HeartbeatTime * time.Second) - go conn.OCRQueueHeartbeat(t, msg.Handle) + go conn.QueueHeartbeat(t, msg.Handle, fromQueue) d := filepath.Join(os.TempDir(), bookname) err := os.MkdirAll(d, 0755) @@ -163,12 +155,20 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string go up(upc, done, conn, bookname, errc) conn.Logger().Println("Getting list of objects to download") - todl, err := conn.ListToOCR(bookname) + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) if err != nil { t.Stop() _ = os.RemoveAll(d) return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Logger().Println("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } for _, a := range todl { dl <- a } @@ -183,21 +183,21 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string case <-done: } - conn.Logger().Println("Sending", bookname, "to analyse queue") - err = conn.AddToAnalyseQueue(bookname) + conn.Logger().Println("Sending", bookname, "to queue") + err = conn.AddToQueue(toQueue, bookname) if err != nil { t.Stop() _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err)) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) } t.Stop() - conn.Logger().Println("Deleting original message from OCR queue") - err = conn.DelFromOCRQueue(msg.Handle) + conn.Logger().Println("Deleting original message from queue") + err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err)) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) } err = os.RemoveAll(d) @@ -225,6 +225,11 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } + // TODO: match jpg too + origPattern := regexp.MustCompile(`[0-9]{4}.png$`) // TODO: match other file naming + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + //ocredPattern := regexp.MustCompile(`.hocr$`) + var conn Pipeliner conn = &awsConn{region: "eu-west-2", logger: verboselog} @@ -243,7 +248,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckPreQueue() + msg, err := conn.CheckQueue(conn.PreQueueId()) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking preprocess queue", err) @@ -253,12 +258,12 @@ func main() { verboselog.Println("No message received on preprocess queue, sleeping") continue } - err = processBook(msg, conn, preprocess) + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) if err != nil { log.Println("Error during preprocess", err) } case <-checkOCRQueue: - msg, err := conn.CheckOCRQueue() + msg, err := conn.CheckQueue(conn.OCRQueueId()) checkOCRQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking OCR queue", err) @@ -268,7 +273,7 @@ func main() { verboselog.Println("No message received on OCR queue, sleeping") continue } - err = processBook(msg, conn, ocr(*training)) + err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) if err != nil { log.Println("Error during OCR process", err) } -- cgit v1.2.1-24-ge1ad