diff options
| -rw-r--r-- | pipelinepreprocess/main.go | 212 | ||||
| -rw-r--r-- | preproc/preprocmulti.go | 93 | 
2 files changed, 305 insertions, 0 deletions
| diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go new file mode 100644 index 0000000..d4aa6a4 --- /dev/null +++ b/pipelinepreprocess/main.go @@ -0,0 +1,212 @@ +package main +// TODO: have logs go somewhere useful, like email +// TODO: handle errors more smartly than just always fatal erroring +//       - read the sdk guarantees on retrying and ensure we retry some times before giving up if necessary +//       - cancel the current book processing rather than killing the program in the case of a nonrecoverable error  + +import ( +	"log" +	"os" +	"path/filepath" +	"time" + +	"github.com/aws/aws-sdk-go/aws" +	"github.com/aws/aws-sdk-go/aws/session" +	"github.com/aws/aws-sdk-go/service/s3" +	"github.com/aws/aws-sdk-go/service/s3/s3manager" +	"github.com/aws/aws-sdk-go/service/sqs" + +	"rescribe.xyz/go.git/preproc" +) + +const HeartbeatTime = 60 +const PauseBetweenChecks = 60 * time.Second + +// TODO: could restructure like so: +//       have the goroutine functions run outside of the main loop in the program, +//       so use them for multiple books indefinitely. would require finding a way to +//       signal when the queues need to be updated (e.g. when a book is finished) +// +// MAYBE use a struct holding config info ala downloader in +// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html +// +// TODO: consider having the download etc functions return a channel like a generator, like in rob pike's talk + +func download(dl chan string, pre chan string, downloader *s3manager.Downloader, dir string) { +	for key := range dl { +		fn := filepath.Join(dir, filepath.Base(key)) +		f, err := os.Create(fn) +		if err != nil { +			log.Fatalln("Failed to create file", fn, err) +		} +		defer f.Close() + +		_, err = downloader.Download(f, +			&s3.GetObjectInput{ +				Bucket: aws.String("inprogress"), +				Key: &key }) +		if err != nil { +			log.Fatalln("Failed to download", key, err) +		} +		pre <- fn +	} +	close(pre) +} + +func preprocess(pre chan string, up chan string) { +	for path := range pre { +		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) +		if err != nil { +			log.Fatalln("Error preprocessing", path, err) +		} +		for _, p := range done { +			up <- p +		} +	} +	close(up) +} + +func up(c chan string, done chan bool, uploader *s3manager.Uploader, bookname string) { +	for path := range c { +		name := filepath.Base(path) +		file, err := os.Open(path) +		if err != nil { +			log.Fatalln("Failed to open file", path, err) +		} +		defer file.Close() + +		_, err = uploader.Upload(&s3manager.UploadInput{ +			Bucket: aws.String("rescribeinprogress"), +			Key:    aws.String(filepath.Join(bookname, name)), +			Body:   file, +		}) +		if err != nil { +			log.Fatalln("Failed to upload", path, err) +		} +	} + +	done <- true +} + +func heartbeat(h *time.Ticker, msgHandle string, qurl string, sqssvc *sqs.SQS) { +	for _ = range h.C { +		duration := int64(HeartbeatTime * 2) +		_, err := sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ +			ReceiptHandle: &msgHandle, +			QueueUrl: &qurl, +			VisibilityTimeout: &duration, +		}) +		if err != nil { +			log.Fatalln("Error updating queue duration:", err) +		} +	} +} + +func main() { +	if len(os.Args) != 1 { +		log.Fatal("Usage: pipelinepreprocess\n\nContinuously checks the preprocess queue for books.\nWhen a book is found it's downloaded from the S3 inprogress bucket, preprocessed, and the results are uploaded to the S3 inprogress bucket. The book name is then added to the ocr queue, and removed from the preprocess queue.\n") +	} + +	sess, err := session.NewSession(&aws.Config{ +		Region: aws.String("eu-west-2"), +	}) +	if err != nil { +		log.Fatalln("Error: failed to set up aws session:", err) +	} +	s3svc := s3.New(sess) +	sqssvc := sqs.New(sess) +	downloader := s3manager.NewDownloader(sess) +	uploader := s3manager.NewUploader(sess) + +	preqname := "rescribepreprocess" +	result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String(preqname), +	}) +	if err != nil { +		log.Fatalln("Error getting queue URL for", preqname, ":", err) +	} +	prequrl := *result.QueueUrl + +	ocrqname := "rescribeocr" +	result, err = sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String(ocrqname), +	}) +	if err != nil { +		log.Fatalln("Error getting queue URL for", ocrqname, ":", err) +	} +	ocrqurl := *result.QueueUrl + +	for { +		msgResult, err := sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +			MaxNumberOfMessages: aws.Int64(1), +			VisibilityTimeout: aws.Int64(HeartbeatTime * 2), +			WaitTimeSeconds: aws.Int64(HeartbeatTime), +			QueueUrl: &prequrl, +		}) +		if err != nil { +			log.Fatalln("Error checking queue", preqname, ":", err) +		} + +		var bookname string +		if len(msgResult.Messages) > 0 { +			bookname = *msgResult.Messages[0].Body +		} else { +			time.Sleep(PauseBetweenChecks) +			continue +		} + +		t := time.NewTicker(HeartbeatTime * time.Second) +		go heartbeat(t, *msgResult.Messages[0].ReceiptHandle, prequrl, sqssvc) + + +		d := filepath.Join(os.TempDir(), bookname) +		err = os.Mkdir(d, 0755) +		if err != nil { +			log.Fatalln("Failed to create directory", d, err) +		} + +		dl := make(chan string) +		pre := make(chan string) +		upc := make(chan string) // TODO: rename +		done := make(chan bool) // this is just to communicate when up has finished, so the queues can be updated + +		// these functions will do their jobs when their channels have data +		go download(dl, pre, downloader, d) +		go preprocess(pre, upc) +		go up(upc, done, uploader, bookname) + + +		err = s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ +			Bucket: aws.String("rescribeinprogress"), +			Prefix: aws.String(bookname), +		}, func(page *s3.ListObjectsV2Output, last bool) bool { +			for _, r := range page.Contents { +				dl <- *r.Key +			} +			return true +		}) +		close(dl) + + +		// wait for the done channel to be posted to +		<-done + +		_, err = sqssvc.SendMessage(&sqs.SendMessageInput{ +			MessageBody: aws.String(bookname), +			QueueUrl: &ocrqurl, +		}) +		if err != nil { +			log.Fatalln("Error sending message to queue", ocrqname, ":", err) +		} + +		t.Stop() + +		_, err = sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ +			QueueUrl: &prequrl, +			ReceiptHandle: msgResult.Messages[0].ReceiptHandle, +		}) +		if err != nil { +			log.Fatalln("Error deleting message from queue", preqname, ":", err) +		} +	} +} diff --git a/preproc/preprocmulti.go b/preproc/preprocmulti.go new file mode 100644 index 0000000..cd7ad34 --- /dev/null +++ b/preproc/preprocmulti.go @@ -0,0 +1,93 @@ +package preproc + +// TODO: come up with a way to set a good ksize automatically + +import ( +	"fmt" +	"image" +	"image/draw" +	_ "image/jpeg" +	"image/png" +	"os" +	"strings" + +	"rescribe.xyz/go.git/integralimg" +) + +// TODO: do more testing to see how good this assumption is +func autowsize(bounds image.Rectangle) int { +	return bounds.Dx() / 60 +} + +// PreProcMulti binarizes and preprocesses an image with multiple binarisation levels. +// inPath: Path of input image. +// ksizes: Slice of k values to pass to Sauvola algorithm +// binType: Type of binarization threshold. binary or zeroinv are currently implemented. +// binWsize: Window size for sauvola binarization algorithm. Set automatically based on resolution if 0. +// wipe: Whether to wipe (clear sides) the image +// wipeWsize: Window size for wiping algorithm +// wipeMinWidthPerc: Minimum percentage of the image width for the content width calculation to be considered valid +// Note: copied from cmd/preprocmulti/main.go, should think about the best way +//       to organise this code later. +// TODO: return errors that encapsulate the err describing where it was encountered +func PreProcMulti(inPath string, ksizes []float64, binType string, binWsize int, wipe bool, wipeWsize int, wipeMinWidthPerc int) ([]string, error) { +	// Make outBase inPath up to final . +	s := strings.Split(inPath, ".") +	outBase := strings.Join(s[:len(s)-1], "") + +	var donePaths []string + +	f, err := os.Open(inPath) +	if err != nil { +		return donePaths, err +	} +	defer f.Close() +	img, _, err := image.Decode(f) +	if err != nil { +		return donePaths, err +	} +	b := img.Bounds() +	gray := image.NewGray(image.Rect(0, 0, b.Dx(), b.Dy())) +	draw.Draw(gray, b, img, b.Min, draw.Src) + +	if binWsize == 0 { +		binWsize = autowsize(b) +	} + +	if binWsize%2 == 0 { +		binWsize++ +	} + +	var clean, threshimg image.Image +	integrals := integralimg.ToAllIntegralImg(gray) + +	for _, k := range ksizes { +		threshimg = PreCalcedSauvola(integrals, gray, k, binWsize) + +		if binType == "zeroinv" { +			threshimg, err = BinToZeroInv(threshimg.(*image.Gray), img.(*image.RGBA)) +			if err != nil { +				return donePaths, err +			} +		} + +		if wipe { +			clean = Wipe(threshimg.(*image.Gray), wipeWsize, k * 0.02, wipeMinWidthPerc) +		} else { +			clean = threshimg +		} + +		savefn := fmt.Sprintf("%s_bin%0.1f.png", outBase, k) +		f, err = os.Create(savefn) +		if err != nil { +			return donePaths, err +		} +		defer f.Close() +		err = png.Encode(f, clean) +		if err != nil { +			return donePaths, err +		} +		donePaths = append(donePaths, savefn) +	} +	return donePaths, nil +} | 
