summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-23 16:37:29 +0100
committerNick White <git@njw.name>2019-08-23 16:37:29 +0100
commite5d5f4c270ae48022f2fc87cc5d65d8276de4d71 (patch)
treece813918b0d693cfd38962fd9f0c0ca57655bf88
parent8d47a1baed954cf46cd57afcb5b4ef54c774bff7 (diff)
Fix gaping bugs by using correct queues and downloads
This has involved refactoring to make the interface simpler, and just use the URLs / IDs for the necessary queues and storage locations, rather than wrap these in functions.
-rw-r--r--bookpipeline/aws.go99
-rw-r--r--bookpipeline/main.go61
2 files changed, 48 insertions, 112 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 2322ea2..761031d 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
- "regexp"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -30,6 +29,7 @@ type awsConn struct {
downloader *s3manager.Downloader
uploader *s3manager.Uploader
prequrl, ocrqurl, analysequrl string
+ wipstorageid string
}
func (a *awsConn) Init() error {
@@ -79,6 +79,8 @@ func (a *awsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
+ a.wipstorageid = "rescribeinprogress"
+
return nil
}
@@ -102,21 +104,6 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
}
}
-func (a *awsConn) CheckPreQueue() (Qmsg, error) {
- a.logger.Println("Checking preprocessing queue for new messages")
- return a.CheckQueue(a.prequrl)
-}
-
-func (a *awsConn) CheckOCRQueue() (Qmsg, error) {
- a.logger.Println("Checking OCR queue for new messages")
- return a.CheckQueue(a.ocrqurl)
-}
-
-func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) {
- a.logger.Println("Checking analyse queue for new messages")
- return a.CheckQueue(a.ocrqurl)
-}
-
func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
@@ -132,14 +119,20 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
return nil
}
-func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
- a.logger.Println("Starting preprocess queue heartbeat")
- return a.QueueHeartbeat(t, msgHandle, a.prequrl)
+func (a *awsConn) PreQueueId() string {
+ return a.prequrl
}
-func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {
- a.logger.Println("Starting ocr queue heartbeat")
- return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
+func (a *awsConn) OCRQueueId() string {
+ return a.ocrqurl
+}
+
+func (a *awsConn) AnalyseQueueId() string {
+ return a.analysequrl
+}
+
+func (a *awsConn) WIPStorageId() string {
+ return a.wipstorageid
}
func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
@@ -156,42 +149,6 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
return names, err
}
-func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) {
- var names []string
- preprocessed := regexp.MustCompile(PreprocPattern)
- objs, err := a.ListObjects("rescribeinprogress", bookname)
- if err != nil {
- return names, err
- }
- // Filter out any object that looks like it's already been preprocessed
- for _, n := range objs {
- if preprocessed.MatchString(n) {
- a.logger.Println("Skipping item that looks like it has already been processed", n)
- continue
- }
- names = append(names, n)
- }
- return names, nil
-}
-
-func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
- var names []string
- preprocessed := regexp.MustCompile(PreprocPattern)
- objs, err := a.ListObjects("rescribeinprogress", bookname)
- if err != nil {
- return names, err
- }
- // Filter out any object that looks like it hasn't already been preprocessed
- for _, n := range objs {
- if !preprocessed.MatchString(n) {
- a.logger.Println("Skipping item that looks like it is not preprocessed", n)
- continue
- }
- names = append(names, n)
- }
- return names, nil
-}
-
func (a *awsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
@@ -200,14 +157,6 @@ func (a *awsConn) AddToQueue(url string, msg string) error {
return err
}
-func (a *awsConn) AddToOCRQueue(msg string) error {
- return a.AddToQueue(a.ocrqurl, msg)
-}
-
-func (a *awsConn) AddToAnalyseQueue(msg string) error {
- return a.AddToQueue(a.analysequrl, msg)
-}
-
func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
@@ -216,14 +165,6 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {
return err
}
-func (a *awsConn) DelFromPreQueue(handle string) error {
- return a.DelFromQueue(a.prequrl, handle)
-}
-
-func (a *awsConn) DelFromOCRQueue(handle string) error {
- return a.DelFromQueue(a.ocrqurl, handle)
-}
-
func (a *awsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
@@ -239,11 +180,6 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) DownloadFromInProgress(key string, path string) error {
- a.logger.Println("Downloading", key)
- return a.Download("rescribeinprogress", key, path)
-}
-
func (a *awsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
@@ -259,11 +195,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) UploadToInProgress(key string, path string) error {
- a.logger.Println("Uploading", path)
- return a.Upload("rescribeinprogress", key, path)
-}
-
func (a *awsConn) Logger() *log.Logger {
return a.logger
}
diff --git a/bookpipeline/main.go b/bookpipeline/main.go
index 6a68c57..30f9cfa 100644
--- a/bookpipeline/main.go
+++ b/bookpipeline/main.go
@@ -11,6 +11,7 @@ import (
"os"
"os/exec"
"path/filepath"
+ "regexp"
"strings"
"time"
@@ -56,19 +57,10 @@ type Clouder interface {
type Pipeliner interface {
Clouder
- ListToPreprocess(bookname string) ([]string, error)
- ListToOCR(bookname string) ([]string, error)
- DownloadFromInProgress(key string, fn string) error
- UploadToInProgress(key string, path string) error
- CheckPreQueue() (Qmsg, error)
- CheckOCRQueue() (Qmsg, error)
- CheckAnalyseQueue() (Qmsg, error)
- AddToOCRQueue(msg string) error
- AddToAnalyseQueue(msg string) error
- DelFromPreQueue(handle string) error
- DelFromOCRQueue(handle string) error
- PreQueueHeartbeat(t *time.Ticker, msgHandle string) error
- OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error
+ PreQueueId() string
+ OCRQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
Logger() *log.Logger
}
@@ -79,7 +71,7 @@ type Qmsg struct {
func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
- err := conn.DownloadFromInProgress(key, fn)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
if err != nil {
close(process)
errc <- err
@@ -94,7 +86,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
for path := range c {
name := filepath.Base(path)
key := filepath.Join(bookname, name)
- err := conn.UploadToInProgress(key, path)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
if err != nil {
errc <- err
return
@@ -138,11 +130,11 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger
}
}
-func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger)) error {
+func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
bookname := msg.Body
t := time.NewTicker(HeartbeatTime * time.Second)
- go conn.OCRQueueHeartbeat(t, msg.Handle)
+ go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
@@ -163,12 +155,20 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
go up(upc, done, conn, bookname, errc)
conn.Logger().Println("Getting list of objects to download")
- todl, err := conn.ListToOCR(bookname)
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
t.Stop()
_ = os.RemoveAll(d)
return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
}
+ var todl []string
+ for _, n := range objs {
+ if !match.MatchString(n) {
+ conn.Logger().Println("Skipping item that doesn't match target", n)
+ continue
+ }
+ todl = append(todl, n)
+ }
for _, a := range todl {
dl <- a
}
@@ -183,21 +183,21 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
case <-done:
}
- conn.Logger().Println("Sending", bookname, "to analyse queue")
- err = conn.AddToAnalyseQueue(bookname)
+ conn.Logger().Println("Sending", bookname, "to queue")
+ err = conn.AddToQueue(toQueue, bookname)
if err != nil {
t.Stop()
_ = os.RemoveAll(d)
- return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err))
+ return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
}
t.Stop()
- conn.Logger().Println("Deleting original message from OCR queue")
- err = conn.DelFromOCRQueue(msg.Handle)
+ conn.Logger().Println("Deleting original message from queue")
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {
_ = os.RemoveAll(d)
- return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err))
+ return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
}
err = os.RemoveAll(d)
@@ -225,6 +225,11 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
+ // TODO: match jpg too
+ origPattern := regexp.MustCompile(`[0-9]{4}.png$`) // TODO: match other file naming
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+ //ocredPattern := regexp.MustCompile(`.hocr$`)
+
var conn Pipeliner
conn = &awsConn{region: "eu-west-2", logger: verboselog}
@@ -243,7 +248,7 @@ func main() {
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckPreQueue()
+ msg, err := conn.CheckQueue(conn.PreQueueId())
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking preprocess queue", err)
@@ -253,12 +258,12 @@ func main() {
verboselog.Println("No message received on preprocess queue, sleeping")
continue
}
- err = processBook(msg, conn, preprocess)
+ err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
if err != nil {
log.Println("Error during preprocess", err)
}
case <-checkOCRQueue:
- msg, err := conn.CheckOCRQueue()
+ msg, err := conn.CheckQueue(conn.OCRQueueId())
checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking OCR queue", err)
@@ -268,7 +273,7 @@ func main() {
verboselog.Println("No message received on OCR queue, sleeping")
continue
}
- err = processBook(msg, conn, ocr(*training))
+ err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
if err != nil {
log.Println("Error during OCR process", err)
}