summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-10-02 12:32:59 +0100
committerNick White <git@njw.name>2019-10-02 12:32:59 +0100
commit94352c6124aed2d85cfb21e83c0dafb6f918fb7d (patch)
tree6bf574be2db265f714d16bf0140b5c3056b408e8
parent33e538c56f7513f1b98cab16de771a47a0e2a300 (diff)
Add wipeonly queue and functionality
This is useful for prebinarised images, which don't need full preprocessing, but do require wiping, albeit with a more conservative threshold.
-rw-r--r--bookpipeline/aws.go29
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go43
-rw-r--r--bookpipeline/cmd/lspipeline/main.go2
-rw-r--r--bookpipeline/cmd/mkpipeline/main.go2
-rw-r--r--preproc/wipesides.go44
5 files changed, 109 insertions, 11 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index d4cd306..0127d6e 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -33,14 +33,14 @@ type AwsConn struct {
Logger *log.Logger
// these are used internally
- sess *session.Session
- ec2svc *ec2.EC2
- s3svc *s3.S3
- sqssvc *sqs.SQS
- downloader *s3manager.Downloader
- uploader *s3manager.Uploader
- prequrl, ocrqurl, analysequrl string
- wipstorageid string
+ sess *session.Session
+ ec2svc *ec2.EC2
+ s3svc *s3.S3
+ sqssvc *sqs.SQS
+ downloader *s3manager.Downloader
+ uploader *s3manager.Uploader
+ wipequrl, prequrl, ocrqurl, analysequrl string
+ wipstorageid string
}
// TODO: split this up, as not everything is needed for different uses
@@ -74,6 +74,15 @@ func (a *AwsConn) Init() error {
}
a.prequrl = *result.QueueUrl
+ a.Logger.Println("Getting wipeonly queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String("rescribewipeonly"),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err))
+ }
+ a.wipequrl = *result.QueueUrl
+
a.Logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeocr"),
@@ -192,6 +201,10 @@ func (a *AwsConn) PreQueueId() string {
return a.prequrl
}
+// WipeQueueId returns the URL of the wipeonly queue. The value is the
+// one stored by Init after looking up the "rescribewipeonly" queue.
+func (a *AwsConn) WipeQueueId() string {
+ return a.wipequrl
+}
+
func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index de9b072..7d04e3a 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -19,7 +19,7 @@ import (
"rescribe.xyz/go.git/preproc"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
Watches the preprocess, ocr and analyse queues for book names. When
one is found this general process is followed:
@@ -60,6 +60,7 @@ type Clouder interface {
type Pipeliner interface {
Clouder
PreQueueId() string
+ WipeQueueId() string
OCRQueueId() string
AnalyseQueueId() string
WIPStorageId() string
@@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo
close(up)
}
+// wipe receives image paths on towipe, wipes each one with
+// preproc.WipeFile and sends the path of the resulting file on up.
+// The output is written next to the input, with the input's extension
+// replaced by "_bin0.0.png".
+//
+// up is always closed before wipe returns. If a file cannot be wiped,
+// the remaining paths are drained from towipe, up is closed, the error
+// is sent on errc and no further files are processed.
+func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+	for path := range towipe {
+		logger.Println("Wiping", path)
+		// Strip only the final extension of the file name. Splitting on
+		// every "." and re-joining would also delete dots elsewhere in
+		// the path (e.g. in a directory name), producing a wrong
+		// location. Assumes "/"-separated paths.
+		base := path
+		if i := strings.LastIndex(path, "."); i > strings.LastIndex(path, "/") {
+			base = path[:i]
+		}
+		outpath := base + "_bin0.0.png"
+		// window size 5, threshold 0.03, minimum content width 30%
+		err := preproc.WipeFile(path, outpath, 5, 0.03, 30)
+		if err != nil {
+			for range towipe {
+			} // consume the rest of the receiving channel so it isn't blocked
+			close(up)
+			errc <- err
+			return
+		}
+		up <- outpath
+	}
+	close(up)
+}
+
func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
for path := range toocr {
@@ -347,6 +367,7 @@ func main() {
verbose := flag.Bool("v", false, "verbose")
training := flag.String("t", "rescribealphav5", "tesseract training file to use")
nopreproc := flag.Bool("np", false, "disable preprocessing")
+ nowipe := flag.Bool("nw", false, "disable wipeonly")
noocr := flag.Bool("no", false, "disable ocr")
noanalyse := flag.Bool("na", false, "disable analysis")
@@ -379,11 +400,15 @@ func main() {
verboselog.Println("Finished setting up AWS session")
var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
var checkOCRQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
if !*nopreproc {
checkPreQueue = time.After(0)
}
+ if !*nowipe {
+ checkWipeQueue = time.After(0)
+ }
if !*noocr {
checkOCRQueue = time.After(0)
}
@@ -409,6 +434,22 @@ func main() {
if err != nil {
log.Println("Error during preprocess", err)
}
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking wipeonly queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
+ err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId())
+ if err != nil {
+ log.Println("Error during wipe", err)
+ }
case <-checkOCRQueue:
msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
checkOCRQueue = time.After(PauseBetweenChecks)
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 3e9df61..46a1d63 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -24,6 +24,7 @@ Lists useful things related to the pipeline.
type LsPipeliner interface {
Init() error
PreQueueId() string
+ WipeQueueId() string
OCRQueueId() string
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
@@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails)
func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
queues := []struct{ name, id string }{
{"preprocess", conn.PreQueueId()},
+ {"wipeonly", conn.WipeQueueId()},
{"ocr", conn.OCRQueueId()},
{"analyse", conn.AnalyseQueueId()},
}
diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go
index 970543e..e37a56d 100644
--- a/bookpipeline/cmd/mkpipeline/main.go
+++ b/bookpipeline/cmd/mkpipeline/main.go
@@ -34,7 +34,7 @@ func main() {
prefix := "rescribe"
buckets := []string{"inprogress", "done"}
- queues := []string{"preprocess", "ocr", "analyse"}
+ queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
for _, bucket := range buckets {
bname := prefix + bucket
diff --git a/preproc/wipesides.go b/preproc/wipesides.go
index 24fb7bd..1b73791 100644
--- a/preproc/wipesides.go
+++ b/preproc/wipesides.go
@@ -4,8 +4,14 @@ package preproc
// TODO: switch to an interface rather than integralimg.I
import (
+ "errors"
+ "fmt"
"image"
"image/color"
+ "image/draw"
+ _ "image/jpeg"
+ "image/png"
+ "os"
"rescribe.xyz/go.git/integralimg"
)
@@ -106,7 +112,7 @@ func toonarrow(img *image.Gray, lowedge int, highedge int, min int) bool {
return false
}
-// wipe fills the sections of image which fall outside the content
+// Wipe fills the sections of image which fall outside the content
// area with white, providing the content area is above min %
func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {
integral := integralimg.ToIntegralImg(img)
@@ -116,3 +122,39 @@ func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {
}
return wipesides(img, lowedge, highedge)
}
+
+// WipeFile wipes an image file, filling the sections of the image
+// which fall outside the content area with white, providing the
+// content area is above min %. The input is decoded with
+// image.Decode, converted to grayscale, and the result is always
+// saved as PNG.
+// inPath: path of the input image.
+// outPath: path to save the output image.
+// wsize: window size for wipe algorithm.
+// thresh: threshold for wipe algorithm.
+// min: minimum % of content area width to consider valid.
+// It returns an error if the input cannot be opened or decoded, or if
+// the output cannot be created, encoded or closed.
+func WipeFile(inPath string, outPath string, wsize int, thresh float64, min int) error {
+	in, err := os.Open(inPath)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not open file %s: %v", inPath, err))
+	}
+	// Only register the close once the open is known to have succeeded.
+	defer in.Close()
+
+	img, _, err := image.Decode(in)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not decode image: %v", err))
+	}
+	b := img.Bounds()
+	gray := image.NewGray(image.Rect(0, 0, b.Dx(), b.Dy()))
+	// gray starts at the origin, while the decoded image's bounds need
+	// not. Fill the whole of gray, reading from the source's top-left
+	// corner, so an offset source is not clipped.
+	draw.Draw(gray, gray.Bounds(), img, b.Min, draw.Src)
+
+	clean := Wipe(gray, wsize, thresh, min)
+
+	out, err := os.Create(outPath)
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not create file %s: %v", outPath, err))
+	}
+	err = png.Encode(out, clean)
+	if err != nil {
+		out.Close()
+		return errors.New(fmt.Sprintf("Could not encode image: %v", err))
+	}
+	// Close explicitly rather than deferring, so that a failure to
+	// finish writing the output is reported instead of silently lost.
+	err = out.Close()
+	if err != nil {
+		return errors.New(fmt.Sprintf("Could not close file %s: %v", outPath, err))
+	}
+	return nil
+}