summaryrefslogtreecommitdiff
path: root/bookpipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-22 10:43:03 +0100
committerNick White <git@njw.name>2019-08-22 10:43:03 +0100
commite3c849545079fb998bb21a3644f5a0da36bce5d3 (patch)
treeb421c793aaf1557fde37591a0de0b6e30699995f /bookpipeline
parenta60577af86c64001773e31f1691bf4853efc4772 (diff)
gofmt
Diffstat (limited to 'bookpipeline')
-rw-r--r--bookpipeline/aws.go84
-rw-r--r--bookpipeline/main.go28
2 files changed, 57 insertions, 55 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 1ac06de..2322ea2 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -24,11 +24,11 @@ type awsConn struct {
logger *log.Logger
// these are used internally
- sess *session.Session
- s3svc *s3.S3
- sqssvc *sqs.SQS
- downloader *s3manager.Downloader
- uploader *s3manager.Uploader
+ sess *session.Session
+ s3svc *s3.S3
+ sqssvc *sqs.SQS
+ downloader *s3manager.Downloader
+ uploader *s3manager.Uploader
prequrl, ocrqurl, analysequrl string
}
@@ -52,32 +52,32 @@ func (a *awsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
- a.logger.Println("Getting preprocess queue URL")
- result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribepreprocess"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
- }
- a.prequrl = *result.QueueUrl
-
- a.logger.Println("Getting OCR queue URL")
- result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeocr"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
- }
- a.ocrqurl = *result.QueueUrl
-
- a.logger.Println("Getting analyse queue URL")
- result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String("rescribeanalyse"),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
- }
- a.analysequrl = *result.QueueUrl
+ a.logger.Println("Getting preprocess queue URL")
+ result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribepreprocess"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err))
+ }
+ a.prequrl = *result.QueueUrl
+
+ a.logger.Println("Getting OCR queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeocr"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
+ }
+ a.ocrqurl = *result.QueueUrl
+
+ a.logger.Println("Getting analyse queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeanalyse"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err))
+ }
+ a.analysequrl = *result.QueueUrl
return nil
}
@@ -85,16 +85,16 @@ func (a *awsConn) Init() error {
func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
- VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
- WaitTimeSeconds: aws.Int64(20),
- QueueUrl: &url,
+ VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
+ WaitTimeSeconds: aws.Int64(20),
+ QueueUrl: &url,
})
if err != nil {
return Qmsg{}, err
}
if len(msgResult.Messages) > 0 {
- msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body }
+ msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
a.logger.Println("Message received:", msg.Body)
return msg, nil
} else {
@@ -121,8 +121,8 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- ReceiptHandle: &msgHandle,
- QueueUrl: &qurl,
+ ReceiptHandle: &msgHandle,
+ QueueUrl: &qurl,
VisibilityTimeout: &duration,
})
if err != nil {
@@ -183,7 +183,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
}
// Filter out any object that looks like it hasn't already been preprocessed
for _, n := range objs {
- if ! preprocessed.MatchString(n) {
+ if !preprocessed.MatchString(n) {
a.logger.Println("Skipping item that looks like it is not preprocessed", n)
continue
}
@@ -195,7 +195,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
func (a *awsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
- QueueUrl: &url,
+ QueueUrl: &url,
})
return err
}
@@ -210,7 +210,7 @@ func (a *awsConn) AddToAnalyseQueue(msg string) error {
func (a *awsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
- QueueUrl: &url,
+ QueueUrl: &url,
ReceiptHandle: &handle,
})
return err
@@ -234,8 +234,8 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
_, err = a.downloader.Download(f,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
- Key: &key,
- })
+ Key: &key,
+ })
return err
}
diff --git a/bookpipeline/main.go b/bookpipeline/main.go
index c1fb547..24e4e74 100644
--- a/bookpipeline/main.go
+++ b/bookpipeline/main.go
@@ -1,4 +1,5 @@
package main
+
// TODO: have logs go somewhere useful, like email
// TODO: check if images are prebinarised and if so skip multiple binarisation
@@ -37,6 +38,7 @@ const training = "rescribealphav5" // TODO: allow to set on cmdline
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
+
func (w NullWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
@@ -175,11 +177,11 @@ func preprocBook(msg Qmsg, conn Pipeliner) error {
// wait for either the done or errc channel to be sent to
select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- return err
- case <-done:
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
}
conn.Logger().Println("Sending", bookname, "to OCR queue")
@@ -246,11 +248,11 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {
// wait for either the done or errc channel to be sent to
select {
- case err = <-errc:
- t.Stop()
- _ = os.RemoveAll(d)
- return err
- case <-done:
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
}
conn.Logger().Println("Sending", bookname, "to analyse queue")
@@ -292,7 +294,7 @@ func main() {
}
var conn Pipeliner
- conn = &awsConn{ region: "eu-west-2", logger: verboselog }
+ conn = &awsConn{region: "eu-west-2", logger: verboselog}
verboselog.Println("Setting up AWS session")
err := conn.Init()
@@ -308,7 +310,7 @@ func main() {
for {
select {
- case <- checkPreQueue:
+ case <-checkPreQueue:
msg, err := conn.CheckPreQueue()
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
@@ -323,7 +325,7 @@ func main() {
if err != nil {
log.Println("Error during preprocess", err)
}
- case <- checkOCRQueue:
+ case <-checkOCRQueue:
msg, err := conn.CheckOCRQueue()
checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {