summaryrefslogtreecommitdiff
path: root/bookpipeline/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/main.go')
-rw-r--r--bookpipeline/main.go61
1 files changed, 33 insertions, 28 deletions
diff --git a/bookpipeline/main.go b/bookpipeline/main.go
index 6a68c57..30f9cfa 100644
--- a/bookpipeline/main.go
+++ b/bookpipeline/main.go
@@ -11,6 +11,7 @@ import (
"os"
"os/exec"
"path/filepath"
+ "regexp"
"strings"
"time"
@@ -56,19 +57,10 @@ type Clouder interface {
type Pipeliner interface {
Clouder
- ListToPreprocess(bookname string) ([]string, error)
- ListToOCR(bookname string) ([]string, error)
- DownloadFromInProgress(key string, fn string) error
- UploadToInProgress(key string, path string) error
- CheckPreQueue() (Qmsg, error)
- CheckOCRQueue() (Qmsg, error)
- CheckAnalyseQueue() (Qmsg, error)
- AddToOCRQueue(msg string) error
- AddToAnalyseQueue(msg string) error
- DelFromPreQueue(handle string) error
- DelFromOCRQueue(handle string) error
- PreQueueHeartbeat(t *time.Ticker, msgHandle string) error
- OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error
+ PreQueueId() string
+ OCRQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
Logger() *log.Logger
}
@@ -79,7 +71,7 @@ type Qmsg struct {
func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
- err := conn.DownloadFromInProgress(key, fn)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
if err != nil {
close(process)
errc <- err
@@ -94,7 +86,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
for path := range c {
name := filepath.Base(path)
key := filepath.Join(bookname, name)
- err := conn.UploadToInProgress(key, path)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
if err != nil {
errc <- err
return
@@ -138,11 +130,11 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger
}
}
-func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger)) error {
+func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
bookname := msg.Body
t := time.NewTicker(HeartbeatTime * time.Second)
- go conn.OCRQueueHeartbeat(t, msg.Handle)
+ go conn.QueueHeartbeat(t, msg.Handle, fromQueue)
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
@@ -163,12 +155,20 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
go up(upc, done, conn, bookname, errc)
conn.Logger().Println("Getting list of objects to download")
- todl, err := conn.ListToOCR(bookname)
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
t.Stop()
_ = os.RemoveAll(d)
return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
}
+ var todl []string
+ for _, n := range objs {
+ if !match.MatchString(n) {
+ conn.Logger().Println("Skipping item that doesn't match target", n)
+ continue
+ }
+ todl = append(todl, n)
+ }
for _, a := range todl {
dl <- a
}
@@ -183,21 +183,21 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
case <-done:
}
- conn.Logger().Println("Sending", bookname, "to analyse queue")
- err = conn.AddToAnalyseQueue(bookname)
+ conn.Logger().Println("Sending", bookname, "to queue")
+ err = conn.AddToQueue(toQueue, bookname)
if err != nil {
t.Stop()
_ = os.RemoveAll(d)
- return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err))
+ return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
}
t.Stop()
- conn.Logger().Println("Deleting original message from OCR queue")
- err = conn.DelFromOCRQueue(msg.Handle)
+ conn.Logger().Println("Deleting original message from queue")
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {
_ = os.RemoveAll(d)
- return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err))
+ return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
}
err = os.RemoveAll(d)
@@ -225,6 +225,11 @@ func main() {
verboselog = log.New(n, "", log.LstdFlags)
}
+ // TODO: match jpg too
+ origPattern := regexp.MustCompile(`[0-9]{4}.png$`) // TODO: match other file naming
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+ //ocredPattern := regexp.MustCompile(`.hocr$`)
+
var conn Pipeliner
conn = &awsConn{region: "eu-west-2", logger: verboselog}
@@ -243,7 +248,7 @@ func main() {
for {
select {
case <-checkPreQueue:
- msg, err := conn.CheckPreQueue()
+ msg, err := conn.CheckQueue(conn.PreQueueId())
checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking preprocess queue", err)
@@ -253,12 +258,12 @@ func main() {
verboselog.Println("No message received on preprocess queue, sleeping")
continue
}
- err = processBook(msg, conn, preprocess)
+ err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
if err != nil {
log.Println("Error during preprocess", err)
}
case <-checkOCRQueue:
- msg, err := conn.CheckOCRQueue()
+ msg, err := conn.CheckQueue(conn.OCRQueueId())
checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking OCR queue", err)
@@ -268,7 +273,7 @@ func main() {
verboselog.Println("No message received on OCR queue, sleeping")
continue
}
- err = processBook(msg, conn, ocr(*training))
+ err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
if err != nil {
log.Println("Error during OCR process", err)
}