summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-11-19 14:47:41 +0000
committerNick White <git@njw.name>2019-11-19 14:47:41 +0000
commit7693065b8b8a9eff1f17dc993ae1235984872e09 (patch)
treea3ca625040f7f016456957ecd48d557eadd0de65
parent08a305c263329c0d27efd36e3da6d4befcfb4240 (diff)
Add ocrpage queue for processing individual pages
This should be a good way to get around the ongoing heartbeat issue, as individual page jobs will never come close to a the 12 hour mark that can cause the bug. The OCR page processing is done and working now, still to do is to populate the queue (rather than the ocr queue) after preprocessing / wiping.
-rw-r--r--aws.go14
-rw-r--r--cmd/bookpipeline/main.go140
-rw-r--r--cmd/lspipeline/main.go2
-rw-r--r--cmd/mkpipeline/main.go2
4 files changed, 156 insertions, 2 deletions
diff --git a/aws.go b/aws.go
index 73f3b2f..f5ac338 100644
--- a/aws.go
+++ b/aws.go
@@ -44,6 +44,7 @@ type AwsConn struct {
downloader *s3manager.Downloader
uploader *s3manager.Uploader
wipequrl, prequrl, ocrqurl, analysequrl string
+ ocrpgqurl string
wipstorageid string
}
@@ -105,6 +106,15 @@ func (a *AwsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
+ a.Logger.Println("Getting OCR Page queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeocrpage"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
+ }
+ a.ocrpgqurl = *result.QueueUrl
+
a.wipstorageid = "rescribeinprogress"
return nil
@@ -224,6 +234,10 @@ func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
+func (a *AwsConn) OCRPageQueueId() string {
+ return a.ocrpgqurl
+}
+
func (a *AwsConn) AnalyseQueueId() string {
return a.analysequrl
}
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 0ed0d67..d70dbc7 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -17,7 +17,7 @@ import (
"rescribe.xyz/utils/pkg/hocr"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training]
Watches the preprocess, ocr and analyse queues for book names. When
one is found this general process is followed:
@@ -60,6 +60,7 @@ type Pipeliner interface {
PreQueueId() string
WipeQueueId() string
OCRQueueId() string
+ OCRPageQueueId() string
AnalyseQueueId() string
WIPStorageId() string
GetLogger() *log.Logger
@@ -277,6 +278,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
}
}
+// allOCRed checks whether all pages of a book have been OCRed.
+// This is determined by whether every _bin0.?.png file has a
+// corresponding .hocr file.
+func allOCRed(bookname string, conn Pipeliner) bool {
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ return false
+ }
+
+ // Full wipePattern can match things like 0000.png which getgbook
+ // can emit but aren't ocr-able
+ //wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
+ wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`)
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+
+ atleastone := false
+ for _, png := range objs {
+ if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) {
+ atleastone = true
+ found := false
+ b := strings.TrimSuffix(filepath.Base(png), ".png")
+ hocrname := bookname + "/" + b + ".hocr"
+ for _, hocr := range objs {
+ if hocr == hocrname {
+ found = true
+ break
+ }
+ }
+ if found == false {
+ return false
+ }
+ }
+ }
+ if atleastone == false {
+ return false
+ }
+ return true
+}
+
+// ocrPage OCRs a page based on a message. It may make sense to
+// roll this back into processBook (on which it is based) once
+// working well.
+func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
+ dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
+
+ bookname := filepath.Dir(msg.Body)
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
+ }
+
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, processc, conn, d, errc, conn.GetLogger())
+ go process(processc, upc, errc, conn.GetLogger())
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+
+ dl <- msg.Body
+ close(dl)
+
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
+ }
+
+ if allOCRed(bookname, conn) && toQueue != "" {
+ conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
+ }
+ }
+
+ t.Stop()
+
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc:
+ if ok {
+ msg = m
+ conn.GetLogger().Println("Using new message handle to delete message from old queue")
+ }
+ default:
+ conn.GetLogger().Println("Using original message handle to delete message from old queue")
+ }
+
+ conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
+ if err != nil {
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
+ }
+
+ return nil
+}
+
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
dl := make(chan string)
msgc := make(chan bookpipeline.Qmsg)
@@ -374,6 +491,7 @@ func main() {
nopreproc := flag.Bool("np", false, "disable preprocessing")
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocr := flag.Bool("no", false, "disable ocr")
+ noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
flag.Usage = func() {
@@ -408,6 +526,7 @@ func main() {
var checkPreQueue <-chan time.Time
var checkWipeQueue <-chan time.Time
var checkOCRQueue <-chan time.Time
+ var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
if !*nopreproc {
checkPreQueue = time.After(0)
@@ -418,6 +537,9 @@ func main() {
if !*noocr {
checkOCRQueue = time.After(0)
}
+ if !*noocrpg {
+ checkOCRPageQueue = time.After(0)
+ }
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
@@ -456,6 +578,22 @@ func main() {
if err != nil {
log.Println("Error during wipe", err)
}
+ case <-checkOCRPageQueue:
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2)
+ checkOCRPageQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking OCR Page queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on OCR Page queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on OCR Page queue, processing", msg.Body)
+ err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ if err != nil {
+ log.Println("Error during OCR Page process", err)
+ }
case <-checkOCRQueue:
msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index 0b8ce49..a32d851 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -28,6 +28,7 @@ type LsPipeliner interface {
PreQueueId() string
WipeQueueId() string
OCRQueueId() string
+ OCRPageQueueId() string
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
@@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
{"preprocess", conn.PreQueueId()},
{"wipeonly", conn.WipeQueueId()},
{"ocr", conn.OCRQueueId()},
+ {"ocrpage", conn.OCRPageQueueId()},
{"analyse", conn.AnalyseQueueId()},
}
for _, q := range queues {
diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go
index e37a56d..a32526a 100644
--- a/cmd/mkpipeline/main.go
+++ b/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
prefix := "rescribe"
buckets := []string{"inprogress", "done"}
- queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
+ queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"}
for _, bucket := range buckets {
bname := prefix + bucket