summaryrefslogtreecommitdiff
path: root/bookpipeline
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline')
-rw-r--r--bookpipeline/aws.go29
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go43
-rw-r--r--bookpipeline/cmd/lspipeline/main.go2
-rw-r--r--bookpipeline/cmd/mkpipeline/main.go2
4 files changed, 66 insertions, 10 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index d4cd306..0127d6e 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -33,14 +33,14 @@ type AwsConn struct {
Logger *log.Logger
// these are used internally
- sess *session.Session
- ec2svc *ec2.EC2
- s3svc *s3.S3
- sqssvc *sqs.SQS
- downloader *s3manager.Downloader
- uploader *s3manager.Uploader
- prequrl, ocrqurl, analysequrl string
- wipstorageid string
+ sess *session.Session
+ ec2svc *ec2.EC2
+ s3svc *s3.S3
+ sqssvc *sqs.SQS
+ downloader *s3manager.Downloader
+ uploader *s3manager.Uploader
+ wipequrl, prequrl, ocrqurl, analysequrl string
+ wipstorageid string
}
// TODO: split this up, as not everything is needed for different uses
@@ -74,6 +74,15 @@ func (a *AwsConn) Init() error {
}
a.prequrl = *result.QueueUrl
+ a.Logger.Println("Getting wipeonly queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribewipeonly"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err))
+ }
+ a.wipequrl = *result.QueueUrl
+
a.Logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeocr"),
@@ -192,6 +201,10 @@ func (a *AwsConn) PreQueueId() string {
return a.prequrl
}
+func (a *AwsConn) WipeQueueId() string {
+ return a.wipequrl
+}
+
func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index de9b072..7d04e3a 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -19,7 +19,7 @@ import (
"rescribe.xyz/go.git/preproc"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
Watches the preprocess, ocr and analyse queues for book names. When
one is found this general process is followed:
@@ -60,6 +60,7 @@ type Clouder interface {
type Pipeliner interface {
Clouder
PreQueueId() string
+ WipeQueueId() string
OCRQueueId() string
AnalyseQueueId() string
WIPStorageId() string
@@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo
close(up)
}
+func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range towipe {
+ logger.Println("Wiping", path)
+ s := strings.Split(path, ".")
+ base := strings.Join(s[:len(s)-1], "")
+ outpath := base + "_bin0.0.png"
+ err := preproc.WipeFile(path, outpath, 5, 0.03, 30)
+ if err != nil {
+ for range towipe {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(up)
+ errc <- err
+ return
+ }
+ up <- outpath
+ }
+ close(up)
+}
+
func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
for path := range toocr {
@@ -347,6 +367,7 @@ func main() {
verbose := flag.Bool("v", false, "verbose")
training := flag.String("t", "rescribealphav5", "tesseract training file to use")
nopreproc := flag.Bool("np", false, "disable preprocessing")
+ nowipe := flag.Bool("nw", false, "disable wipeonly")
noocr := flag.Bool("no", false, "disable ocr")
noanalyse := flag.Bool("na", false, "disable analysis")
@@ -379,11 +400,15 @@ func main() {
verboselog.Println("Finished setting up AWS session")
var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
var checkOCRQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
if !*nopreproc {
checkPreQueue = time.After(0)
}
+ if !*nowipe {
+ checkWipeQueue = time.After(0)
+ }
if !*noocr {
checkOCRQueue = time.After(0)
}
@@ -409,6 +434,22 @@ func main() {
if err != nil {
log.Println("Error during preprocess", err)
}
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking wipeonly queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
+ err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId())
+ if err != nil {
+ log.Println("Error during wipe", err)
+ }
case <-checkOCRQueue:
msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 3e9df61..46a1d63 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -24,6 +24,7 @@ Lists useful things related to the pipeline.
type LsPipeliner interface {
Init() error
PreQueueId() string
+ WipeQueueId() string
OCRQueueId() string
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
@@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails)
func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
queues := []struct{ name, id string }{
{"preprocess", conn.PreQueueId()},
+ {"wipeonly", conn.WipeQueueId()},
{"ocr", conn.OCRQueueId()},
{"analyse", conn.AnalyseQueueId()},
}
diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go
index 970543e..e37a56d 100644
--- a/bookpipeline/cmd/mkpipeline/main.go
+++ b/bookpipeline/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
prefix := "rescribe"
buckets := []string{"inprogress", "done"}
- queues := []string{"preprocess", "ocr", "analyse"}
+ queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
for _, bucket := range buckets {
bname := prefix + bucket