summaryrefslogtreecommitdiff
path: root/pipelinepreprocess/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'pipelinepreprocess/aws.go')
-rw-r--r--pipelinepreprocess/aws.go88
1 files changed, 77 insertions, 11 deletions
diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go
index bb969ed..75bf81c 100644
--- a/pipelinepreprocess/aws.go
+++ b/pipelinepreprocess/aws.go
@@ -29,7 +29,7 @@ type awsConn struct {
sqssvc *sqs.SQS
downloader *s3manager.Downloader
uploader *s3manager.Uploader
- prequrl, ocrqurl string
+ prequrl, ocrqurl, analysequrl string
}
func (a *awsConn) Init() error {
@@ -69,6 +69,16 @@ func (a *awsConn) Init() error {
return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
}
a.ocrqurl = *result.QueueUrl
+
+ a.logger.Println("Getting analyse queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeanalyse"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
+ }
+ a.analysequrl = *result.QueueUrl
+
return nil
}
@@ -97,6 +107,16 @@ func (a *awsConn) CheckPreQueue() (Qmsg, error) {
return a.CheckQueue(a.prequrl)
}
+func (a *awsConn) CheckOCRQueue() (Qmsg, error) {
+ a.logger.Println("Checking OCR queue for new messages")
+ return a.CheckQueue(a.ocrqurl)
+}
+
+func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) {
+ a.logger.Println("Checking analyse queue for new messages")
+ return a.CheckQueue(a.ocrqurl)
+}
+
func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
@@ -113,31 +133,65 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
}
func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
- a.logger.Println("Starting preprocess queue heartbeat for", msgHandle)
+ a.logger.Println("Starting preprocess queue heartbeat")
return a.QueueHeartbeat(t, msgHandle, a.prequrl)
}
-func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error {
- alreadydone := regexp.MustCompile(PreprocPattern)
+func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {
+ a.logger.Println("Starting ocr queue heartbeat")
+ return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
+}
+
+func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) {
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
}, func(page *s3.ListObjectsV2Output, last bool) bool {
for _, r := range page.Contents {
- if alreadydone.MatchString(*r.Key) {
- a.logger.Println("Skipping item that looks like it has already been processed", *r.Key)
- continue
- }
names <- *r.Key
}
return true
})
close(names)
- return err
+ if err != nil {
+ // TODO: handle error properly
+ log.Println("Error getting objects")
+ }
}
-func (a *awsConn) ListInProgress(bookname string, names chan string) error {
- return a.ListObjects("rescribeinprogress", bookname, names)
+func (a *awsConn) ListToPreprocess(bookname string, names chan string) error {
+ objs := make(chan string)
+ preprocessed := regexp.MustCompile(PreprocPattern)
+ go a.ListObjects("rescribeinprogress", bookname, objs)
+ // Filter out any object that looks like it's already been preprocessed
+ for n := range objs {
+ if preprocessed.MatchString(n) {
+ a.logger.Println("Skipping item that looks like it has already been processed", n)
+ continue
+ }
+ names <- n
+ }
+ close(names)
+ // TODO: handle errors from ListObjects
+ return nil
+}
+
+func (a *awsConn) ListToOCR(bookname string, names chan string) error {
+ objs := make(chan string)
+ preprocessed := regexp.MustCompile(PreprocPattern)
+ go a.ListObjects("rescribeinprogress", bookname, objs)
+ a.logger.Println("Completed running listobjects")
+ // Filter out any object that looks like it hasn't already been preprocessed
+ for n := range objs {
+ if ! preprocessed.MatchString(n) {
+ a.logger.Println("Skipping item that looks like it is not preprocessed", n)
+ continue
+ }
+ names <- n
+ }
+ close(names)
+ // TODO: handle errors from ListObjects
+ return nil
}
func (a *awsConn) AddToQueue(url string, msg string) error {
@@ -152,6 +206,10 @@ func (a *awsConn) AddToOCRQueue(msg string) error {
return a.AddToQueue(a.ocrqurl, msg)
}
+func (a *awsConn) AddToAnalyseQueue(msg string) error {
+ return a.AddToQueue(a.analysequrl, msg)
+}
+
func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
@@ -164,6 +222,10 @@ func (a *awsConn) DelFromPreQueue(handle string) error {
return a.DelFromQueue(a.prequrl, handle)
}
+func (a *awsConn) DelFromOCRQueue(handle string) error {
+ return a.DelFromQueue(a.ocrqurl, handle)
+}
+
func (a *awsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
@@ -203,3 +265,7 @@ func (a *awsConn) UploadToInProgress(key string, path string) error {
a.logger.Println("Uploading", path)
return a.Upload("rescribeinprogress", key, path)
}
+
+func (a *awsConn) Logger() *log.Logger {
+ return a.logger
+}