summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pipelinepreprocess/main.go26
1 files changed, 24 insertions, 2 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index d4aa6a4..9a77cf6 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -3,6 +3,7 @@ package main
// TODO: handle errors more smartly than just always fatal erroring
// - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary
// - cancel the current book processing rather than killing the program in the case of a nonrecoverable error
+// TODO: check if images are prebinarised and if so skip multiple binarisation
import (
"log"
@@ -19,6 +20,15 @@ import (
"rescribe.xyz/go.git/preproc"
)
+const usage = "Usage: pipelinepreprocess [-v]\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n\n -v: verbose\n"
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+var verboselog *log.Logger
+
const HeartbeatTime = 60
const PauseBetweenChecks = 60 * time.Second
@@ -34,6 +44,7 @@ const PauseBetweenChecks = 60 * time.Second
func download(dl chan string, pre chan string, downloader *s3manager.Downloader, dir string) {
for key := range dl {
+ verboselog.Println("Downloading", key)
fn := filepath.Join(dir, filepath.Base(key))
f, err := os.Create(fn)
if err != nil {
@@ -55,6 +66,7 @@ func download(dl chan string, pre chan string, downloader *s3manager.Downloader,
func preprocess(pre chan string, up chan string) {
for path := range pre {
+ verboselog.Println("Preprocessing", path)
done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
if err != nil {
log.Fatalln("Error preprocessing", path, err)
@@ -68,6 +80,7 @@ func preprocess(pre chan string, up chan string) {
func up(c chan string, done chan bool, uploader *s3manager.Uploader, bookname string) {
for path := range c {
+ verboselog.Println("Uploading", path)
name := filepath.Base(path)
file, err := os.Open(path)
if err != nil {
@@ -103,8 +116,15 @@ func heartbeat(h *time.Ticker, msgHandle string, qurl string, sqssvc *sqs.SQS) {
}
func main() {
- if len(os.Args) != 1 {
- log.Fatal("Usage: pipelinepreprocess\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n")
+ if len(os.Args) > 1 {
+ if os.Args[1] == "-v" {
+ verboselog = log.New(os.Stdout, "", log.LstdFlags)
+ } else {
+ log.Fatal(usage)
+ }
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", log.LstdFlags)
}
sess, err := session.NewSession(&aws.Config{
@@ -137,6 +157,7 @@ func main() {
ocrqurl := *result.QueueUrl
for {
+ verboselog.Println("Checking preprocessing queue for new messages")
msgResult, err := sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
@@ -150,6 +171,7 @@ func main() {
var bookname string
if len(msgResult.Messages) > 0 {
bookname = *msgResult.Messages[0].Body
+ verboselog.Println("Message received:", bookname)
} else {
time.Sleep(PauseBetweenChecks)
continue