summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go42
1 files changed, 32 insertions, 10 deletions
diff --git a/aws.go b/aws.go
index ecea193..74e6142 100644
--- a/aws.go
+++ b/aws.go
@@ -74,7 +74,7 @@ func (a *AwsConn) MinimalInit() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
- a.wipstorageid = "rescribeinprogress"
+ a.wipstorageid = storageWip
return nil
}
@@ -89,7 +89,7 @@ func (a *AwsConn) Init() error {
a.Logger.Println("Getting preprocess queue URL")
result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribepreprocess"),
+ QueueName: aws.String(queuePreProc),
})
if err != nil {
return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
@@ -98,7 +98,7 @@ func (a *AwsConn) Init() error {
a.Logger.Println("Getting wipeonly queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribewipeonly"),
+ QueueName: aws.String(queueWipeOnly),
})
if err != nil {
return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err))
@@ -107,7 +107,7 @@ func (a *AwsConn) Init() error {
a.Logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeocr"),
+ QueueName: aws.String(queueOcr),
})
if err != nil {
return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
@@ -116,7 +116,7 @@ func (a *AwsConn) Init() error {
a.Logger.Println("Getting analyse queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeanalyse"),
+ QueueName: aws.String(queueAnalyse),
})
if err != nil {
return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
@@ -125,7 +125,7 @@ func (a *AwsConn) Init() error {
a.Logger.Println("Getting OCR Page queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeocrpage"),
+ QueueName: aws.String(queueOcrPage),
})
if err != nil {
return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
@@ -445,12 +445,12 @@ func (a *AwsConn) StartInstances(n int) error {
InstanceCount: aws.Int64(int64(n)),
LaunchSpecification: &ec2.RequestSpotLaunchSpecification{
IamInstanceProfile: &ec2.IamInstanceProfileSpecification{
- Arn: aws.String("arn:aws:iam::557852942063:instance-profile/pipeliner"),
+ Arn: aws.String(spotProfile),
},
- ImageId: aws.String("ami-0bc6ef6900f6da5d3"),
- InstanceType: aws.String("m5.large"),
+ ImageId: aws.String(spotImage),
+ InstanceType: aws.String(spotType),
SecurityGroupIds: []*string{
- aws.String("sg-0be8a3ab89e7136b9"),
+ aws.String(spotSg),
},
},
Type: aws.String("one-time"),
@@ -463,3 +463,25 @@ func (a *AwsConn) StartInstances(n int) error {
func (a *AwsConn) Log(v ...interface{}) {
a.Logger.Print(v...)
}
+
+// mkpipeline sets up necessary buckets and queues for the pipeline
+func (a *AwsConn) MkPipeline() error {
+ buckets := []string{storageWip}
+ queues := []string{queuePreProc, queueWipeOnly, queueOcr, queueAnalyse, queueOcrPage}
+
+ for _, bucket := range buckets {
+ err := a.CreateBucket(bucket)
+ if err != nil {
+ return err
+ }
+ }
+
+ for _, queue := range queues {
+ err := a.CreateQueue(queue)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}