From e5d5f4c270ae48022f2fc87cc5d65d8276de4d71 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 23 Aug 2019 16:37:29 +0100 Subject: Fix gaping bugs by using correct queues and downloads This has involved refactoring to make the interface simpler, and just use the URLs / IDs for the necessary queues and storage locations, rather than wrap these in functions. --- bookpipeline/main.go | 61 ++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 28 deletions(-) (limited to 'bookpipeline/main.go') diff --git a/bookpipeline/main.go b/bookpipeline/main.go index 6a68c57..30f9cfa 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/main.go @@ -11,6 +11,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "time" @@ -56,19 +57,10 @@ type Clouder interface { type Pipeliner interface { Clouder - ListToPreprocess(bookname string) ([]string, error) - ListToOCR(bookname string) ([]string, error) - DownloadFromInProgress(key string, fn string) error - UploadToInProgress(key string, path string) error - CheckPreQueue() (Qmsg, error) - CheckOCRQueue() (Qmsg, error) - CheckAnalyseQueue() (Qmsg, error) - AddToOCRQueue(msg string) error - AddToAnalyseQueue(msg string) error - DelFromPreQueue(handle string) error - DelFromOCRQueue(handle string) error - PreQueueHeartbeat(t *time.Ticker, msgHandle string) error - OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error + PreQueueId() string + OCRQueueId() string + AnalyseQueueId() string + WIPStorageId() string Logger() *log.Logger } @@ -79,7 +71,7 @@ type Qmsg struct { func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) - err := conn.DownloadFromInProgress(key, fn) + err := conn.Download(conn.WIPStorageId(), key, fn) if err != nil { close(process) errc <- err @@ -94,7 +86,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha for path := range c { name := filepath.Base(path) key := filepath.Join(bookname, name) - err := conn.UploadToInProgress(key, path) + err := conn.Upload(conn.WIPStorageId(), key, path) if err != nil { errc <- err return @@ -138,11 +130,11 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger } } -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger)) error { +func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { bookname := msg.Body t := time.NewTicker(HeartbeatTime * time.Second) - go conn.OCRQueueHeartbeat(t, msg.Handle) + go conn.QueueHeartbeat(t, msg.Handle, fromQueue) d := filepath.Join(os.TempDir(), bookname) err := os.MkdirAll(d, 0755) @@ -163,12 +155,20 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string go up(upc, done, conn, bookname, errc) conn.Logger().Println("Getting list of objects to download") - todl, err := conn.ListToOCR(bookname) + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) if err != nil { t.Stop() _ = os.RemoveAll(d) return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.Logger().Println("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } for _, a := range todl { dl <- a } @@ -183,21 +183,21 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string case <-done: } - conn.Logger().Println("Sending", bookname, "to analyse queue") - err = conn.AddToAnalyseQueue(bookname) + conn.Logger().Println("Sending", bookname, "to queue") + err = conn.AddToQueue(toQueue, bookname) if err != nil { t.Stop() _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err)) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) } t.Stop() - conn.Logger().Println("Deleting original message from OCR queue") - err = conn.DelFromOCRQueue(msg.Handle) + conn.Logger().Println("Deleting original message from queue") + err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err)) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) } err = os.RemoveAll(d) @@ -225,6 +225,11 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } + // TODO: match jpg too + origPattern := regexp.MustCompile(`[0-9]{4}.png$`) // TODO: match other file naming + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + //ocredPattern := regexp.MustCompile(`.hocr$`) + var conn Pipeliner conn = &awsConn{region: "eu-west-2", logger: verboselog} @@ -243,7 +248,7 @@ func main() { for { select { case <-checkPreQueue: - msg, err := conn.CheckPreQueue() + msg, err := conn.CheckQueue(conn.PreQueueId()) checkPreQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking preprocess queue", err) @@ -253,12 +258,12 @@ func main() { verboselog.Println("No message received on preprocess queue, sleeping") continue } - err = processBook(msg, conn, preprocess) + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) if err != nil { log.Println("Error during preprocess", err) } case <-checkOCRQueue: - msg, err := conn.CheckOCRQueue() + msg, err := conn.CheckQueue(conn.OCRQueueId()) checkOCRQueue = time.After(PauseBetweenChecks) if err != nil { log.Println("Error checking OCR queue", err) @@ -268,7 +273,7 @@ func main() { verboselog.Println("No message received on OCR queue, sleeping") continue } - err = processBook(msg, conn, ocr(*training)) + err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) if err != nil { log.Println("Error during OCR process", err) } -- cgit v1.2.1-24-ge1ad