-rw-r--r--  aws.go                     14
-rw-r--r--  cmd/bookpipeline/main.go  140
-rw-r--r--  cmd/lspipeline/main.go      2
-rw-r--r--  cmd/mkpipeline/main.go      2
4 files changed, 156 insertions, 2 deletions
diff --git a/aws.go b/aws.go
index 73f3b2f..f5ac338 100644
--- a/aws.go
+++ b/aws.go
@@ -44,6 +44,7 @@ type AwsConn struct {
downloader *s3manager.Downloader
uploader *s3manager.Uploader
wipequrl, prequrl, ocrqurl, analysequrl string
+ ocrpgqurl string
wipstorageid string
}
@@ -105,6 +106,15 @@ func (a *AwsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
+ a.Logger.Println("Getting OCR Page queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribeocrpage"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
+ }
+ a.ocrpgqurl = *result.QueueUrl
+
a.wipstorageid = "rescribeinprogress"
return nil
@@ -224,6 +234,10 @@ func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
+func (a *AwsConn) OCRPageQueueId() string {
+ return a.ocrpgqurl
+}
+
func (a *AwsConn) AnalyseQueueId() string {
return a.analysequrl
}
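
The Init() addition above uses the same GetQueueUrl lookup as the existing queues. As a rough standalone sketch of that call, assuming aws-sdk-go v1 and an illustrative region (the real code reuses the session, SQS client and logger held by AwsConn):

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	// region chosen purely for illustration
	sess, err := session.NewSession(&aws.Config{Region: aws.String("eu-west-2")})
	if err != nil {
		log.Fatalf("Error setting up session: %v", err)
	}
	svc := sqs.New(sess)

	// the same lookup Init() now performs for the new rescribeocrpage queue
	result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName: aws.String("rescribeocrpage"),
	})
	if err != nil {
		log.Fatalf("Error getting OCR Page queue URL: %v", err)
	}
	fmt.Println("OCR Page queue URL:", *result.QueueUrl)
}
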
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 0ed0d67..d70dbc7 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -17,7 +17,7 @@ import (
"rescribe.xyz/utils/pkg/hocr"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training]
Watches the preprocess, ocr and analyse queues for book names. When
one is found this general process is followed:
@@ -60,6 +60,7 @@ type Pipeliner interface {
PreQueueId() string
WipeQueueId() string
OCRQueueId() string
+ OCRPageQueueId() string
AnalyseQueueId() string
WIPStorageId() string
GetLogger() *log.Logger
@@ -277,6 +278,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
}
}
+// allOCRed checks whether all pages of a book have been OCRed.
+// This is determined by whether every page image (NNNNNN.png,
+// NNNNNN.bin.png or *_binX.Y.png) has a corresponding .hocr file.
+func allOCRed(bookname string, conn Pipeliner) bool {
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ return false
+ }
+
+ // The fuller pattern below (commented out) would also match files
+ // like 0000.png, which getgbook can emit but which aren't OCR-able
+ //wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
+ wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`)
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+
+ atleastone := false
+ for _, png := range objs {
+ if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) {
+ atleastone = true
+ found := false
+ b := strings.TrimSuffix(filepath.Base(png), ".png")
+ hocrname := bookname + "/" + b + ".hocr"
+ for _, hocr := range objs {
+ if hocr == hocrname {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return false
+ }
+ }
+ }
+ if !atleastone {
+ return false
+ }
+ return true
+}
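
The check above is purely name based: a page image counts as OCRed when an object with the same base name and a .hocr extension exists. hocrName below is a hypothetical helper written only to illustrate that mapping; in the patch the TrimSuffix logic is inlined:

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// hocrName returns the object key of the .hocr file that would mark
// the given page image as OCRed.
func hocrName(bookname, png string) string {
	b := strings.TrimSuffix(filepath.Base(png), ".png")
	return bookname + "/" + b + ".hocr"
}

func main() {
	fmt.Println(hocrName("examplebook", "examplebook/000017_bin0.2.png"))
	// prints: examplebook/000017_bin0.2.hocr
}
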
+
+// ocrPage OCRs a page based on a message. It may make sense to
+// roll this back into processBook (on which it is based) once
+// working well.
+func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
+ dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
+
+ bookname := filepath.Dir(msg.Body)
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
+ }
+
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, processc, conn, d, errc, conn.GetLogger())
+ go process(processc, upc, errc, conn.GetLogger())
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+
+ dl <- msg.Body
+ close(dl)
+
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
+ }
+
+ if allOCRed(bookname, conn) && toQueue != "" {
+ conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
+ }
+ }
+
+ t.Stop()
+
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc:
+ if ok {
+ msg = m
+ conn.GetLogger().Println("Using new message handle to delete message from old queue")
+ }
+ default:
+ conn.GetLogger().Println("Using original message handle to delete message from old queue")
+ }
+
+ conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
+ if err != nil {
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
+ }
+
+ return nil
+}
+
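
ocrPage keeps the stage wiring it inherits from processBook: one goroutine per stage connected by channels, a heartbeat goroutine, and a select that waits for either completion or the first error. A self-contained miniature of that pattern, with placeholder stage bodies standing in for the real S3 download, OCR and upload steps:

package main

import (
	"errors"
	"fmt"
)

// download would fetch each named file from storage before passing it on.
func download(in, out chan string, errc chan error) {
	for name := range in {
		out <- name
	}
	close(out)
}

// process stands in for the OCR step, turning a page image name into
// the name of its output file.
func process(in, out chan string, errc chan error) {
	for name := range in {
		if name == "" {
			errc <- errors.New("empty page name")
			return
		}
		out <- name + ".hocr"
	}
	close(out)
}

// upload would send each result back to storage, then signal completion.
func upload(in chan string, done chan bool) {
	for name := range in {
		fmt.Println("uploaded", name)
	}
	done <- true
}

func main() {
	dl := make(chan string)
	processc := make(chan string)
	upc := make(chan string)
	done := make(chan bool)
	errc := make(chan error)

	// these functions will do their jobs when their channels have data
	go download(dl, processc, errc)
	go process(processc, upc, errc)
	go upload(upc, done)

	dl <- "examplebook/000017.png"
	close(dl)

	// wait for either the done or errc channel, as ocrPage does
	select {
	case err := <-errc:
		fmt.Println("error:", err)
	case <-done:
		fmt.Println("page pipeline finished")
	}
}
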
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
dl := make(chan string)
msgc := make(chan bookpipeline.Qmsg)
@@ -374,6 +491,7 @@ func main() {
nopreproc := flag.Bool("np", false, "disable preprocessing")
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocr := flag.Bool("no", false, "disable ocr")
+ noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
flag.Usage = func() {
@@ -408,6 +526,7 @@ func main() {
var checkPreQueue <-chan time.Time
var checkWipeQueue <-chan time.Time
var checkOCRQueue <-chan time.Time
+ var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
if !*nopreproc {
checkPreQueue = time.After(0)
@@ -418,6 +537,9 @@ func main() {
if !*noocr {
checkOCRQueue = time.After(0)
}
+ if !*noocrpg {
+ checkOCRPageQueue = time.After(0)
+ }
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
@@ -456,6 +578,22 @@ func main() {
if err != nil {
log.Println("Error during wipe", err)
}
+ case <-checkOCRPageQueue:
+ msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2)
+ checkOCRPageQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking OCR Page queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on OCR Page queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on OCR Page queue, processing", msg.Body)
+ err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+ if err != nil {
+ log.Println("Error during OCR Page process", err)
+ }
case <-checkOCRQueue:
msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
checkOCRQueue = time.After(PauseBetweenChecks)
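
The checkOCRPageQueue case added above follows the same polling discipline as the existing queues: a timer channel per queue, re-armed after every check, with an empty message handle treated as a signal to sleep until the next tick. A rough sketch of just that pattern, with a stub in place of conn.CheckQueue and an illustrative pause value:

package main

import (
	"fmt"
	"time"
)

const PauseBetweenChecks = 3 * time.Second

// checkQueue is a stand-in for conn.CheckQueue; an empty string plays
// the role of an empty message handle.
func checkQueue(name string) (string, error) {
	return "", nil
}

func main() {
	// time.After(0) makes the first check happen immediately
	checkOCRPageQueue := time.After(0)

	for i := 0; i < 3; i++ {
		select {
		case <-checkOCRPageQueue:
			msg, err := checkQueue("rescribeocrpage")
			checkOCRPageQueue = time.After(PauseBetweenChecks)
			if err != nil {
				fmt.Println("Error checking OCR Page queue", err)
				continue
			}
			if msg == "" {
				fmt.Println("No message received on OCR Page queue, sleeping")
				continue
			}
			fmt.Println("Message received on OCR Page queue, processing", msg)
		}
	}
}
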
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index 0b8ce49..a32d851 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -28,6 +28,7 @@ type LsPipeliner interface {
PreQueueId() string
WipeQueueId() string
OCRQueueId() string
+ OCRPageQueueId() string
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
@@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
{"preprocess", conn.PreQueueId()},
{"wipeonly", conn.WipeQueueId()},
{"ocr", conn.OCRQueueId()},
+ {"ocrpage", conn.OCRPageQueueId()},
{"analyse", conn.AnalyseQueueId()},
}
for _, q := range queues {
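
For lspipeline the change is a single new entry in the queues slice; each name/URL pair is then passed to GetQueueDetails in turn. A small sketch of that iteration, with placeholder URLs and a stub matching the two-string return shape of the interface method above:

package main

import "fmt"

// getQueueDetails is a stub with the same shape as the LsPipeliner
// method: it takes a queue URL and returns two detail strings.
func getQueueDetails(url string) (string, string, error) {
	return "0", "0", nil
}

func main() {
	queues := []struct{ name, url string }{
		{"preprocess", "https://sqs.example/rescribepreprocess"},
		{"wipeonly", "https://sqs.example/rescribewipeonly"},
		{"ocr", "https://sqs.example/rescribeocr"},
		{"ocrpage", "https://sqs.example/rescribeocrpage"},
		{"analyse", "https://sqs.example/rescribeanalyse"},
	}
	for _, q := range queues {
		a, b, err := getQueueDetails(q.url)
		if err != nil {
			fmt.Println("Error getting queue details for", q.name, err)
			continue
		}
		fmt.Println(q.name, a, b)
	}
}
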
diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go
index e37a56d..a32526a 100644
--- a/cmd/mkpipeline/main.go
+++ b/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
prefix := "rescribe"
buckets := []string{"inprogress", "done"}
- queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
+ queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"}
for _, bucket := range buckets {
bname := prefix + bucket
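
The mkpipeline change only adds "ocrpage" to the list of queue names; the creation loop itself is outside this hunk. A sketch of how such a queue would presumably be created with aws-sdk-go v1 (region and error handling are illustrative, not taken from mkpipeline):

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	sess, err := session.NewSession(&aws.Config{Region: aws.String("eu-west-2")})
	if err != nil {
		log.Fatalf("Error setting up session: %v", err)
	}
	svc := sqs.New(sess)

	prefix := "rescribe"
	queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"}
	for _, queue := range queues {
		qname := prefix + queue
		// CreateQueue succeeds for an already existing queue with the
		// same attributes, so rerunning this is safe
		_, err = svc.CreateQueue(&sqs.CreateQueueInput{
			QueueName: aws.String(qname),
		})
		if err != nil {
			log.Fatalf("Error creating queue %s: %v", qname, err)
		}
		log.Println("Created queue", qname)
	}
}
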