diff options
| -rw-r--r-- | bookpipeline/aws.go | 56 | ||||
| -rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go (renamed from bookpipeline/main.go) | 57 | ||||
| -rw-r--r-- | bookpipeline/graph.go | 35 | 
3 files changed, 74 insertions, 74 deletions
| diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 761031d..7409434 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -1,4 +1,4 @@ -package main +package bookpipeline  import (  	"errors" @@ -17,10 +17,14 @@ import (  const PreprocPattern = `_bin[0-9].[0-9].png`  const HeartbeatTime = 60 -type awsConn struct { +type Qmsg struct { +        Handle, Body string +} + +type AwsConn struct {  	// these need to be set before running Init() -	region string -	logger *log.Logger +	Region string +	Logger *log.Logger  	// these are used internally  	sess                          *session.Session @@ -32,17 +36,17 @@ type awsConn struct {  	wipstorageid                  string  } -func (a *awsConn) Init() error { -	if a.region == "" { -		return errors.New("No region set") +func (a *AwsConn) Init() error { +	if a.Region == "" { +		return errors.New("No Region set")  	} -	if a.logger == nil { +	if a.Logger == nil {  		return errors.New("No logger set")  	}  	var err error  	a.sess, err = session.NewSession(&aws.Config{ -		Region: aws.String(a.region), +		Region: aws.String(a.Region),  	})  	if err != nil {  		return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) @@ -52,7 +56,7 @@ func (a *awsConn) Init() error {  	a.downloader = s3manager.NewDownloader(a.sess)  	a.uploader = s3manager.NewUploader(a.sess) -	a.logger.Println("Getting preprocess queue URL") +	a.Logger.Println("Getting preprocess queue URL")  	result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{  		QueueName: aws.String("rescribepreprocess"),  	}) @@ -61,7 +65,7 @@ func (a *awsConn) Init() error {  	}  	a.prequrl = *result.QueueUrl -	a.logger.Println("Getting OCR queue URL") +	a.Logger.Println("Getting OCR queue URL")  	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{  		QueueName: aws.String("rescribeocr"),  	}) @@ -70,7 +74,7 @@ func (a *awsConn) Init() error {  	}  	a.ocrqurl = *result.QueueUrl -	a.logger.Println("Getting analyse queue URL") +	a.Logger.Println("Getting analyse queue URL")  	result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{  		QueueName: aws.String("rescribeanalyse"),  	}) @@ -84,7 +88,7 @@ func (a *awsConn) Init() error {  	return nil  } -func (a *awsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {  	msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{  		MaxNumberOfMessages: aws.Int64(1),  		VisibilityTimeout:   aws.Int64(HeartbeatTime * 2), @@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {  	if len(msgResult.Messages) > 0 {  		msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} -		a.logger.Println("Message received:", msg.Body) +		a.Logger.Println("Message received:", msg.Body)  		return msg, nil  	} else {  		return Qmsg{}, nil  	}  } -func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { +func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {  	for _ = range t.C {  		duration := int64(HeartbeatTime * 2)  		_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ @@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)  	return nil  } -func (a *awsConn) PreQueueId() string { +func (a *AwsConn) PreQueueId() string {  	return a.prequrl  } -func (a *awsConn) OCRQueueId() string { +func (a *AwsConn) OCRQueueId() string {  	return a.ocrqurl  } -func (a *awsConn) AnalyseQueueId() string { +func (a *AwsConn) AnalyseQueueId() string {  	return a.analysequrl  } -func (a *awsConn) WIPStorageId() string { +func (a *AwsConn) WIPStorageId() string {  	return a.wipstorageid  } -func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { +func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) {  	var names []string  	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{  		Bucket: aws.String(bucket), @@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {  	return names, err  } -func (a *awsConn) AddToQueue(url string, msg string) error { +func (a *AwsConn) AddToQueue(url string, msg string) error {  	_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{  		MessageBody: &msg,  		QueueUrl:    &url, @@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error {  	return err  } -func (a *awsConn) DelFromQueue(url string, handle string) error { +func (a *AwsConn) DelFromQueue(url string, handle string) error {  	_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{  		QueueUrl:      &url,  		ReceiptHandle: &handle, @@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {  	return err  } -func (a *awsConn) Download(bucket string, key string, path string) error { +func (a *AwsConn) Download(bucket string, key string, path string) error {  	f, err := os.Create(path)  	if err != nil {  		return err @@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error {  	return err  } -func (a *awsConn) Upload(bucket string, key string, path string) error { +func (a *AwsConn) Upload(bucket string, key string, path string) error {  	file, err := os.Open(path)  	if err != nil {  		log.Fatalln("Failed to open file", path, err) @@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {  	return err  } -func (a *awsConn) Logger() *log.Logger { -	return a.logger +func (a *AwsConn) GetLogger() *log.Logger { +	return a.Logger  } diff --git a/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index b7b01dd..9fa7159 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -15,6 +15,7 @@ import (  	"strings"  	"time" +	"rescribe.xyz/go.git/bookpipeline"  	"rescribe.xyz/go.git/lib/hocr"  	"rescribe.xyz/go.git/preproc"  ) @@ -36,6 +37,9 @@ one is found this general process is followed:  ` +const PauseBetweenChecks = 3 * time.Minute +const HeartbeatTime = 60 +  // null writer to enable non-verbose logging to be discarded  type NullWriter bool @@ -43,14 +47,12 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  	return len(p), nil  } -const PauseBetweenChecks = 3 * time.Minute -  type Clouder interface {  	Init() error  	ListObjects(bucket string, prefix string) ([]string, error)  	Download(bucket string, key string, fn string) error  	Upload(bucket string, key string, path string) error -	CheckQueue(url string) (Qmsg, error) +	CheckQueue(url string) (bookpipeline.Qmsg, error)  	AddToQueue(url string, msg string) error  	DelFromQueue(url string, handle string) error  	QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error @@ -62,11 +64,7 @@ type Pipeliner interface {  	OCRQueueId() string  	AnalyseQueueId() string  	WIPStorageId() string -	Logger() *log.Logger -} - -type Qmsg struct { -	Handle, Body string +	GetLogger() *log.Logger  }  func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { @@ -131,14 +129,9 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger  	}  } -type Conf struct { -	path, code string -	conf float64 -} -  func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { -	confs := make(map[string][]*Conf) -	bestconfs := make(map[string]*Conf) +	confs := make(map[string][]*bookpipeline.Conf) +	bestconfs := make(map[string]*bookpipeline.Conf)  	savedir := ""  	for path := range toanalyse { @@ -155,10 +148,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  		base := filepath.Base(path)  		codestart := strings.Index(base, "_bin")  		name := base[0:codestart] -		var c Conf -		c.path = path -		c.code = base[codestart:] -		c.conf = avg +		var c bookpipeline.Conf +		c.Path = path +		c.Code = base[codestart:] +		c.Conf = avg  		confs[name] = append(confs[name], &c)  	} @@ -177,11 +170,11 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	for base, conf := range confs {  		var best float64  		for _, c := range conf { -			if c.conf > best { -				best = c.conf +			if c.Conf > best { +				best = c.Conf  				bestconfs[base] = c  			} -			_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.path, c.conf) +			_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)  			if err != nil {  				close(up)  				errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err)) @@ -201,7 +194,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	}  	defer f.Close()  	for _, conf := range bestconfs { -		_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.path)) +		_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))  	}  	up <- fn @@ -214,7 +207,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  		return  	}  	defer f.Close() -	err = graph(bestconfs, filepath.Base(savedir), f) +	err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)  	if err != nil {  		close(up)  		errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err)) @@ -222,12 +215,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	}  	up <- fn -	// TODO: generate a general report.txt with statistics etc for the book, send to up -  	close(up)  } -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { +func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {  	bookname := msg.Body  	t := time.NewTicker(HeartbeatTime * time.Second) @@ -248,10 +239,10 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	// these functions will do their jobs when their channels have data  	go download(dl, processc, conn, d, errc) -	go process(processc, upc, errc, conn.Logger()) +	go process(processc, upc, errc, conn.GetLogger())  	go up(upc, done, conn, bookname, errc) -	conn.Logger().Println("Getting list of objects to download") +	conn.GetLogger().Println("Getting list of objects to download")  	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)  	if err != nil {  		t.Stop() @@ -261,7 +252,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	var todl []string  	for _, n := range objs {  		if !match.MatchString(n) { -			conn.Logger().Println("Skipping item that doesn't match target", n) +			conn.GetLogger().Println("Skipping item that doesn't match target", n)  			continue  		}  		todl = append(todl, n) @@ -281,7 +272,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	}  	if toQueue != "" { -		conn.Logger().Println("Sending", bookname, "to queue") +		conn.GetLogger().Println("Sending", bookname, "to queue")  		err = conn.AddToQueue(toQueue, bookname)  		if err != nil {  			t.Stop() @@ -292,7 +283,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string  	t.Stop() -	conn.Logger().Println("Deleting original message from queue") +	conn.GetLogger().Println("Deleting original message from queue")  	err = conn.DelFromQueue(fromQueue, msg.Handle)  	if err != nil {  		_ = os.RemoveAll(d) @@ -329,7 +320,7 @@ func main() {  	ocredPattern := regexp.MustCompile(`.hocr$`)  	var conn Pipeliner -	conn = &awsConn{region: "eu-west-2", logger: verboselog} +	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}  	verboselog.Println("Setting up AWS session")  	err := conn.Init() diff --git a/bookpipeline/graph.go b/bookpipeline/graph.go index 27ffd39..a4fdee0 100644 --- a/bookpipeline/graph.go +++ b/bookpipeline/graph.go @@ -1,4 +1,4 @@ -package main +package bookpipeline  import (  	"fmt" @@ -14,26 +14,31 @@ import (  const maxticks = 20  const cutoff = 70 +type Conf struct { +        Path, Code string +        Conf float64 +} +  type GraphConf struct { -	pgnum, conf float64 +	Pgnum, Conf float64  } -func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { +func Graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {  	// Organise confs to sort them by page  	var graphconf []GraphConf  	for _, conf := range confs { -		name := filepath.Base(conf.path) +		name := filepath.Base(conf.Path)  		numend := strings.Index(name, "_")  		pgnum, err := strconv.ParseFloat(name[0:numend], 64)  		if err != nil {  			continue  		}  		var c GraphConf -		c.pgnum = pgnum -		c.conf = conf.conf +		c.Pgnum = pgnum +		c.Conf = conf.Conf  		graphconf = append(graphconf, c)  	} -	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].pgnum < graphconf[j].pgnum }) +	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Pgnum < graphconf[j].Pgnum })  	// Create main xvalues and yvalues, annotations and ticks  	var xvalues, yvalues []float64 @@ -43,13 +48,13 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {  	tickevery := len(graphconf) / maxticks  	for _, c := range graphconf {  		i = i + 1 -		xvalues = append(xvalues, c.pgnum) -		yvalues = append(yvalues, c.conf) -		if c.conf < cutoff { -			annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.pgnum), XValue: c.pgnum, YValue: c.conf}) +		xvalues = append(xvalues, c.Pgnum) +		yvalues = append(yvalues, c.Conf) +		if c.Conf < cutoff { +			annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.Pgnum), XValue: c.Pgnum, YValue: c.Conf})  		}  		if tickevery % i == 0 { -			ticks = append(ticks, chart.Tick{c.pgnum, fmt.Sprintf("%.0f", c.pgnum)}) +			ticks = append(ticks, chart.Tick{c.Pgnum, fmt.Sprintf("%.0f", c.Pgnum)})  		}  	}  	mainSeries := chart.ContinuousSeries{ @@ -73,9 +78,9 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {  	}  	// Create lines marking top and bottom 10% confidence -	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].conf < graphconf[j].conf }) -	lowconf := graphconf[int(len(graphconf) / 10)].conf -	highconf := graphconf[int((len(graphconf) / 10) * 9)].conf +	sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Conf < graphconf[j].Conf }) +	lowconf := graphconf[int(len(graphconf) / 10)].Conf +	highconf := graphconf[int((len(graphconf) / 10) * 9)].Conf  	yvalues = []float64{}  	for _ = range graphconf {  		yvalues = append(yvalues, lowconf) | 
