summary | refs | log | tree | commit | diff
path: root/pipelinepreprocess
diff options
context:
space:
mode:
Diffstat (limited to 'pipelinepreprocess')
-rw-r--r-- pipelinepreprocess/main.go 70
1 file changed, 40 insertions, 30 deletions
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index b513f92..20a682b 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -6,6 +6,8 @@ package main
// TODO: check if images are prebinarised and if so skip multiple binarisation
import (
+ "errors"
+ "fmt"
"log"
"os"
"os/exec"
@@ -28,8 +30,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
const PauseBetweenChecks = 60 * time.Second
-// TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk
-
type Clouder interface {
Init() error
ListObjects(bucket string, prefix string) ([]string, error)
@@ -123,7 +123,7 @@ func ocr(toocr chan string, up chan string, logger *log.Logger) {
close(up)
}
-func preProcBook(msg Qmsg, conn Pipeliner) {
+func preProcBook(msg Qmsg, conn Pipeliner) error {
bookname := msg.Body
t := time.NewTicker(HeartbeatTime * time.Second)
@@ -132,9 +132,8 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
if err != nil {
- log.Println("Failed to create directory", d, err)
t.Stop()
- return
+ return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
}
dl := make(chan string)
@@ -150,9 +149,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Getting list of objects to download")
todl, err := conn.ListToPreprocess(bookname)
if err != nil {
- log.Println("Failed to get list of files for book", bookname, err)
t.Stop()
- return
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
}
for _, d := range todl {
dl <- d
@@ -164,9 +163,9 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Sending", bookname, "to OCR queue")
err = conn.AddToOCRQueue(bookname)
if err != nil {
- log.Println("Error adding to ocr queue", bookname, err)
t.Stop()
- return
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error adding to ocr queue %s: %s", bookname, err))
}
t.Stop()
@@ -174,16 +173,19 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Deleting original message from preprocessing queue")
err = conn.DelFromPreQueue(msg.Handle)
if err != nil {
- log.Println("Error deleting message from preprocessing queue", err)
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error deleting message from preprocessing queue: %s", err))
}
err = os.RemoveAll(d)
if err != nil {
- log.Println("Failed to remove directory", d, err)
+ return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
}
+
+ return nil
}
-func ocrBook(msg Qmsg, conn Pipeliner) {
+func ocrBook(msg Qmsg, conn Pipeliner) error {
bookname := msg.Body
t := time.NewTicker(HeartbeatTime * time.Second)
@@ -192,9 +194,8 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
d := filepath.Join(os.TempDir(), bookname)
err := os.MkdirAll(d, 0755)
if err != nil {
- log.Println("Failed to create directory", d, err)
t.Stop()
- return
+ return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
}
dl := make(chan string)
@@ -210,12 +211,12 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Getting list of objects to download")
todl, err := conn.ListToOCR(bookname)
if err != nil {
- log.Println("Failed to get list of files for book", bookname, err)
t.Stop()
- return
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
}
- for _, d := range todl {
- dl <- d
+ for _, a := range todl {
+ dl <- a
}
// wait for the done channel to be posted to
@@ -224,9 +225,9 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Sending", bookname, "to analyse queue")
err = conn.AddToAnalyseQueue(bookname)
if err != nil {
- log.Println("Error adding to analyse queue", bookname, err)
t.Stop()
- return
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error adding to analyse queue %s: %s", bookname, err))
}
t.Stop()
@@ -234,13 +235,16 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
conn.Logger().Println("Deleting original message from OCR queue")
err = conn.DelFromOCRQueue(msg.Handle)
if err != nil {
- log.Println("Error deleting message from OCR queue", err)
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error deleting message from OCR queue: %s", err))
}
err = os.RemoveAll(d)
if err != nil {
- log.Println("Failed to remove directory", d, err)
+ return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
}
+
+ return nil
}
func main() {
@@ -271,36 +275,42 @@ func main() {
checkPreQueue = time.After(0)
checkOCRQueue = time.After(0)
- // TODO: use a buffer or something to limit number of running processes
- // could start preprocbook / ocrbook and just have them listen on
- // channels for stuff to do, that way they'd do things one at a time
- // TODO: don't trigger the checkOCRQueue until a running thing has finished
for {
select {
case <- checkPreQueue:
msg, err := conn.CheckPreQueue()
- checkPreQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking preprocess queue", err)
+ checkPreQueue = time.After(PauseBetweenChecks)
continue
}
if msg.Handle == "" {
verboselog.Println("No message received on preprocess queue, sleeping")
+ checkPreQueue = time.After(PauseBetweenChecks)
continue
}
- go preProcBook(msg, conn)
+ err = preProcBook(msg, conn)
+ if err != nil {
+ log.Println("Error during preprocess", err)
+ }
+ checkPreQueue = time.After(0)
case <- checkOCRQueue:
msg, err := conn.CheckOCRQueue()
- //checkOCRQueue = time.After(PauseBetweenChecks)
if err != nil {
log.Println("Error checking OCR queue", err)
+ checkOCRQueue = time.After(PauseBetweenChecks)
continue
}
if msg.Handle == "" {
verboselog.Println("No message received on OCR queue, sleeping")
+ checkOCRQueue = time.After(PauseBetweenChecks)
continue
}
- go ocrBook(msg, conn)
+ err = ocrBook(msg, conn)
+ if err != nil {
+ log.Println("Error during OCR process", err)
+ }
+ checkOCRQueue = time.After(0)
}
}
}