summaryrefslogtreecommitdiff
path: root/pipelinepreprocess/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'pipelinepreprocess/main.go')
-rw-r--r--pipelinepreprocess/main.go259
1 files changed, 192 insertions, 67 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index a223d0b..61dec96 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -8,7 +8,9 @@ package main
import (
"log"
"os"
+ "os/exec"
"path/filepath"
+ "strings"
"time"
"rescribe.xyz/go.git/preproc"
@@ -16,6 +18,8 @@ import (
const usage = "Usage: pipelinepreprocess [-v]\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n\n-v verbose\n"
+const training = "rescribealphav5" // TODO: allow to set on cmdline
+
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
func (w NullWriter) Write(p []byte) (n int, err error) {
@@ -28,7 +32,8 @@ const PauseBetweenChecks = 60 * time.Second
type Clouder interface {
Init() error
- ListObjects(bucket string, prefix string, names chan string) error
+ //ListObjects(bucket string, prefix string, names chan string) error
+ ListObjects(bucket string, prefix string, names chan string)
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
CheckQueue(url string) (Qmsg, error)
@@ -39,13 +44,20 @@ type Clouder interface {
type Pipeliner interface {
Clouder
- ListInProgress(bookname string, names chan string) error
+ ListToPreprocess(bookname string, names chan string) error
+ ListToOCR(bookname string, names chan string) error
DownloadFromInProgress(key string, fn string) error
UploadToInProgress(key string, path string) error
CheckPreQueue() (Qmsg, error)
+ CheckOCRQueue() (Qmsg, error)
+ CheckAnalyseQueue() (Qmsg, error)
AddToOCRQueue(msg string) error
+ AddToAnalyseQueue(msg string) error
DelFromPreQueue(handle string) error
+ DelFromOCRQueue(handle string) error
PreQueueHeartbeat(t *time.Ticker, msgHandle string) error
+ OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error
+ Logger() *log.Logger
}
type Qmsg struct {
@@ -64,11 +76,27 @@ func download(dl chan string, pre chan string, conn Pipeliner, dir string) {
close(pre)
}
+func up(c chan string, done chan bool, conn Pipeliner, bookname string) {
+ for path := range c {
+ name := filepath.Base(path)
+ key := filepath.Join(bookname, name)
+ err := conn.UploadToInProgress(key, path)
+ if err != nil {
+ log.Fatalln("Failed to upload", path, err)
+ }
+ }
+
+ done <- true
+}
+
func preprocess(pre chan string, up chan string, logger *log.Logger) {
for path := range pre {
logger.Println("Preprocessing", path)
done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
if err != nil {
+ // TODO: have error channel to signal that things are screwy, which
+ // can close channels and stop the heartbeat, rather than just kill
+ // the whole program
log.Fatalln("Error preprocessing", path, err)
}
for _, p := range done {
@@ -78,17 +106,137 @@ func preprocess(pre chan string, up chan string, logger *log.Logger) {
close(up)
}
-func up(c chan string, done chan bool, conn Pipeliner, bookname string) {
- for path := range c {
- name := filepath.Base(path)
- key := filepath.Join(bookname, name)
- err := conn.UploadToInProgress(key, path)
+// TODO: use Tesseract API rather than calling the executable
+func ocr(toocr chan string, up chan string, logger *log.Logger) {
+ for path := range toocr {
+ logger.Println("OCRing", path)
+ name := strings.Replace(path, ".png", "", 1) // TODO: handle any file extension
+ cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
+ err := cmd.Run()
if err != nil {
- log.Fatalln("Failed to upload", path, err)
+ // TODO: have error channel to signal that things are screwy, which
+ // can close channels and stop the heartbeat, rather than just kill
+ // the whole program
+ log.Fatalln("Error ocring", path, err)
}
+ up <- name + ".hocr"
}
+ close(up)
+}
- done <- true
+func preProcBook(msg Qmsg, conn Pipeliner) {
+ bookname := msg.Body
+
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go conn.PreQueueHeartbeat(t, msg.Handle)
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ log.Println("Failed to create directory", d, err)
+ t.Stop()
+ return
+ }
+
+ dl := make(chan string)
+ pre := make(chan string)
+ upc := make(chan string) // TODO: rename
+ done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, pre, conn, d)
+ go preprocess(pre, upc, conn.Logger())
+ go up(upc, done, conn, bookname)
+
+ conn.Logger().Println("Getting list of objects to download")
+ err = conn.ListToPreprocess(bookname, dl)
+ if err != nil {
+ log.Println("Failed to get list of files for book", bookname, err)
+ t.Stop()
+ return
+ }
+
+ // wait for the done channel to be posted to
+ <-done
+
+ conn.Logger().Println("Sending", bookname, "to OCR queue")
+ err = conn.AddToOCRQueue(bookname)
+ if err != nil {
+ log.Println("Error adding to ocr queue", bookname, err)
+ t.Stop()
+ return
+ }
+
+ t.Stop()
+
+ conn.Logger().Println("Deleting original message from preprocessing queue")
+ err = conn.DelFromPreQueue(msg.Handle)
+ if err != nil {
+ log.Println("Error deleting message from preprocessing queue", err)
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ log.Println("Failed to remove directory", d, err)
+ }
+}
+
+func ocrBook(msg Qmsg, conn Pipeliner) {
+ bookname := msg.Body
+
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go conn.OCRQueueHeartbeat(t, msg.Handle)
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ log.Println("Failed to create directory", d, err)
+ t.Stop()
+ return
+ }
+
+ dl := make(chan string)
+ ocrc := make(chan string)
+ upc := make(chan string) // TODO: rename
+ done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, ocrc, conn, d)
+ go ocr(ocrc, upc, conn.Logger())
+ go up(upc, done, conn, bookname)
+
+ conn.Logger().Println("Getting list of objects to download")
+ go conn.ListToOCR(bookname, dl)
+ //err = conn.ListToOCR(bookname, dl)
+ //if err != nil {
+ // log.Println("Failed to get list of files for book", bookname, err)
+ // t.Stop()
+ // return
+ //}
+
+ // wait for the done channel to be posted to
+ <-done
+
+ conn.Logger().Println("Sending", bookname, "to analyse queue")
+ err = conn.AddToAnalyseQueue(bookname)
+ if err != nil {
+ log.Println("Error adding to analyse queue", bookname, err)
+ t.Stop()
+ return
+ }
+
+ t.Stop()
+
+ conn.Logger().Println("Deleting original message from OCR queue")
+ err = conn.DelFromOCRQueue(msg.Handle)
+ if err != nil {
+ log.Println("Error deleting message from OCR queue", err)
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ log.Println("Failed to remove directory", d, err)
+ }
}
func main() {
@@ -112,66 +260,43 @@ func main() {
if err != nil {
log.Fatalln("Error setting up cloud connection:", err)
}
+ verboselog.Println("Finished setting up AWS session")
- for {
- msg, err := conn.CheckPreQueue()
- if err != nil {
- log.Fatalln("Error checking preprocess queue", err)
- }
- if msg.Handle == "" {
- verboselog.Println("No message received, sleeping")
- time.Sleep(PauseBetweenChecks)
- continue
- }
- bookname := msg.Body
-
- t := time.NewTicker(HeartbeatTime * time.Second)
- go conn.PreQueueHeartbeat(t, msg.Handle)
-
-
- d := filepath.Join(os.TempDir(), bookname)
- err = os.MkdirAll(d, 0755)
- if err != nil {
- log.Fatalln("Failed to create directory", d, err)
- }
-
- dl := make(chan string)
- pre := make(chan string)
- upc := make(chan string) // TODO: rename
- done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated
-
- // these functions will do their jobs when their channels have data
- go download(dl, pre, conn, d)
- go preprocess(pre, upc, verboselog)
- go up(upc, done, conn, bookname)
-
-
- verboselog.Println("Getting list of objects to download")
- err = conn.ListInProgress(bookname, dl)
- if err != nil {
- log.Fatalln("Failed to get list of files for book", bookname, err)
- }
+ var checkPreQueue <-chan time.Time
+ var checkOCRQueue <-chan time.Time
+ checkPreQueue = time.After(0)
+ checkOCRQueue = time.After(0)
- // wait for the done channel to be posted to
- <-done
-
- verboselog.Println("Sending", bookname, "to OCR queue")
- err = conn.AddToOCRQueue(bookname)
- if err != nil {
- log.Fatalln("Error adding to ocr queue", bookname, err)
- }
-
- t.Stop()
-
- verboselog.Println("Deleting original message from preprocessing queue")
- err = conn.DelFromPreQueue(msg.Handle)
- if err != nil {
- log.Fatalln("Error deleting message from preprocessing queue", err)
- }
-
- err = os.RemoveAll(d)
- if err != nil {
- log.Fatalln("Failed to remove directory", d, err)
+ // TODO: use a buffer or something to limit number of running processes
+ // could start preprocbook / ocrbook and just have them listen on
+ // channels for stuff to do, that way they'd do things one at a time
+ // TODO: don't trigger the checkOCRQueue until a running thing has finished
+ for {
+ select {
+ case <- checkPreQueue:
+ msg, err := conn.CheckPreQueue()
+ checkPreQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking preprocess queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on preprocess queue, sleeping")
+ continue
+ }
+ go preProcBook(msg, conn)
+ case <- checkOCRQueue:
+ msg, err := conn.CheckOCRQueue()
+ //checkOCRQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking OCR queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on OCR queue, sleeping")
+ continue
+ }
+ go ocrBook(msg, conn)
}
}
}