summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/booktopipeline/main.go65
1 files changed, 19 insertions, 46 deletions
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
index e6d08d8..c52527f 100644
--- a/cmd/booktopipeline/main.go
+++ b/cmd/booktopipeline/main.go
@@ -1,7 +1,5 @@
package main
-// TODO: use bookpipeline package to do aws stuff
-
import (
"flag"
"fmt"
@@ -9,10 +7,7 @@ import (
"os"
"path/filepath"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/s3/s3manager"
- "github.com/aws/aws-sdk-go/service/sqs"
+ "rescribe.xyz/bookpipeline"
)
const usage = `Usage: booktopipeline [-prebinarised] [-v] bookdir [bookname]
@@ -24,6 +19,15 @@ prebinarised flag is set.
If bookname is omitted the last part of the bookdir is used.
`
+type Pipeliner interface {
+ Init() error
+ PreQueueId() string
+ WipeQueueId() string
+ WIPStorageId() string
+ AddToQueue(url string, msg string) error
+ Upload(bucket string, key string, path string) error
+}
+
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -74,30 +78,19 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
- verboselog.Println("Setting up AWS session")
- sess, err := session.NewSession(&aws.Config{
- Region: aws.String("eu-west-2"),
- })
+ var conn Pipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ err := conn.Init()
if err != nil {
- log.Fatalln("Error: failed to set up aws session:", err)
+ log.Fatalln("Failed to set up cloud connection:", err)
}
- sqssvc := sqs.New(sess)
- uploader := s3manager.NewUploader(sess)
- var qname string
+ var qid string
if *wipeonly {
- qname = "rescribewipeonly"
+ qid = conn.WipeQueueId()
} else {
- qname = "rescribepreprocess"
- }
- verboselog.Println("Getting Queue URL for", qname)
- result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String(qname),
- })
- if err != nil {
- log.Fatalln("Error getting queue URL for", qname, ":", err)
+ qid = conn.PreQueueId()
}
- qurl := *result.QueueUrl
// concurrent walking upload based on example at
// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html
@@ -114,33 +107,13 @@ func main() {
for path := range walker {
verboselog.Println("Uploading", path)
name := filepath.Base(path)
- file, err := os.Open(path)
- if err != nil {
- log.Fatalln("Open file", path, "failed:", err)
- }
- //defer file.Close() // done explicitly below
- _, err = uploader.Upload(&s3manager.UploadInput{
- Bucket: aws.String("rescribeinprogress"),
- Key: aws.String(filepath.Join(bookname, name)),
- Body: file,
- })
+ err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path)
if err != nil {
log.Fatalln("Failed to upload", path, err)
}
- // Explicitly close here rather than wait for the defer, so we
- // don't end up with too many open files which can cause os.Open
- // to fail.
- err = file.Close()
- if err != nil {
- log.Fatalln("Failed to close file", path, err)
- }
}
- verboselog.Println("Sending message", bookname, "to queue", qurl)
- _, err = sqssvc.SendMessage(&sqs.SendMessageInput{
- MessageBody: aws.String(bookname),
- QueueUrl: &qurl,
- })
+ err = conn.AddToQueue(qid, bookname)
if err != nil {
log.Fatalln("Error adding book to queue:", err)
}