summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--aws.go64
-rw-r--r--cmd/booktopipeline/main.go2
-rw-r--r--cmd/mkpipeline/main.go57
3 files changed, 75 insertions, 48 deletions
diff --git a/aws.go b/aws.go
index c2aa35e..08a597e 100644
--- a/aws.go
+++ b/aws.go
@@ -48,8 +48,8 @@ type AwsConn struct {
wipstorageid string
}
-// TODO: split this up, as not everything is needed for different uses
-func (a *AwsConn) Init() error {
+// MinimalInit does the bare minimum to initialise aws services
+func (a *AwsConn) MinimalInit() error {
if a.Region == "" {
return errors.New("No Region set")
}
@@ -70,6 +70,19 @@ func (a *AwsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
+ a.wipstorageid = "rescribeinprogress"
+
+ return nil
+}
+
+// Init initialises aws services, also finding the urls needed to
+// address SQS queues directly.
+func (a *AwsConn) Init() error {
+ err := a.MinimalInit()
+ if err != nil {
+ return err
+ }
+
a.Logger.Println("Getting preprocess queue URL")
result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribepreprocess"),
@@ -115,8 +128,6 @@ func (a *AwsConn) Init() error {
}
a.ocrpgqurl = *result.QueueUrl
- a.wipstorageid = "rescribeinprogress"
-
return nil
}
@@ -289,6 +300,49 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
return prefixes, err
}
+// CreateBucket creates a new S3 bucket
+func (a *AwsConn) CreateBucket(name string) error {
+ _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{
+ Bucket: aws.String(name),
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) {
+ a.Logger.Println("Bucket already exists:", name)
+ } else {
+ return errors.New(fmt.Sprintf("Error creating bucket %s: %v", name, err))
+ }
+ }
+ return nil
+}
+
+// CreateQueue creates a new SQS queue
+// Note the queue attributes are currently hardcoded; it may make sense
+// to specify them as arguments in the future.
+func (a *AwsConn) CreateQueue(name string) error {
+ _, err := a.sqssvc.CreateQueue(&sqs.CreateQueueInput{
+ QueueName: aws.String(name),
+ Attributes: map[string]*string{
+ "VisibilityTimeout": aws.String("120"), // 2 minutes
+ "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs
+ "ReceiveMessageWaitTimeSeconds": aws.String("20"),
+ },
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ // Note the QueueAlreadyExists code is only emitted if an existing queue
+ // has different attributes than the one that was being created. SQS just
+ // quietly ignores the CreateQueue request if it is identical to an
+ // existing queue.
+ if ok && aerr.Code() == sqs.ErrCodeQueueNameExists {
+ return errors.New("Error: Queue already exists but has different attributes:" + name)
+ } else {
+ return errors.New(fmt.Sprintf("Error creating queue %s: %v", name, err))
+ }
+ }
+ return nil
+}
+
func (a *AwsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
@@ -326,7 +380,7 @@ func (a *AwsConn) Download(bucket string, key string, path string) error {
func (a *AwsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
- log.Fatalln("Failed to open file", path, err)
+ return err
}
defer file.Close()
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
index c52527f..264b6ab 100644
--- a/cmd/booktopipeline/main.go
+++ b/cmd/booktopipeline/main.go
@@ -92,8 +92,6 @@ func main() {
qid = conn.PreQueueId()
}
- // concurrent walking upload based on example at
- // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html
verboselog.Println("Walking", bookdir)
walker := make(fileWalk)
go func() {
diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go
index a32526a..55e0417 100644
--- a/cmd/mkpipeline/main.go
+++ b/cmd/mkpipeline/main.go
@@ -1,36 +1,34 @@
package main
-// TODO: use the bookpipeline package for aws stuff
// TODO: set up iam role and policy needed for ec2 instances to access this stuff;
// see arn:aws:iam::557852942063:policy/pipelinestorageandqueue
// and arn:aws:iam::557852942063:role/pipeliner
// TODO: set up launch template for ec2 instances
-// NOTE: potentially use json templates to define things, ala aws cli
import (
"log"
"os"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/s3"
- "github.com/aws/aws-sdk-go/service/sqs"
+ "rescribe.xyz/bookpipeline"
)
+type MkPipeliner interface {
+ MinimalInit() error
+ CreateBucket(string) error
+ CreateQueue(string) error
+}
+
func main() {
if len(os.Args) != 1 {
- log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n")
+ log.Fatal("Usage: mkpipeline\n\nSets up necessary buckets and queues for our cloud pipeline\n")
}
- sess, err := session.NewSession(&aws.Config{
- Region: aws.String("eu-west-2"),
- })
+ var conn MkPipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: log.New(os.Stdout, "", 0)}
+ err := conn.MinimalInit()
if err != nil {
- log.Fatalf("Error: failed to set up aws session: %v\n", err)
+ log.Fatalln("Failed to set up cloud connection:", err)
}
- s3svc := s3.New(sess)
- sqssvc := sqs.New(sess)
prefix := "rescribe"
buckets := []string{"inprogress", "done"}
@@ -39,41 +37,18 @@ func main() {
for _, bucket := range buckets {
bname := prefix + bucket
log.Printf("Creating bucket %s\n", bname)
- _, err = s3svc.CreateBucket(&s3.CreateBucketInput{
- Bucket: aws.String(bname),
- })
+ err = conn.CreateBucket(bname)
if err != nil {
- aerr, ok := err.(awserr.Error)
- if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) {
- log.Printf("Bucket %s already exists\n", bname)
- } else {
- log.Fatalf("Error creating bucket %s: %v\n", bname, err)
- }
+ log.Fatalln(err)
}
}
for _, queue := range queues {
qname := prefix + queue
log.Printf("Creating queue %s\n", qname)
- _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{
- QueueName: aws.String(qname),
- Attributes: map[string]*string{
- "VisibilityTimeout": aws.String("120"), // 2 minutes
- "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs
- "ReceiveMessageWaitTimeSeconds": aws.String("20"),
- },
- })
+ err = conn.CreateQueue(qname)
if err != nil {
- aerr, ok := err.(awserr.Error)
- // Note the QueueAlreadyExists code is only emitted if an existing queue
- // has different attributes than the one that was being created. SQS just
- // quietly ignores the CreateQueue request if it is identical to an
- // existing queue.
- if ok && aerr.Code() == sqs.ErrCodeQueueNameExists {
- log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname)
- } else {
- log.Fatalf("Error creating queue %s: %v\n", qname, err)
- }
+ log.Fatalln(err)
}
}
}