diff options
| -rw-r--r-- | bookpipeline/aws.go | 99 | ||||
| -rw-r--r-- | bookpipeline/main.go | 61 | 
2 files changed, 48 insertions, 112 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 2322ea2..761031d 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -5,7 +5,6 @@ import (  	"fmt"  	"log"  	"os" -	"regexp"  	"time"  	"github.com/aws/aws-sdk-go/aws" @@ -30,6 +29,7 @@ type awsConn struct {  	downloader                    *s3manager.Downloader  	uploader                      *s3manager.Uploader  	prequrl, ocrqurl, analysequrl string +	wipstorageid                  string  }  func (a *awsConn) Init() error { @@ -79,6 +79,8 @@ func (a *awsConn) Init() error {  	}  	a.analysequrl = *result.QueueUrl +	a.wipstorageid = "rescribeinprogress" +  	return nil  } @@ -102,21 +104,6 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {  	}  } -func (a *awsConn) CheckPreQueue() (Qmsg, error) { -	a.logger.Println("Checking preprocessing queue for new messages") -	return a.CheckQueue(a.prequrl) -} - -func (a *awsConn) CheckOCRQueue() (Qmsg, error) { -	a.logger.Println("Checking OCR queue for new messages") -	return a.CheckQueue(a.ocrqurl) -} - -func (a *awsConn) CheckAnalyseQueue() (Qmsg, error) { -	a.logger.Println("Checking analyse queue for new messages") -	return a.CheckQueue(a.ocrqurl) -} -  func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {  	for _ = range t.C {  		duration := int64(HeartbeatTime * 2) @@ -132,14 +119,20 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)  	return nil  } -func (a *awsConn) PreQueueHeartbeat(t *time.Ticker, msgHandle string) error { -	a.logger.Println("Starting preprocess queue heartbeat") -	return a.QueueHeartbeat(t, msgHandle, a.prequrl) +func (a *awsConn) PreQueueId() string { +	return a.prequrl  } -func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { -	a.logger.Println("Starting ocr queue heartbeat") -	return a.QueueHeartbeat(t, msgHandle, a.ocrqurl) +func (a *awsConn) OCRQueueId() string { +	return a.ocrqurl +} + +func (a *awsConn) AnalyseQueueId() string { +	return a.analysequrl +} + +func (a *awsConn) WIPStorageId() string { +	return a.wipstorageid  }  func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { @@ -156,42 +149,6 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {  	return names, err  } -func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) { -	var names []string -	preprocessed := regexp.MustCompile(PreprocPattern) -	objs, err := a.ListObjects("rescribeinprogress", bookname) -	if err != nil { -		return names, err -	} -	// Filter out any object that looks like it's already been preprocessed -	for _, n := range objs { -		if preprocessed.MatchString(n) { -			a.logger.Println("Skipping item that looks like it has already been processed", n) -			continue -		} -		names = append(names, n) -	} -	return names, nil -} - -func (a *awsConn) ListToOCR(bookname string) ([]string, error) { -	var names []string -	preprocessed := regexp.MustCompile(PreprocPattern) -	objs, err := a.ListObjects("rescribeinprogress", bookname) -	if err != nil { -		return names, err -	} -	// Filter out any object that looks like it hasn't already been preprocessed -	for _, n := range objs { -		if !preprocessed.MatchString(n) { -			a.logger.Println("Skipping item that looks like it is not preprocessed", n) -			continue -		} -		names = append(names, n) -	} -	return names, nil -} -  func (a *awsConn) AddToQueue(url string, msg string) error {  	_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{  		MessageBody: &msg, @@ -200,14 +157,6 @@ func (a *awsConn) AddToQueue(url string, msg string) error {  	return err  } -func (a *awsConn) AddToOCRQueue(msg string) error { -	return a.AddToQueue(a.ocrqurl, msg) -} - -func (a *awsConn) AddToAnalyseQueue(msg string) error { -	return a.AddToQueue(a.analysequrl, msg) -} -  func (a *awsConn) DelFromQueue(url string, handle string) error {  	_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{  		QueueUrl:      &url, @@ -216,14 +165,6 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {  	return err  } -func (a *awsConn) DelFromPreQueue(handle string) error { -	return a.DelFromQueue(a.prequrl, handle) -} - -func (a *awsConn) DelFromOCRQueue(handle string) error { -	return a.DelFromQueue(a.ocrqurl, handle) -} -  func (a *awsConn) Download(bucket string, key string, path string) error {  	f, err := os.Create(path)  	if err != nil { @@ -239,11 +180,6 @@ func (a *awsConn) Download(bucket string, key string, path string) error {  	return err  } -func (a *awsConn) DownloadFromInProgress(key string, path string) error { -	a.logger.Println("Downloading", key) -	return a.Download("rescribeinprogress", key, path) -} -  func (a *awsConn) Upload(bucket string, key string, path string) error {  	file, err := os.Open(path)  	if err != nil { @@ -259,11 +195,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {  	return err  } -func (a *awsConn) UploadToInProgress(key string, path string) error { -	a.logger.Println("Uploading", path) -	return a.Upload("rescribeinprogress", key, path) -} -  func (a *awsConn) Logger() *log.Logger {  	return a.logger  } diff --git a/bookpipeline/main.go b/bookpipeline/main.go index 6a68c57..30f9cfa 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/main.go @@ -11,6 +11,7 @@ import (  	"os"  	"os/exec"  	"path/filepath" +	"regexp"  	"strings"  	"time" @@ -56,19 +57,10 @@ type Clouder interface {  type Pipeliner interface {  	Clouder -	ListToPreprocess(bookname string) ([]string, error) -	ListToOCR(bookname string) ([]string, error) -	DownloadFromInProgress(key string, fn string) error -	UploadToInProgress(key string, path string) error -	CheckPreQueue() (Qmsg, error) -	CheckOCRQueue() (Qmsg, error) -	CheckAnalyseQueue() (Qmsg, error) -	AddToOCRQueue(msg string) error -	AddToAnalyseQueue(msg string) error -	DelFromPreQueue(handle string) error -	DelFromOCRQueue(handle string) error -	PreQueueHeartbeat(t *time.Ticker, msgHandle string) error -	OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error +	PreQueueId() string +	OCRQueueId() string +	AnalyseQueueId() string +	WIPStorageId() string  	Logger() *log.Logger  } @@ -79,7 +71,7 @@ type Qmsg struct {  func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {  	for key := range dl {  		fn := filepath.Join(dir, filepath.Base(key)) -		err := conn.DownloadFromInProgress(key, fn) +		err := conn.Download(conn.WIPStorageId(), key, fn)  		if err != nil {  			close(process)  			errc <- err @@ -94,7 +86,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha  	for path := range c {  		name := filepath.Base(path)  		key := filepath.Join(bookname, name) -		err := conn.UploadToInProgress(key, path) +		err := conn.Upload(conn.WIPStorageId(), key, path)  		if err != nil {  			errc <- err  			return @@ -138,11 +130,11 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger  	}  } -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger)) error { +func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {  	bookname := msg.Body  	t := time.NewTicker(HeartbeatTime * time.Second) -	go conn.OCRQueueHeartbeat(t, msg.Handle) +	go conn.QueueHeartbeat(t, msg.Handle, fromQueue)  	d := filepath.Join(os.TempDir(), bookname)  	err := os.MkdirAll(d, 0755) @@ -163,12 +155,20 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	go up(upc, done, conn, bookname, errc)  	conn.Logger().Println("Getting list of objects to download") -	todl, err := conn.ListToOCR(bookname) +	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)  	if err != nil {  		t.Stop()  		_ = os.RemoveAll(d)  		return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))  	} +	var todl []string +	for _, n := range objs { +		if !match.MatchString(n) { +			conn.Logger().Println("Skipping item that doesn't match target", n) +			continue +		} +		todl = append(todl, n) +	}  	for _, a := range todl {  		dl <- a  	} @@ -183,21 +183,21 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	case <-done:  	} -	conn.Logger().Println("Sending", bookname, "to analyse queue") -	err = conn.AddToAnalyseQueue(bookname) +	conn.Logger().Println("Sending", bookname, "to queue") +	err = conn.AddToQueue(toQueue, bookname)  	if err != nil {  		t.Stop()  		_ = os.RemoveAll(d) -		return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err)) +		return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))  	}  	t.Stop() -	conn.Logger().Println("Deleting original message from OCR queue") -	err = conn.DelFromOCRQueue(msg.Handle) +	conn.Logger().Println("Deleting original message from queue") +	err = conn.DelFromQueue(fromQueue, msg.Handle)  	if err != nil {  		_ = os.RemoveAll(d) -		return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err)) +		return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))  	}  	err = os.RemoveAll(d) @@ -225,6 +225,11 @@ func main() {  		verboselog = log.New(n, "", log.LstdFlags)  	} +	// TODO: match jpg too +	origPattern := regexp.MustCompile(`[0-9]{4}.png$`) // TODO: match other file naming +	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) +	//ocredPattern := regexp.MustCompile(`.hocr$`) +  	var conn Pipeliner  	conn = &awsConn{region: "eu-west-2", logger: verboselog} @@ -243,7 +248,7 @@ func main() {  	for {  		select {  		case <-checkPreQueue: -			msg, err := conn.CheckPreQueue() +			msg, err := conn.CheckQueue(conn.PreQueueId())  			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking preprocess queue", err) @@ -253,12 +258,12 @@ func main() {  				verboselog.Println("No message received on preprocess queue, sleeping")  				continue  			} -			err = processBook(msg, conn, preprocess) +			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())  			if err != nil {  				log.Println("Error during preprocess", err)  			}  		case <-checkOCRQueue: -			msg, err := conn.CheckOCRQueue() +			msg, err := conn.CheckQueue(conn.OCRQueueId())  			checkOCRQueue = time.After(PauseBetweenChecks)  			if err != nil {  				log.Println("Error checking OCR queue", err) @@ -268,7 +273,7 @@ func main() {  				verboselog.Println("No message received on OCR queue, sleeping")  				continue  			} -			err = processBook(msg, conn, ocr(*training)) +			err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())  			if err != nil {  				log.Println("Error during OCR process", err)  			} | 
