package main // TODO: have logs go somewhere useful, like email // TODO: handle errors more smartly than just always fatal erroring // - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary // - cancel the current book processing rather than killing the program in the case of a nonrecoverable error import ( "log" "os" "path/filepath" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/sqs" "rescribe.xyz/go.git/preproc" ) const HeartbeatTime = 60 const PauseBetweenChecks = 60 * time.Second // TODO: could restructure like so: // have the goroutine functions run outside of the main loop in the program, // so use them for multiple books indefinitely. would require finding a way to // signal when the queues need to be updated (e.g. when a book is finished) // // MAYBE use a struct holding config info ala downloader in // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html // // TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk func download(dl chan string, pre chan string, downloader *s3manager.Downloader, dir string) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) f, err := os.Create(fn) if err != nil { log.Fatalln("Failed to create file", fn, err) } defer f.Close() _, err = downloader.Download(f, &s3.GetObjectInput{ Bucket: aws.String("inprogress"), Key: &key }) if err != nil { log.Fatalln("Failed to download", key, err) } pre <- fn } close(pre) } func preprocess(pre chan string, up chan string) { for path := range pre { done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) if err != nil { log.Fatalln("Error preprocessing", path, err) } for _, p := range done { up <- p } } close(up) } func up(c chan string, done chan bool, uploader *s3manager.Uploader, bookname string) { for path := range c { name := filepath.Base(path) file, err := os.Open(path) if err != nil { log.Fatalln("Failed to open file", path, err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String("rescribeinprogress"), Key: aws.String(filepath.Join(bookname, name)), Body: file, }) if err != nil { log.Fatalln("Failed to upload", path, err) } } done <- true } func heartbeat(h *time.Ticker, msgHandle string, qurl string, sqssvc *sqs.SQS) { for _ = range h.C { duration := int64(HeartbeatTime * 2) _, err := sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ ReceiptHandle: &msgHandle, QueueUrl: &qurl, VisibilityTimeout: &duration, }) if err != nil { log.Fatalln("Error updating queue duration:", err) } } } func main() { if len(os.Args) != 1 { log.Fatal("Usage: pipelinepreprocess\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n") } sess, err := session.NewSession(&aws.Config{ Region: aws.String("eu-west-2"), }) if err != nil { log.Fatalln("Error: failed to set up aws session:", err) } s3svc := s3.New(sess) sqssvc := sqs.New(sess) downloader := s3manager.NewDownloader(sess) uploader := s3manager.NewUploader(sess) preqname := "rescribepreprocess" result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(preqname), }) if err != nil { log.Fatalln("Error getting queue URL for", preqname, ":", err) } prequrl := *result.QueueUrl ocrqname := "rescribeocr" result, err = sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(ocrqname), }) if err != nil { log.Fatalln("Error getting queue URL for", ocrqname, ":", err) } ocrqurl := *result.QueueUrl for { msgResult, err := sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), VisibilityTimeout: aws.Int64(HeartbeatTime * 2), WaitTimeSeconds: aws.Int64(HeartbeatTime), QueueUrl: &prequrl, }) if err != nil { log.Fatalln("Error checking queue", preqname, ":", err) } var bookname string if len(msgResult.Messages) > 0 { bookname = *msgResult.Messages[0].Body } else { time.Sleep(PauseBetweenChecks) continue } t := time.NewTicker(HeartbeatTime * time.Second) go heartbeat(t, *msgResult.Messages[0].ReceiptHandle, prequrl, sqssvc) d := filepath.Join(os.TempDir(), bookname) err = os.Mkdir(d, 0755) if err != nil { log.Fatalln("Failed to create directory", d, err) } dl := make(chan string) pre := make(chan string) upc := make(chan string) // TODO: rename done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated // these functions will do their jobs when their channels have data go download(dl, pre, downloader, d) go preprocess(pre, upc) go up(upc, done, uploader, bookname) err = s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String("rescribeinprogress"), Prefix: aws.String(bookname), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { dl <- *r.Key } return true }) close(dl) // wait for the done channel to be posted to <-done _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: aws.String(bookname), QueueUrl: &ocrqurl, }) if err != nil { log.Fatalln("Error sending message to queue", ocrqname, ":", err) } t.Stop() _, err = sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &prequrl, ReceiptHandle: msgResult.Messages[0].ReceiptHandle, }) if err != nil { log.Fatalln("Error deleting message from queue", preqname, ":", err) } } }