diff options
-rw-r--r-- | pipelinepreprocess/aws.go | 205 | ||||
-rw-r--r-- | pipelinepreprocess/main.go | 222 |
2 files changed, 205 insertions, 222 deletions
diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go new file mode 100644 index 0000000..bb969ed --- /dev/null +++ b/pipelinepreprocess/aws.go @@ -0,0 +1,205 @@ +package main + +import ( + "errors" + "fmt" + "log" + "os" + "regexp" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/service/sqs" +) + +const PreprocPattern = `_bin[0-9].[0-9].png` +const HeartbeatTime = 60 + +type awsConn struct { + // these need to be set before running Init() + region string + logger *log.Logger + + // these are used internally + sess *session.Session + s3svc *s3.S3 + sqssvc *sqs.SQS + downloader *s3manager.Downloader + uploader *s3manager.Uploader + prequrl, ocrqurl string +} + +func (a *awsConn) Init() error { + if a.region == "" { + return errors.New("No region set") + } + if a.logger == nil { + return errors.New("No logger set") + } + + var err error + a.sess, err = session.NewSession(&aws.Config{ + Region: aws.String(a.region), + }) + if err != nil { + return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) + } + a.s3svc = s3.New(a.sess) + a.sqssvc = sqs.New(a.sess) + a.downloader = s3manager.NewDownloader(a.sess) + a.uploader = s3manager.NewUploader(a.sess) + + a.logger.Println("Getting preprocess queue URL") + result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribepreprocess"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) + } + a.prequrl = *result.QueueUrl + + a.logger.Println("Getting OCR queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String("rescribeocr"), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) + } + a.ocrqurl = *result.QueueUrl + return nil +} + +func (a *awsConn) CheckQueue(url string) (Qmsg, error) { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(HeartbeatTime * 2), + WaitTimeSeconds: aws.Int64(20), + QueueUrl: &url, + }) + if err != nil { + return Qmsg{}, err + } + + if len(msgResult.Messages) > 0 { + msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body } + a.logger.Println("Message received:", msg.Body) + return msg, nil + } else { + return Qmsg{}, nil + } +} + +func (a *awsConn) CheckPreQueue() (Qmsg, error) { + a.logger.Println("Checking preprocessing queue for new messages") + return a.CheckQueue(a.prequrl) +} + +func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { + for _ = range t.C { + duration := int64(HeartbeatTime * 2) + _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + ReceiptHandle: &msgHandle, + QueueUrl: &qurl, + VisibilityTimeout: &duration, + }) + if err != nil { + return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) + } + } + return nil +} + +func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { + a.logger.Println("Starting preprocess queue heartbeat for", msgHandle) + return a.QueueHeartbeat(t, msgHandle, a.prequrl) +} + +func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error { + alreadydone := regexp.MustCompile(PreprocPattern) + err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }, func(page *s3.ListObjectsV2Output, last bool) bool { + for _, r := range page.Contents { + if alreadydone.MatchString(*r.Key) { + a.logger.Println("Skipping item that looks like it has already been processed", *r.Key) + continue + } + names <- *r.Key + } + return true + }) + close(names) + return err +} + +func (a *awsConn) ListInProgress(bookname string, names chan string) error { + return a.ListObjects("rescribeinprogress", bookname, names) +} + +func (a *awsConn) AddToQueue(url string, msg string) error { + _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ + MessageBody: &msg, + QueueUrl: &url, + }) + return err +} + +func (a *awsConn) AddToOCRQueue(msg string) error { + return a.AddToQueue(a.ocrqurl, msg) +} + +func (a *awsConn) DelFromQueue(url string, handle string) error { + _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &url, + ReceiptHandle: &handle, + }) + return err +} + +func (a *awsConn) DelFromPreQueue(handle string) error { + return a.DelFromQueue(a.prequrl, handle) +} + +func (a *awsConn) Download(bucket string, key string, path string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + _, err = a.downloader.Download(f, + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: &key, + }) + return err +} + +func (a *awsConn) DownloadFromInProgress(key string, path string) error { + a.logger.Println("Downloading", key) + return a.Download("rescribeinprogress", key, path) +} + +func (a *awsConn) Upload(bucket string, key string, path string) error { + file, err := os.Open(path) + if err != nil { + log.Fatalln("Failed to open file", path, err) + } + defer file.Close() + + _, err = a.uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: file, + }) + return err +} + +func (a *awsConn) UploadToInProgress(key string, path string) error { + a.logger.Println("Uploading", path) + return a.Upload("rescribeinprogress", key, path) +} diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index 407591f..a223d0b 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -6,20 +6,11 @@ package main // TODO: check if images are prebinarised and if so skip multiple binarisation import ( - "errors" - "fmt" "log" "os" "path/filepath" - "regexp" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/aws/aws-sdk-go/service/sqs" - "rescribe.xyz/go.git/preproc" ) @@ -31,17 +22,8 @@ func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } -var alreadydone *regexp.Regexp - -const HeartbeatTime = 60 const PauseBetweenChecks = 60 * time.Second -const PreprocPattern = `_bin[0-9].[0-9].png` -// TODO: could restructure like so: -// have the goroutine functions run outside of the main loop in the program, -// so use them for multiple books indefinitely. would require finding a way to -// signal when the queues need to be updated (e.g. when a book is finished) -// // TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk type Clouder interface { @@ -70,194 +52,6 @@ type Qmsg struct { Handle, Body string } -type awsConn struct { - // these need to be set before running Init() - region string - logger *log.Logger - - // these are used internally - sess *session.Session - s3svc *s3.S3 - sqssvc *sqs.SQS - downloader *s3manager.Downloader - uploader *s3manager.Uploader - prequrl, ocrqurl string -} - -func (a *awsConn) Init() error { - if a.region == "" { - return errors.New("No region set") - } - if a.logger == nil { - return errors.New("No logger set") - } - - var err error - a.sess, err = session.NewSession(&aws.Config{ - Region: aws.String(a.region), - }) - if err != nil { - return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) - } - a.s3svc = s3.New(a.sess) - a.sqssvc = sqs.New(a.sess) - a.downloader = s3manager.NewDownloader(a.sess) - a.uploader = s3manager.NewUploader(a.sess) - - a.logger.Println("Getting preprocess queue URL") - result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribepreprocess"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) - } - a.prequrl = *result.QueueUrl - - a.logger.Println("Getting OCR queue URL") - result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeocr"), - }) - if err != nil { - return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) - } - a.ocrqurl = *result.QueueUrl - return nil -} - -func (a *awsConn) CheckQueue(url string) (Qmsg, error) { - msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(HeartbeatTime * 2), - WaitTimeSeconds: aws.Int64(20), - QueueUrl: &url, - }) - if err != nil { - return Qmsg{}, err - } - - if len(msgResult.Messages) > 0 { - msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body } - a.logger.Println("Message received:", msg.Body) - return msg, nil - } else { - return Qmsg{}, nil - } -} - -func (a *awsConn) CheckPreQueue() (Qmsg, error) { - a.logger.Println("Checking preprocessing queue for new messages") - return a.CheckQueue(a.prequrl) -} - -func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { - for _ = range t.C { - duration := int64(HeartbeatTime * 2) - _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, - QueueUrl: &qurl, - VisibilityTimeout: &duration, - }) - if err != nil { - return errors.New(fmt.Sprintf("Heartbeat error updating queue duration: %s", err)) - } - } - return nil -} - -func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { - a.logger.Println("Starting preprocess queue heartbeat for", msgHandle) - return a.QueueHeartbeat(t, msgHandle, a.prequrl) -} - -func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error { - err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ - Bucket: aws.String(bucket), - Prefix: aws.String(prefix), - }, func(page *s3.ListObjectsV2Output, last bool) bool { - for _, r := range page.Contents { - if alreadydone.MatchString(*r.Key) { - a.logger.Println("Skipping item that looks like it has already been processed", *r.Key) - continue - } - names <- *r.Key - } - return true - }) - close(names) - return err -} - -func (a *awsConn) ListInProgress(bookname string, names chan string) error { - return a.ListObjects("rescribeinprogress", bookname, names) -} - -func (a *awsConn) AddToQueue(url string, msg string) error { - _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ - MessageBody: &msg, - QueueUrl: &url, - }) - return err -} - -func (a *awsConn) AddToOCRQueue(msg string) error { - return a.AddToQueue(a.ocrqurl, msg) -} - -func (a *awsConn) DelFromQueue(url string, handle string) error { - _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: &url, - ReceiptHandle: &handle, - }) - return err -} - -func (a *awsConn) DelFromPreQueue(handle string) error { - return a.DelFromQueue(a.prequrl, handle) -} - -func (a *awsConn) Download(bucket string, key string, path string) error { - f, err := os.Create(path) - if err != nil { - return err - } - defer f.Close() - - _, err = a.downloader.Download(f, - &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: &key, - }) - return err -} - -func (a *awsConn) DownloadFromInProgress(key string, path string) error { - a.logger.Println("Downloading", key) - return a.Download("rescribeinprogress", key, path) -} - -func (a *awsConn) Upload(bucket string, key string, path string) error { - file, err := os.Open(path) - if err != nil { - log.Fatalln("Failed to open file", path, err) - } - defer file.Close() - - _, err = a.uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - Body: file, - }) - return err -} - -func (a *awsConn) UploadToInProgress(key string, path string) error { - a.logger.Println("Uploading", path) - return a.Upload("rescribeinprogress", key, path) -} - - - - func download(dl chan string, pre chan string, conn Pipeliner, dir string) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) @@ -297,20 +91,6 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string) { done <- true } -func heartbeat(h *time.Ticker, msgHandle string, qurl string, sqssvc *sqs.SQS) { - for _ = range h.C { - duration := int64(HeartbeatTime * 2) - _, err := sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - ReceiptHandle: &msgHandle, - QueueUrl: &qurl, - VisibilityTimeout: &duration, - }) - if err != nil { - log.Fatalln("Error updating queue duration:", err) - } - } -} - func main() { var verboselog *log.Logger if len(os.Args) > 1 { @@ -324,8 +104,6 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - alreadydone = regexp.MustCompile(PreprocPattern) - var conn Pipeliner conn = &awsConn{ region: "eu-west-2", logger: verboselog } |