diff options
| -rw-r--r-- | bookpipeline/aws.go | 29 | ||||
| -rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 43 | ||||
| -rw-r--r-- | bookpipeline/cmd/lspipeline/main.go | 2 | ||||
| -rw-r--r-- | bookpipeline/cmd/mkpipeline/main.go | 2 | ||||
| -rw-r--r-- | preproc/wipesides.go | 44 | 
5 files changed, 109 insertions, 11 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index d4cd306..0127d6e 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -33,14 +33,14 @@ type AwsConn struct {  	Logger *log.Logger  	// these are used internally -	sess                          *session.Session -	ec2svc                        *ec2.EC2 -	s3svc                         *s3.S3 -	sqssvc                        *sqs.SQS -	downloader                    *s3manager.Downloader -	uploader                      *s3manager.Uploader -	prequrl, ocrqurl, analysequrl string -	wipstorageid                  string +	sess                                    *session.Session +	ec2svc                                  *ec2.EC2 +	s3svc                                   *s3.S3 +	sqssvc                                  *sqs.SQS +	downloader                              *s3manager.Downloader +	uploader                                *s3manager.Uploader +	wipequrl, prequrl, ocrqurl, analysequrl string +	wipstorageid                            string  }  // TODO: split this up, as not everything is needed for different uses @@ -74,6 +74,15 @@ func (a *AwsConn) Init() error {  	}  	a.prequrl = *result.QueueUrl +	a.Logger.Println("Getting wipeonly queue URL") +	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String("rescribewipeonly"), +	}) +	if err != nil { +		return errors.New(fmt.Sprintf("Error getting wipeonly queue URL: %s", err)) +	} +	a.wipequrl = *result.QueueUrl +  	a.Logger.Println("Getting OCR queue URL")  	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{  		QueueName: aws.String("rescribeocr"), @@ -192,6 +201,10 @@ func (a *AwsConn) PreQueueId() string {  	return a.prequrl  } +func (a *AwsConn) WipeQueueId() string { +	return a.wipequrl +} +  func (a *AwsConn) OCRQueueId() string {  	return a.ocrqurl  } diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index de9b072..7d04e3a 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -19,7 +19,7 @@ import (  	"rescribe.xyz/go.git/preproc"  ) -const usage = `Usage: bookpipeline [-v] [-np] [-no] [-na] [-t training] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]  Watches the preprocess, ocr and analyse queues for book names. When  one is found this general process is followed: @@ -60,6 +60,7 @@ type Clouder interface {  type Pipeliner interface {  	Clouder  	PreQueueId() string +	WipeQueueId() string  	OCRQueueId() string  	AnalyseQueueId() string  	WIPStorageId() string @@ -118,6 +119,25 @@ func preprocess(pre chan string, up chan string, errc chan error, logger *log.Lo  	close(up)  } +func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { +	for path := range towipe { +		logger.Println("Wiping", path) +		s := strings.Split(path, ".") +		base := strings.Join(s[:len(s)-1], "") +		outpath := base + "_bin0.0.png" +		err := preproc.WipeFile(path, outpath, 5, 0.03, 30) +		if err != nil { +			for range towipe { +			} // consume the rest of the receiving channel so it isn't blocked +			close(up) +			errc <- err +			return +		} +		up <- outpath +	} +	close(up) +} +  func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {  	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {  		for path := range toocr { @@ -347,6 +367,7 @@ func main() {  	verbose := flag.Bool("v", false, "verbose")  	training := flag.String("t", "rescribealphav5", "tesseract training file to use")  	nopreproc := flag.Bool("np", false, "disable preprocessing") +	nowipe := flag.Bool("nw", false, "disable wipeonly")  	noocr := flag.Bool("no", false, "disable ocr")  	noanalyse := flag.Bool("na", false, "disable analysis") @@ -379,11 +400,15 @@ func main() {  	verboselog.Println("Finished setting up AWS session")  	var checkPreQueue <-chan time.Time +	var checkWipeQueue <-chan time.Time  	var checkOCRQueue <-chan time.Time  	var checkAnalyseQueue <-chan time.Time  	if !*nopreproc {  		checkPreQueue = time.After(0)  	} +	if !*nowipe { +		checkWipeQueue = time.After(0) +	}  	if !*noocr {  		checkOCRQueue = time.After(0)  	} @@ -409,6 +434,22 @@ func main() {  			if err != nil {  				log.Println("Error during preprocess", err)  			} +		case <-checkWipeQueue: +			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2) +			checkWipeQueue = time.After(PauseBetweenChecks) +			if err != nil { +				log.Println("Error checking wipeonly queue", err) +				continue +			} +			if msg.Handle == "" { +				verboselog.Println("No message received on wipeonly queue, sleeping") +				continue +			} +			verboselog.Println("Message received on wipeonly queue, processing", msg.Body) +			err = processBook(msg, conn, wipe, origPattern, conn.WipeQueueId(), conn.OCRQueueId()) +			if err != nil { +				log.Println("Error during wipe", err) +			}  		case <-checkOCRQueue:  			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)  			checkOCRQueue = time.After(PauseBetweenChecks) diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go index 3e9df61..46a1d63 100644 --- a/bookpipeline/cmd/lspipeline/main.go +++ b/bookpipeline/cmd/lspipeline/main.go @@ -24,6 +24,7 @@ Lists useful things related to the pipeline.  type LsPipeliner interface {  	Init() error  	PreQueueId() string +	WipeQueueId() string  	OCRQueueId() string  	AnalyseQueueId() string  	GetQueueDetails(url string) (string, string, error) @@ -57,6 +58,7 @@ func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails)  func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {  	queues := []struct{ name, id string }{  		{"preprocess", conn.PreQueueId()}, +		{"wipeonly", conn.WipeQueueId()},  		{"ocr", conn.OCRQueueId()},  		{"analyse", conn.AnalyseQueueId()},  	} diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go index 970543e..e37a56d 100644 --- a/bookpipeline/cmd/mkpipeline/main.go +++ b/bookpipeline/cmd/mkpipeline/main.go @@ -34,7 +34,7 @@ func main() {  	prefix := "rescribe"  	buckets := []string{"inprogress", "done"} -	queues := []string{"preprocess", "ocr", "analyse"} +	queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}  	for _, bucket := range buckets {  		bname := prefix + bucket diff --git a/preproc/wipesides.go b/preproc/wipesides.go index 24fb7bd..1b73791 100644 --- a/preproc/wipesides.go +++ b/preproc/wipesides.go @@ -4,8 +4,14 @@ package preproc  // TODO: switch to an interface rather than integralimg.I  import ( +	"errors" +	"fmt"  	"image"  	"image/color" +	"image/draw" +	_ "image/jpeg" +	"image/png" +	"os"  	"rescribe.xyz/go.git/integralimg"  ) @@ -106,7 +112,7 @@ func toonarrow(img *image.Gray, lowedge int, highedge int, min int) bool {  	return false  } -// wipe fills the sections of image which fall outside the content +// Wipe fills the sections of image which fall outside the content  // area with white, providing the content area is above min %  func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {  	integral := integralimg.ToIntegralImg(img) @@ -116,3 +122,39 @@ func Wipe(img *image.Gray, wsize int, thresh float64, min int) *image.Gray {  	}  	return wipesides(img, lowedge, highedge)  } + +// WipeFile wipes an image file, filling the sections of the image +// which fall outside the content area with white, providing the +// content area is above min %. +// inPath: path of the input image. +// outPath: path to save the output image. +// wsize: window size for wipe algorithm. +// thresh: threshold for wipe algorithm. +// min: minimum % of content area width to consider valid. +func WipeFile(inPath string, outPath string, wsize int, thresh float64, min int) error { +	f, err := os.Open(inPath) +	defer f.Close() +	if err != nil { +		return errors.New(fmt.Sprintf("Could not open file %s: %v", inPath, err)) +	} +	img, _, err := image.Decode(f) +	if err != nil { +		return errors.New(fmt.Sprintf("Could not decode image: %v", err)) +	} +	b := img.Bounds() +	gray := image.NewGray(image.Rect(0, 0, b.Dx(), b.Dy())) +	draw.Draw(gray, b, img, b.Min, draw.Src) + +	clean := Wipe(gray, wsize, thresh, min) + +	f, err = os.Create(outPath) +	if err != nil { +		return errors.New(fmt.Sprintf("Could not create file %s: %v", outPath, err)) +	} +	defer f.Close() +	err = png.Encode(f, clean) +	if err != nil { +		return errors.New(fmt.Sprintf("Could not encode image: %v", err)) +	} +	return nil +} | 
