From ca1ee4ccb33784103339b4b52f8948aa0dd30263 Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 11 Dec 2019 09:31:18 +0000 Subject: Use aws.go with mkpipeline too, plus fix one log.Fatal call in aws.go which should have been handled by caller --- aws.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 5 deletions(-) (limited to 'aws.go') diff --git a/aws.go b/aws.go index c2aa35e..08a597e 100644 --- a/aws.go +++ b/aws.go @@ -48,8 +48,8 @@ type AwsConn struct { wipstorageid string } -// TODO: split this up, as not everything is needed for different uses -func (a *AwsConn) Init() error { +// MinimalInit does the bare minimum to initialise aws services +func (a *AwsConn) MinimalInit() error { if a.Region == "" { return errors.New("No Region set") } @@ -70,6 +70,19 @@ func (a *AwsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) + a.wipstorageid = "rescribeinprogress" + + return nil +} + +// Init initialises aws services, also finding the urls needed to +// address SQS queues directly. +func (a *AwsConn) Init() error { + err := a.MinimalInit() + if err != nil { + return err + } + a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribepreprocess"), @@ -115,8 +128,6 @@ func (a *AwsConn) Init() error { } a.ocrpgqurl = *result.QueueUrl - a.wipstorageid = "rescribeinprogress" - return nil } @@ -289,6 +300,49 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { return prefixes, err } +// CreateBucket creates a new S3 bucket +func (a *AwsConn) CreateBucket(name string) error { + _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(name), + }) + if err != nil { + aerr, ok := err.(awserr.Error) + if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { + a.Logger.Println("Bucket already exists:", name) + } else { + return errors.New(fmt.Sprintf("Error creating bucket %s: %v", name, err)) + } + } + return nil +} + +// CreateQueue creates a new SQS queue +// Note the queue attributes are currently hardcoded; it may make sense +// to specify them as arguments in the future. +func (a *AwsConn) CreateQueue(name string) error { + _, err := a.sqssvc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(name), + Attributes: map[string]*string{ + "VisibilityTimeout": aws.String("120"), // 2 minutes + "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs + "ReceiveMessageWaitTimeSeconds": aws.String("20"), + }, + }) + if err != nil { + aerr, ok := err.(awserr.Error) + // Note the QueueAlreadyExists code is only emitted if an existing queue + // has different attributes than the one that was being created. SQS just + // quietly ignores the CreateQueue request if it is identical to an + // existing queue. + if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { + return errors.New("Error: Queue already exists but has different attributes:" + name) + } else { + return errors.New(fmt.Sprintf("Error creating queue %s: %v", name, err)) + } + } + return nil +} + func (a *AwsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, @@ -326,7 +380,7 @@ func (a *AwsConn) Download(bucket string, key string, path string) error { func (a *AwsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { - log.Fatalln("Failed to open file", path, err) + return err } defer file.Close() -- cgit v1.2.1-24-ge1ad