diff options
Diffstat (limited to 'pipelinepreprocess/main.go')
-rw-r--r-- | pipelinepreprocess/main.go | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go new file mode 100644 index 0000000..d4aa6a4 --- /dev/null +++ b/pipelinepreprocess/main.go @@ -0,0 +1,212 @@ +package main +// TODO: have logs go somewhere useful, like email +// TODO: handle errors more smartly than just always fatal erroring +// - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary +// - cancel the current book processing rather than killing the program in the case of a nonrecoverable error + +import ( + "log" + "os" + "path/filepath" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/service/sqs" + + "rescribe.xyz/go.git/preproc" +) + +const HeartbeatTime = 60 +const PauseBetweenChecks = 60 * time.Second + +// TODO: could restructure like so: +// have the goroutine functions run outside of the main loop in the program, +// so use them for multiple books indefinitely. would require finding a way to +// signal when the queues need to be updated (e.g. when a book is finished) +// +// MAYBE use a struct holding config info ala downloader in +// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html +// +// TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk + +func download(dl chan string, pre chan string, downloader *s3manager.Downloader, dir string) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + f, err := os.Create(fn) + if err != nil { + log.Fatalln("Failed to create file", fn, err) + } + defer f.Close() + + _, err = downloader.Download(f, + &s3.GetObjectInput{ + Bucket: aws.String("inprogress"), + Key: &key }) + if err != nil { + log.Fatalln("Failed to download", key, err) + } + pre <- fn + } + close(pre) +} + +func preprocess(pre chan string, up chan string) { + for path := range pre { + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) + if err != nil { + log.Fatalln("Error preprocessing", path, err) + } + for _, p := range done { + up <- p + } + } + close(up) +} + +func up(c chan string, done chan bool, uploader *s3manager.Uploader, bookname string) { + for path := range c { + name := filepath.Base(path) + file, err := os.Open(path) + if err != nil { + log.Fatalln("Failed to open file", path, err) + } + defer file.Close() + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String("rescribeinprogress"), + Key: aws.String(filepath.Join(bookname, name)), + Body: file, + }) + if err != nil { + log.Fatalln("Failed to upload", path, err) + } + } + + done <- true +} + +func heartbeat(h *time.Ticker, msgHandle string, qurl string, sqssvc *sqs.SQS) { + for _ = range h.C { + duration := int64(HeartbeatTime * 2) + _, err := sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + ReceiptHandle: &msgHandle, + QueueUrl: &qurl, + VisibilityTimeout: &duration, + }) + if err != nil { + log.Fatalln("Error updating queue duration:", err) + } + } +} + +func main() { + if len(os.Args) != 1 { + log.Fatal("Usage: pipelinepreprocess\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n") + } + + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("eu-west-2"), + }) + if err != nil { + log.Fatalln("Error: failed to set up aws session:", err) + } + s3svc := s3.New(sess) + sqssvc := sqs.New(sess) + downloader := s3manager.NewDownloader(sess) + uploader := s3manager.NewUploader(sess) + + preqname := "rescribepreprocess" + result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(preqname), + }) + if err != nil { + log.Fatalln("Error getting queue URL for", preqname, ":", err) + } + prequrl := *result.QueueUrl + + ocrqname := "rescribeocr" + result, err = sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(ocrqname), + }) + if err != nil { + log.Fatalln("Error getting queue URL for", ocrqname, ":", err) + } + ocrqurl := *result.QueueUrl + + for { + msgResult, err := sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(HeartbeatTime * 2), + WaitTimeSeconds: aws.Int64(HeartbeatTime), + QueueUrl: &prequrl, + }) + if err != nil { + log.Fatalln("Error checking queue", preqname, ":", err) + } + + var bookname string + if len(msgResult.Messages) > 0 { + bookname = *msgResult.Messages[0].Body + } else { + time.Sleep(PauseBetweenChecks) + continue + } + + t := time.NewTicker(HeartbeatTime * time.Second) + go heartbeat(t, *msgResult.Messages[0].ReceiptHandle, prequrl, sqssvc) + + + d := filepath.Join(os.TempDir(), bookname) + err = os.Mkdir(d, 0755) + if err != nil { + log.Fatalln("Failed to create directory", d, err) + } + + dl := make(chan string) + pre := make(chan string) + upc := make(chan string) // TODO: rename + done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + + // these functions will do their jobs when their channels have data + go download(dl, pre, downloader, d) + go preprocess(pre, upc) + go up(upc, done, uploader, bookname) + + + err = s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ + Bucket: aws.String("rescribeinprogress"), + Prefix: aws.String(bookname), + }, func(page *s3.ListObjectsV2Output, last bool) bool { + for _, r := range page.Contents { + dl <- *r.Key + } + return true + }) + close(dl) + + + // wait for the done channel to be posted to + <-done + + _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(bookname), + QueueUrl: &ocrqurl, + }) + if err != nil { + log.Fatalln("Error sending message to queue", ocrqname, ":", err) + } + + t.Stop() + + _, err = sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &prequrl, + ReceiptHandle: msgResult.Messages[0].ReceiptHandle, + }) + if err != nil { + log.Fatalln("Error deleting message from queue", preqname, ":", err) + } + } +} |