From 3b14764fda520266411500d09acb0da475c2c114 Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 16 Oct 2019 18:01:50 +0100 Subject: Rewrite booktopipeline to use bookpipeline aws interface --- cmd/booktopipeline/main.go | 65 ++++++++++++++-------------------------------- 1 file changed, 19 insertions(+), 46 deletions(-) (limited to 'cmd') diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index e6d08d8..c52527f 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -1,7 +1,5 @@ package main -// TODO: use bookpipeline package to do aws stuff - import ( "flag" "fmt" @@ -9,10 +7,7 @@ import ( "os" "path/filepath" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/aws/aws-sdk-go/service/sqs" + "rescribe.xyz/bookpipeline" ) const usage = `Usage: booktopipeline [-prebinarised] [-v] bookdir [bookname] @@ -24,6 +19,15 @@ prebinarised flag is set. If bookname is omitted the last part of the bookdir is used. ` +type Pipeliner interface { + Init() error + PreQueueId() string + WipeQueueId() string + WIPStorageId() string + AddToQueue(url string, msg string) error + Upload(bucket string, key string, path string) error +} + // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -74,30 +78,19 @@ func main() { verboselog = log.New(n, "", log.LstdFlags) } - verboselog.Println("Setting up AWS session") - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("eu-west-2"), - }) + var conn Pipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + err := conn.Init() if err != nil { - log.Fatalln("Error: failed to set up aws session:", err) + log.Fatalln("Failed to set up cloud connection:", err) } - sqssvc := sqs.New(sess) - uploader := s3manager.NewUploader(sess) - var qname string + var qid string if *wipeonly { - qname = "rescribewipeonly" + qid = conn.WipeQueueId() } else { - qname = "rescribepreprocess" - } - verboselog.Println("Getting Queue URL for", qname) - result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(qname), - }) - if err != nil { - log.Fatalln("Error getting queue URL for", qname, ":", err) + qid = conn.PreQueueId() } - qurl := *result.QueueUrl // concurrent walking upload based on example at // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html @@ -114,33 +107,13 @@ func main() { for path := range walker { verboselog.Println("Uploading", path) name := filepath.Base(path) - file, err := os.Open(path) - if err != nil { - log.Fatalln("Open file", path, "failed:", err) - } - //defer file.Close() // done explicitly below - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String("rescribeinprogress"), - Key: aws.String(filepath.Join(bookname, name)), - Body: file, - }) + err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) if err != nil { log.Fatalln("Failed to upload", path, err) } - // Explicitly close here rather than wait for the defer, so we - // don't end up with too many open files which can cause os.Open - // to fail. - err = file.Close() - if err != nil { - log.Fatalln("Failed to close file", path, err) - } } - verboselog.Println("Sending message", bookname, "to queue", qurl) - _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ - MessageBody: aws.String(bookname), - QueueUrl: &qurl, - }) + err = conn.AddToQueue(qid, bookname) if err != nil { log.Fatalln("Error adding book to queue:", err) } -- cgit v1.2.1-24-ge1ad