diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/bookpipeline/main.go | 140 | ||||
-rw-r--r-- | cmd/lspipeline/main.go | 2 | ||||
-rw-r--r-- | cmd/mkpipeline/main.go | 2 |
3 files changed, 142 insertions, 2 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 0ed0d67..d70dbc7 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -17,7 +17,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training] Watches the preprocess, ocr and analyse queues for book names. When one is found this general process is followed: @@ -60,6 +60,7 @@ type Pipeliner interface { PreQueueId() string WipeQueueId() string OCRQueueId() string + OCRPageQueueId() string AnalyseQueueId() string WIPStorageId() string GetLogger() *log.Logger @@ -277,6 +278,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri } } +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + return false + } + + // Full wipePattern can match things like 0000.png which getgbook + // can emit but aren't ocr-able + //wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) + wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`) + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + + atleastone := false + for _, png := range objs { + if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) { + atleastone = true + found := false + b := strings.TrimSuffix(filepath.Base(png), ".png") + hocrname := bookname + "/" + b + ".hocr" + for _, hocr := range objs { + if hocr == hocrname { + found = true + break + } + } + if found == false { + return false + } + } + } + if atleastone == false { + return false + } + return true +} + +// ocrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + bookname := filepath.Dir(msg.Body) + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) + } + + t := time.NewTicker(HeartbeatTime * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + dl <- msg.Body + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if allOCRed(bookname, conn) && toQueue != "" { + conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.GetLogger().Println("Using new message handle to delete message from old queue") + } + default: + conn.GetLogger().Println("Using original message handle to delete message from old queue") + } + + conn.GetLogger().Println("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) + } + + err = os.RemoveAll(d) + if err != nil { + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) + } + + return nil +} + func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) msgc := make(chan bookpipeline.Qmsg) @@ -374,6 +491,7 @@ func main() { nopreproc := flag.Bool("np", false, "disable preprocessing") nowipe := flag.Bool("nw", false, "disable wipeonly") noocr := flag.Bool("no", false, "disable ocr") + noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") flag.Usage = func() { @@ -408,6 +526,7 @@ func main() { var checkPreQueue <-chan time.Time var checkWipeQueue <-chan time.Time var checkOCRQueue <-chan time.Time + var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time if !*nopreproc { checkPreQueue = time.After(0) @@ -418,6 +537,9 @@ func main() { if !*noocr { checkOCRQueue = time.After(0) } + if !*noocrpg { + checkOCRPageQueue = time.After(0) + } if !*noanalyse { checkAnalyseQueue = time.After(0) } @@ -456,6 +578,22 @@ func main() { if err != nil { log.Println("Error during wipe", err) } + case <-checkOCRPageQueue: + msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2) + checkOCRPageQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking OCR Page queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on OCR Page queue, sleeping") + continue + } + verboselog.Println("Message received on OCR Page queue, processing", msg.Body) + err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) + if err != nil { + log.Println("Error during OCR Page process", err) + } case <-checkOCRQueue: msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2) checkOCRQueue = time.After(PauseBetweenChecks) diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 0b8ce49..a32d851 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -28,6 +28,7 @@ type LsPipeliner interface { PreQueueId() string WipeQueueId() string OCRQueueId() string + OCRPageQueueId() string AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) @@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { {"preprocess", conn.PreQueueId()}, {"wipeonly", conn.WipeQueueId()}, {"ocr", conn.OCRQueueId()}, + {"ocrpage", conn.OCRPageQueueId()}, {"analyse", conn.AnalyseQueueId()}, } for _, q := range queues { diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go index e37a56d..a32526a 100644 --- a/cmd/mkpipeline/main.go +++ b/cmd/mkpipeline/main.go @@ -34,7 +34,7 @@ func main() { prefix := "rescribe" buckets := []string{"inprogress", "done"} - queues := []string{"preprocess", "wipeonly", "ocr", "analyse"} + queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"} for _, bucket := range buckets { bname := prefix + bucket |