diff options
Diffstat (limited to 'aws.go')
-rw-r--r-- | aws.go | 42 |
1 files changed, 32 insertions, 10 deletions
@@ -74,7 +74,7 @@ func (a *AwsConn) MinimalInit() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.wipstorageid = "rescribeinprogress" + a.wipstorageid = storageWip return nil } @@ -89,7 +89,7 @@ func (a *AwsConn) Init() error { a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribepreprocess"), + QueueName: aws.String(queuePreProc), }) if err != nil { return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) @@ -98,7 +98,7 @@ func (a *AwsConn) Init() error { a.Logger.Println("Getting wipeonly queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribewipeonly"), + QueueName: aws.String(queueWipeOnly), }) if err != nil { return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err)) @@ -107,7 +107,7 @@ func (a *AwsConn) Init() error { a.Logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeocr"), + QueueName: aws.String(queueOcr), }) if err != nil { return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) @@ -116,7 +116,7 @@ func (a *AwsConn) Init() error { a.Logger.Println("Getting analyse queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeanalyse"), + QueueName: aws.String(queueAnalyse), }) if err != nil { return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) @@ -125,7 +125,7 @@ func (a *AwsConn) Init() error { a.Logger.Println("Getting OCR Page queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String("rescribeocrpage"), + QueueName: aws.String(queueOcrPage), }) if err != nil { return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err)) @@ -445,12 +445,12 @@ func (a *AwsConn) StartInstances(n int) error { InstanceCount: aws.Int64(int64(n)), LaunchSpecification: &ec2.RequestSpotLaunchSpecification{ IamInstanceProfile: &ec2.IamInstanceProfileSpecification{ - Arn: aws.String("arn:aws:iam::557852942063:instance-profile/pipeliner"), + Arn: aws.String(spotProfile), }, - ImageId: aws.String("ami-0bc6ef6900f6da5d3"), - InstanceType: aws.String("m5.large"), + ImageId: aws.String(spotImage), + InstanceType: aws.String(spotType), SecurityGroupIds: []*string{ - aws.String("sg-0be8a3ab89e7136b9"), + aws.String(spotSg), }, }, Type: aws.String("one-time"), @@ -463,3 +463,25 @@ func (a *AwsConn) StartInstances(n int) error { func (a *AwsConn) Log(v ...interface{}) { a.Logger.Print(v...) } + +// mkpipeline sets up necessary buckets and queues for the pipeline +func (a *AwsConn) MkPipeline() error { + buckets := []string{storageWip} + queues := []string{queuePreProc, queueWipeOnly, queueOcr, queueAnalyse, queueOcrPage} + + for _, bucket := range buckets { + err := a.CreateBucket(bucket) + if err != nil { + return err + } + } + + for _, queue := range queues { + err := a.CreateQueue(queue) + if err != nil { + return err + } + } + + return nil +} |