diff options
Diffstat (limited to 'pipelinepreprocess/aws.go')
-rw-r--r-- | pipelinepreprocess/aws.go | 88 |
1 files changed, 77 insertions, 11 deletions
diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go index bb969ed..75bf81c 100644 --- a/pipelinepreprocess/aws.go +++ b/pipelinepreprocess/aws.go @@ -29,7 +29,7 @@ type awsConn struct { sqssvc *sqs.SQS downloader *s3manager.Downloader uploader *s3manager.Uploader - prequrl, ocrqurl string + prequrl, ocrqurl, analysequrl string } func (a *awsConn) Init() error { @@ -69,6 +69,16 @@ func (a *awsConn) Init() error { return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) } a.ocrqurl = *result.QueueUrl + + a.logger.Println("Getting analyse queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeanalyse"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) + } + a.analysequrl = *result.QueueUrl + return nil } @@ -97,6 +107,16 @@ func (a *awsConn) CheckPreQueue() (Qmsg, error) { return a.CheckQueue(a.prequrl) } +func (a *awsConn) CheckOCRQueue() (Qmsg, error) { + a.logger.Println("Checking OCR queue for new messages") + return a.CheckQueue(a.ocrqurl) +} + +func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) { + a.logger.Println("Checking analyse queue for new messages") + return a.CheckQueue(a.ocrqurl) +} + func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) @@ -113,31 +133,65 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) } func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting preprocess queue heartbeat for", msgHandle) + a.logger.Println("Starting preprocess queue heartbeat") return a.QueueHeartbeat(t, msgHandle, a.prequrl) } -func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error { - alreadydone := regexp.MustCompile(PreprocPattern) +func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { + a.logger.Println("Starting ocr queue heartbeat") + return a.QueueHeartbeat(t, msgHandle, a.ocrqurl) +} + +func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) { err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { - if alreadydone.MatchString(*r.Key) { - a.logger.Println("Skipping item that looks like it has already been processed", *r.Key) - continue - } names <- *r.Key } return true }) close(names) - return err + if err != nil { + // TODO: handle error properly + log.Println("Error getting objects") + } } -func (a *awsConn) ListInProgress(bookname string, names chan string) error { - return a.ListObjects("rescribeinprogress", bookname, names) +func (a *awsConn) ListToPreprocess(bookname string, names chan string) error { + objs := make(chan string) + preprocessed := regexp.MustCompile(PreprocPattern) + go a.ListObjects("rescribeinprogress", bookname, objs) + // Filter out any object that looks like it's already been preprocessed + for n := range objs { + if preprocessed.MatchString(n) { + a.logger.Println("Skipping item that looks like it has already been processed", n) + continue + } + names <- n + } + close(names) + // TODO: handle errors from ListObjects + return nil +} + +func (a *awsConn) ListToOCR(bookname string, names chan string) error { + objs := make(chan string) + preprocessed := regexp.MustCompile(PreprocPattern) + go a.ListObjects("rescribeinprogress", bookname, objs) + a.logger.Println("Completed running listobjects") + // Filter out any object that looks like it hasn't already been preprocessed + for n := range objs { + if ! preprocessed.MatchString(n) { + a.logger.Println("Skipping item that looks like it is not preprocessed", n) + continue + } + names <- n + } + close(names) + // TODO: handle errors from ListObjects + return nil } func (a *awsConn) AddToQueue(url string, msg string) error { @@ -152,6 +206,10 @@ func (a *awsConn) AddToOCRQueue(msg string) error { return a.AddToQueue(a.ocrqurl, msg) } +func (a *awsConn) AddToAnalyseQueue(msg string) error { + return a.AddToQueue(a.analysequrl, msg) +} + func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, @@ -164,6 +222,10 @@ func (a *awsConn) DelFromPreQueue(handle string) error { return a.DelFromQueue(a.prequrl, handle) } +func (a *awsConn) DelFromOCRQueue(handle string) error { + return a.DelFromQueue(a.ocrqurl, handle) +} + func (a *awsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { @@ -203,3 +265,7 @@ func (a *awsConn) UploadToInProgress(key string, path string) error { a.logger.Println("Uploading", path) return a.Upload("rescribeinprogress", key, path) } + +func (a *awsConn) Logger() *log.Logger { + return a.logger +} |