diff options
| -rw-r--r-- | bookpipeline/aws.go | 84 | ||||
| -rw-r--r-- | bookpipeline/main.go | 28 | 
2 files changed, 57 insertions, 55 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 1ac06de..2322ea2 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -24,11 +24,11 @@ type awsConn struct {  	logger *log.Logger  	// these are used internally -	sess *session.Session -        s3svc *s3.S3 -        sqssvc *sqs.SQS -        downloader *s3manager.Downloader -	uploader *s3manager.Uploader +	sess                          *session.Session +	s3svc                         *s3.S3 +	sqssvc                        *sqs.SQS +	downloader                    *s3manager.Downloader +	uploader                      *s3manager.Uploader  	prequrl, ocrqurl, analysequrl string  } @@ -52,32 +52,32 @@ func (a *awsConn) Init() error {  	a.downloader = s3manager.NewDownloader(a.sess)  	a.uploader = s3manager.NewUploader(a.sess) -        a.logger.Println("Getting preprocess queue URL") -        result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ -                QueueName: aws.String("rescribepreprocess"), -        }) -        if err != nil { -                return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) -        } -        a.prequrl = *result.QueueUrl - -        a.logger.Println("Getting OCR queue URL") -        result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ -                QueueName: aws.String("rescribeocr"), -        }) -        if err != nil { -                return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) -        } -        a.ocrqurl = *result.QueueUrl - -        a.logger.Println("Getting analyse queue URL") -        result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ -                QueueName: aws.String("rescribeanalyse"), -        }) -        if err != nil { -                return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) -        } -        a.analysequrl = *result.QueueUrl +	a.logger.Println("Getting preprocess queue URL") +	result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String("rescribepreprocess"), +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Error getting preprocess queue URL: %s", err)) +	} +	a.prequrl = *result.QueueUrl + +	a.logger.Println("Getting OCR queue URL") +	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String("rescribeocr"), +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err)) +	} +	a.ocrqurl = *result.QueueUrl + +	a.logger.Println("Getting analyse queue URL") +	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String("rescribeanalyse"), +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Error getting analyse queue URL: %s", err)) +	} +	a.analysequrl = *result.QueueUrl  	return nil  } @@ -85,16 +85,16 @@ func (a *awsConn) Init() error {  func (a *awsConn) CheckQueue(url string) (Qmsg, error) {  	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{  		MaxNumberOfMessages: aws.Int64(1), -		VisibilityTimeout: aws.Int64(HeartbeatTime * 2), -		WaitTimeSeconds: aws.Int64(20), -		QueueUrl: &url, +		VisibilityTimeout:   aws.Int64(HeartbeatTime * 2), +		WaitTimeSeconds:     aws.Int64(20), +		QueueUrl:            &url,  	})  	if err != nil {  		return Qmsg{}, err  	}  	if len(msgResult.Messages) > 0 { -		msg := Qmsg{ Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body } +		msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}  		a.logger.Println("Message received:", msg.Body)  		return msg, nil  	} else { @@ -121,8 +121,8 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)  	for _ = range t.C {  		duration := int64(HeartbeatTime * 2)  		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ -			ReceiptHandle: &msgHandle, -			QueueUrl: &qurl, +			ReceiptHandle:     &msgHandle, +			QueueUrl:          &qurl,  			VisibilityTimeout: &duration,  		})  		if err != nil { @@ -183,7 +183,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {  	}  	// Filter out any object that looks like it hasn't already been preprocessed  	for _, n := range objs { -		if ! preprocessed.MatchString(n) { +		if !preprocessed.MatchString(n) {  			a.logger.Println("Skipping item that looks like it is not preprocessed", n)  			continue  		} @@ -195,7 +195,7 @@ func (a *awsConn) ListToOCR(bookname string) ([]string, error) {  func (a *awsConn) AddToQueue(url string, msg string) error {  	_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{  		MessageBody: &msg, -		QueueUrl: &url, +		QueueUrl:    &url,  	})  	return err  } @@ -210,7 +210,7 @@ func (a *awsConn) AddToAnalyseQueue(msg string) error {  func (a *awsConn) DelFromQueue(url string, handle string) error {  	_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ -		QueueUrl: &url, +		QueueUrl:      &url,  		ReceiptHandle: &handle,  	})  	return err @@ -234,8 +234,8 @@ func (a *awsConn) Download(bucket string, key string, path string) error {  	_, err = a.downloader.Download(f,  		&s3.GetObjectInput{  			Bucket: aws.String(bucket), -			Key: &key, -	}) +			Key:    &key, +		})  	return err  } diff --git a/bookpipeline/main.go b/bookpipeline/main.go index c1fb547..24e4e74 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/main.go @@ -1,4 +1,5 @@  package main +  // TODO: have logs go somewhere useful, like email  // TODO: check if images are prebinarised and if so skip multiple binarisation @@ -37,6 +38,7 @@ const training = "rescribealphav5" // TODO: allow to set on cmdline  // null writer to enable non-verbose logging to be discarded  type NullWriter bool +  func (w NullWriter) Write(p []byte) (n int, err error) {  	return len(p), nil  } @@ -175,11 +177,11 @@ func preprocBook(msg Qmsg, conn Pipeliner) error {  	// wait for either the done or errc channel to be sent to  	select { -		case err = <-errc: -			t.Stop() -			_ = os.RemoveAll(d) -			return err -		case <-done: +	case err = <-errc: +		t.Stop() +		_ = os.RemoveAll(d) +		return err +	case <-done:  	}  	conn.Logger().Println("Sending", bookname, "to OCR queue") @@ -246,11 +248,11 @@ func ocrBook(msg Qmsg, conn Pipeliner) error {  	// wait for either the done or errc channel to be sent to  	select { -		case err = <-errc: -			t.Stop() -			_ = os.RemoveAll(d) -			return err -		case <-done: +	case err = <-errc: +		t.Stop() +		_ = os.RemoveAll(d) +		return err +	case <-done:  	}  	conn.Logger().Println("Sending", bookname, "to analyse queue") @@ -292,7 +294,7 @@ func main() {  	}  	var conn Pipeliner -	conn = &awsConn{ region: "eu-west-2", logger: verboselog } +	conn = &awsConn{region: "eu-west-2", logger: verboselog}  	verboselog.Println("Setting up AWS session")  	err := conn.Init() @@ -308,7 +310,7 @@ func main() {  	for {  		select { -		case <- checkPreQueue: +		case <-checkPreQueue:  			msg, err := conn.CheckPreQueue()  			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil { @@ -323,7 +325,7 @@ func main() {  			if err != nil {  				log.Println("Error during preprocess", err)  			} -		case <- checkOCRQueue: +		case <-checkOCRQueue:  			msg, err := conn.CheckOCRQueue()  			checkOCRQueue = time.After(PauseBetweenChecks)  			if err != nil { | 
