summaryrefslogtreecommitdiff
path: root/pipelinepreprocess/main.go
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-19 17:19:11 +0100
committerNick White <git@njw.name>2019-08-19 17:19:11 +0100
commitfa5868074ba517c07849ca4bc0b51199d120eb61 (patch)
treea1870a0bbaaf3dfad888216f4e0184edb06af9ed /pipelinepreprocess/main.go
parent79aa0597bb1a21c2078fe0d3f27d9037604d91bf (diff)
Fix pipelinepreprocess segfaults
These were caused by using non-pointer methods, which meant that the values set in Init() were not saved.
Diffstat (limited to 'pipelinepreprocess/main.go')
-rw-r--r--pipelinepreprocess/main.go35
1 files changed, 17 insertions, 18 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index c075fa9..1322652 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -87,7 +87,7 @@ type awsConn struct {
prequrl, ocrqurl string
}
-func (a awsConn) Init() error {
+func (a *awsConn) Init() error {
if a.region == "" {
return errors.New("No region set")
}
@@ -115,7 +115,6 @@ func (a awsConn) Init() error {
return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
}
a.prequrl = *result.QueueUrl
- a.logger.Println("preprocess queue URL", a.prequrl)
a.logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
@@ -128,7 +127,7 @@ func (a awsConn) Init() error {
return nil
}
-func (a awsConn) CheckQueue(url string) (qmsg, error) {
+func (a *awsConn) CheckQueue(url string) (qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
@@ -148,12 +147,12 @@ func (a awsConn) CheckQueue(url string) (qmsg, error) {
}
}
-func (a awsConn) CheckPreQueue() (qmsg, error) {
- a.logger.Println("Checking preprocessing queue for new messages:", a.prequrl)
+func (a *awsConn) CheckPreQueue() (qmsg, error) {
+ a.logger.Println("Checking preprocessing queue for new messages")
return a.CheckQueue(a.prequrl)
}
-func (a awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
+func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
@@ -168,12 +167,12 @@ func (a awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) e
return nil
}
-func (a awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
+func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error {
a.logger.Println("Starting preprocess queue heartbeat for", msgHandle)
return a.QueueHeartbeat(t, msgHandle, a.prequrl)
}
-func (a awsConn) ListObjects(bucket string, prefix string, names chan string) error {
+func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) error {
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
@@ -191,11 +190,11 @@ func (a awsConn) ListObjects(bucket string, prefix string, names chan string) er
return err
}
-func (a awsConn) ListInProgress(bookname string, names chan string) error {
+func (a *awsConn) ListInProgress(bookname string, names chan string) error {
return a.ListObjects("rescribeinprogress", bookname, names)
}
-func (a awsConn) AddToQueue(url string, msg string) error {
+func (a *awsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
QueueUrl: &url,
@@ -203,11 +202,11 @@ func (a awsConn) AddToQueue(url string, msg string) error {
return err
}
-func (a awsConn) AddToOCRQueue(msg string) error {
+func (a *awsConn) AddToOCRQueue(msg string) error {
return a.AddToQueue(a.ocrqurl, msg)
}
-func (a awsConn) DelFromQueue(url string, handle string) error {
+func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
ReceiptHandle: &handle,
@@ -215,11 +214,11 @@ func (a awsConn) DelFromQueue(url string, handle string) error {
return err
}
-func (a awsConn) DelFromPreQueue(handle string) error {
+func (a *awsConn) DelFromPreQueue(handle string) error {
return a.DelFromQueue(a.prequrl, handle)
}
-func (a awsConn) Download(bucket string, key string, path string) error {
+func (a *awsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
return err
@@ -234,12 +233,12 @@ func (a awsConn) Download(bucket string, key string, path string) error {
return err
}
-func (a awsConn) DownloadFromInProgress(key string, path string) error {
+func (a *awsConn) DownloadFromInProgress(key string, path string) error {
a.logger.Println("Downloading", key)
return a.Download("rescribeinprogress", key, path)
}
-func (a awsConn) Upload(bucket string, key string, path string) error {
+func (a *awsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
log.Fatalln("Failed to open file", path, err)
@@ -254,7 +253,7 @@ func (a awsConn) Upload(bucket string, key string, path string) error {
return err
}
-func (a awsConn) UploadToInProgress(key string, path string) error {
+func (a *awsConn) UploadToInProgress(key string, path string) error {
a.logger.Println("Uploading", path)
return a.Upload("rescribeinprogress", key, path)
}
@@ -331,7 +330,7 @@ func main() {
alreadydone = regexp.MustCompile(PreprocPattern)
var conn Pipeliner
- conn = awsConn{ region: "eu-west-2", logger: verboselog }
+ conn = &awsConn{ region: "eu-west-2", logger: verboselog }
verboselog.Println("Setting up AWS session")
err := conn.Init()