diff options
Diffstat (limited to 'pipelinepreprocess/main.go')
-rw-r--r-- | pipelinepreprocess/main.go | 35 |
1 files changed, 17 insertions, 18 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index c075fa9..1322652 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -87,7 +87,7 @@ type awsConn struct { prequrl, ocrqurl string } -func (a awsConn) Init() error { +func (a *awsConn) Init() error { if a.region == "" { return errors.New("No region set") } @@ -115,7 +115,6 @@ func (a awsConn) Init() error { return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) } a.prequrl = *result.QueueUrl - a.logger.Println("preprocess queue URL", a.prequrl) a.logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ @@ -128,7 +127,7 @@ func (a awsConn) Init() error { return nil } -func (a awsConn) CheckQueue(url string) (qmsg, error) { +func (a *awsConn) CheckQueue(url string) (qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), VisibilityTimeout: aws.Int64(HeartbeatTime * 2), @@ -148,12 +147,12 @@ func (a awsConn) CheckQueue(url string) (qmsg, error) { } } -func (a awsConn) CheckPreQueue() (qmsg, error) { - a.logger.Println("Checking preprocessing queue for new messages:", a.prequrl) +func (a *awsConn) CheckPreQueue() (qmsg, error) { + a.logger.Println("Checking preprocessing queue for new messages") return a.CheckQueue(a.prequrl) } -func (a awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { +func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ @@ -168,12 +167,12 @@ func (a awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) e return nil } -func (a awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { +func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { a.logger.Println("Starting preprocess queue heartbeat for", msgHandle) return a.QueueHeartbeat(t, msgHandle, a.prequrl) } -func (a awsConn) ListObjects(bucket string, prefix string, names chan string) error { +func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error { err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), @@ -191,11 +190,11 @@ func (a awsConn) ListObjects(bucket string, prefix string, names chan string) er return err } -func (a awsConn) ListInProgress(bookname string, names chan string) error { +func (a *awsConn) ListInProgress(bookname string, names chan string) error { return a.ListObjects("rescribeinprogress", bookname, names) } -func (a awsConn) AddToQueue(url string, msg string) error { +func (a *awsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, QueueUrl: &url, @@ -203,11 +202,11 @@ func (a awsConn) AddToQueue(url string, msg string) error { return err } -func (a awsConn) AddToOCRQueue(msg string) error { +func (a *awsConn) AddToOCRQueue(msg string) error { return a.AddToQueue(a.ocrqurl, msg) } -func (a awsConn) DelFromQueue(url string, handle string) error { +func (a *awsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, ReceiptHandle: &handle, @@ -215,11 +214,11 @@ func (a awsConn) DelFromQueue(url string, handle string) error { return err } -func (a awsConn) DelFromPreQueue(handle string) error { +func (a *awsConn) DelFromPreQueue(handle string) error { return a.DelFromQueue(a.prequrl, handle) } -func (a awsConn) Download(bucket string, key string, path string) error { +func (a *awsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { return err @@ -234,12 +233,12 @@ func (a awsConn) Download(bucket string, key string, path string) error { return err } -func (a awsConn) DownloadFromInProgress(key string, path string) error { +func (a *awsConn) DownloadFromInProgress(key string, path string) error { a.logger.Println("Downloading", key) return a.Download("rescribeinprogress", key, path) } -func (a awsConn) Upload(bucket string, key string, path string) error { +func (a *awsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { log.Fatalln("Failed to open file", path, err) @@ -254,7 +253,7 @@ func (a awsConn) Upload(bucket string, key string, path string) error { return err } -func (a awsConn) UploadToInProgress(key string, path string) error { +func (a *awsConn) UploadToInProgress(key string, path string) error { a.logger.Println("Uploading", path) return a.Upload("rescribeinprogress", key, path) } @@ -331,7 +330,7 @@ func main() { alreadydone = regexp.MustCompile(PreprocPattern) var conn Pipeliner - conn = awsConn{ region: "eu-west-2", logger: verboselog } + conn = &awsConn{ region: "eu-west-2", logger: verboselog } verboselog.Println("Setting up AWS session") err := conn.Init() |