summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-12-11 09:31:18 +0000
committerNick White <git@njw.name>2019-12-11 09:31:18 +0000
commitca1ee4ccb33784103339b4b52f8948aa0dd30263 (patch)
treed6038aad2176c4bef6134735f0234e55b212cf05 /aws.go
parent844888519265ca6cf088f48640abbe55d734acc5 (diff)
Use aws.go with mkpipeline too, plus fix one log.Fatal call in aws.go which should have been handled by caller
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go64
1 files changed, 59 insertions, 5 deletions
diff --git a/aws.go b/aws.go
index c2aa35e..08a597e 100644
--- a/aws.go
+++ b/aws.go
@@ -48,8 +48,8 @@ type AwsConn struct {
wipstorageid string
}
-// TODO: split this up, as not everything is needed for different uses
-func (a *AwsConn) Init() error {
+// MinimalInit does the bare minimum to initialise aws services
+func (a *AwsConn) MinimalInit() error {
if a.Region == "" {
return errors.New("No Region set")
}
@@ -70,6 +70,19 @@ func (a *AwsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
+ a.wipstorageid = "rescribeinprogress"
+
+ return nil
+}
+
+// Init initialises aws services, also finding the urls needed to
+// address SQS queues directly.
+func (a *AwsConn) Init() error {
+ err := a.MinimalInit()
+ if err != nil {
+ return err
+ }
+
a.Logger.Println("Getting preprocess queue URL")
result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribepreprocess"),
@@ -115,8 +128,6 @@ func (a *AwsConn) Init() error {
}
a.ocrpgqurl = *result.QueueUrl
- a.wipstorageid = "rescribeinprogress"
-
return nil
}
@@ -289,6 +300,49 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
return prefixes, err
}
+// CreateBucket creates a new S3 bucket
+func (a *AwsConn) CreateBucket(name string) error {
+ _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{
+ Bucket: aws.String(name),
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) {
+ a.Logger.Println("Bucket already exists:", name)
+ } else {
+ return errors.New(fmt.Sprintf("Error creating bucket %s: %v", name, err))
+ }
+ }
+ return nil
+}
+
+// CreateQueue creates a new SQS queue
+// Note the queue attributes are currently hardcoded; it may make sense
+// to specify them as arguments in the future.
+func (a *AwsConn) CreateQueue(name string) error {
+ _, err := a.sqssvc.CreateQueue(&sqs.CreateQueueInput{
+ QueueName: aws.String(name),
+ Attributes: map[string]*string{
+ "VisibilityTimeout": aws.String("120"), // 2 minutes
+ "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs
+ "ReceiveMessageWaitTimeSeconds": aws.String("20"),
+ },
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ // Note the QueueAlreadyExists code is only emitted if an existing queue
+ // has different attributes than the one that was being created. SQS just
+ // quietly ignores the CreateQueue request if it is identical to an
+ // existing queue.
+ if ok && aerr.Code() == sqs.ErrCodeQueueNameExists {
+ return errors.New("Error: Queue already exists but has different attributes:" + name)
+ } else {
+ return errors.New(fmt.Sprintf("Error creating queue %s: %v", name, err))
+ }
+ }
+ return nil
+}
+
func (a *AwsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
@@ -326,7 +380,7 @@ func (a *AwsConn) Download(bucket string, key string, path string) error {
func (a *AwsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
- log.Fatalln("Failed to open file", path, err)
+ return err
}
defer file.Close()