From ca1ee4ccb33784103339b4b52f8948aa0dd30263 Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 11 Dec 2019 09:31:18 +0000 Subject: Use aws.go with mkpipeline too, plus fix one log.Fatal call in aws.go which should have been handled by caller --- aws.go | 64 ++++++++++++++++++++++++++++++++++++++++++---- cmd/booktopipeline/main.go | 2 -- cmd/mkpipeline/main.go | 57 ++++++++++++----------------------------- 3 files changed, 75 insertions(+), 48 deletions(-) diff --git a/aws.go b/aws.go index c2aa35e..08a597e 100644 --- a/aws.go +++ b/aws.go @@ -48,8 +48,8 @@ type AwsConn struct { wipstorageid string } -// TODO: split this up, as not everything is needed for different uses -func (a *AwsConn) Init() error { +// MinimalInit does the bare minimum to initialise aws services +func (a *AwsConn) MinimalInit() error { if a.Region == "" { return errors.New("No Region set") } @@ -70,6 +70,19 @@ func (a *AwsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) + a.wipstorageid = "rescribeinprogress" + + return nil +} + +// Init initialises aws services, also finding the urls needed to +// address SQS queues directly. +func (a *AwsConn) Init() error { + err := a.MinimalInit() + if err != nil { + return err + } + a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribepreprocess"), @@ -115,8 +128,6 @@ func (a *AwsConn) Init() error { } a.ocrpgqurl = *result.QueueUrl - a.wipstorageid = "rescribeinprogress" - return nil } @@ -289,6 +300,49 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { return prefixes, err } +// CreateBucket creates a new S3 bucket +func (a *AwsConn) CreateBucket(name string) error { + _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(name), + }) + if err != nil { + aerr, ok := err.(awserr.Error) + if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { + a.Logger.Println("Bucket already exists:", name) + } else { + return errors.New(fmt.Sprintf("Error creating bucket %s: %v", name, err)) + } + } + return nil +} + +// CreateQueue creates a new SQS queue +// Note the queue attributes are currently hardcoded; it may make sense +// to specify them as arguments in the future. +func (a *AwsConn) CreateQueue(name string) error { + _, err := a.sqssvc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(name), + Attributes: map[string]*string{ + "VisibilityTimeout": aws.String("120"), // 2 minutes + "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs + "ReceiveMessageWaitTimeSeconds": aws.String("20"), + }, + }) + if err != nil { + aerr, ok := err.(awserr.Error) + // Note the QueueAlreadyExists code is only emitted if an existing queue + // has different attributes than the one that was being created. SQS just + // quietly ignores the CreateQueue request if it is identical to an + // existing queue. + if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { + return errors.New("Error: Queue already exists but has different attributes:" + name) + } else { + return errors.New(fmt.Sprintf("Error creating queue %s: %v", name, err)) + } + } + return nil +} + func (a *AwsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, @@ -326,7 +380,7 @@ func (a *AwsConn) Download(bucket string, key string, path string) error { func (a *AwsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { - log.Fatalln("Failed to open file", path, err) + return err } defer file.Close() diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index c52527f..264b6ab 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -92,8 +92,6 @@ func main() { qid = conn.PreQueueId() } - // concurrent walking upload based on example at - // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html verboselog.Println("Walking", bookdir) walker := make(fileWalk) go func() { diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go index a32526a..55e0417 100644 --- a/cmd/mkpipeline/main.go +++ b/cmd/mkpipeline/main.go @@ -1,36 +1,34 @@ package main -// TODO: use the bookpipeline package for aws stuff // TODO: set up iam role and policy needed for ec2 instances to access this stuff; // see arn:aws:iam::557852942063:policy/pipelinestorageandqueue // and arn:aws:iam::557852942063:role/pipeliner // TODO: set up launch template for ec2 instances -// NOTE: potentially use json templates to define things, ala aws cli import ( "log" "os" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/sqs" + "rescribe.xyz/bookpipeline" ) +type MkPipeliner interface { + MinimalInit() error + CreateBucket(string) error + CreateQueue(string) error +} + func main() { if len(os.Args) != 1 { - log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n") + log.Fatal("Usage: mkpipeline\n\nSets up necessary buckets and queues for our cloud pipeline\n") } - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("eu-west-2"), - }) + var conn MkPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: log.New(os.Stdout, "", 0)} + err := conn.MinimalInit() if err != nil { - log.Fatalf("Error: failed to set up aws session: %v\n", err) + log.Fatalln("Failed to set up cloud connection:", err) } - s3svc := s3.New(sess) - sqssvc := sqs.New(sess) prefix := "rescribe" buckets := []string{"inprogress", "done"} @@ -39,41 +37,18 @@ func main() { for _, bucket := range buckets { bname := prefix + bucket log.Printf("Creating bucket %s\n", bname) - _, err = s3svc.CreateBucket(&s3.CreateBucketInput{ - Bucket: aws.String(bname), - }) + err = conn.CreateBucket(bname) if err != nil { - aerr, ok := err.(awserr.Error) - if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { - log.Printf("Bucket %s already exists\n", bname) - } else { - log.Fatalf("Error creating bucket %s: %v\n", bname, err) - } + log.Fatalln(err) } } for _, queue := range queues { qname := prefix + queue log.Printf("Creating queue %s\n", qname) - _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(qname), - Attributes: map[string]*string{ - "VisibilityTimeout": aws.String("120"), // 2 minutes - "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs - "ReceiveMessageWaitTimeSeconds": aws.String("20"), - }, - }) + err = conn.CreateQueue(qname) if err != nil { - aerr, ok := err.(awserr.Error) - // Note the QueueAlreadyExists code is only emitted if an existing queue - // has different attributes than the one that was being created. SQS just - // quietly ignores the CreateQueue request if it is identical to an - // existing queue. - if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { - log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname) - } else { - log.Fatalf("Error creating queue %s: %v\n", qname, err) - } + log.Fatalln(err) } } } -- cgit v1.2.1-24-ge1ad