summaryrefslogtreecommitdiff
path: root/bookpipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-28 18:09:06 +0100
committerNick White <git@njw.name>2019-08-28 18:09:06 +0100
commit74b89d5f2cd968e58be9a28f1dbce7a1ebda581e (patch)
treef9e5247ae85894c374fde261c357f6967a42553f /bookpipeline
parent49b87574ba54b7450f7c0520153f5d4a4695076f (diff)
Split out bookpipeline to cmd/
Diffstat (limited to 'bookpipeline')
-rw-r--r--bookpipeline/aws.go56
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go (renamed from bookpipeline/main.go)57
-rw-r--r--bookpipeline/graph.go35
3 files changed, 74 insertions, 74 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 761031d..7409434 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -1,4 +1,4 @@
-package main
+package bookpipeline
import (
"errors"
@@ -17,10 +17,14 @@ import (
const PreprocPattern = `_bin[0-9].[0-9].png`
const HeartbeatTime = 60
-type awsConn struct {
+type Qmsg struct {
+ Handle, Body string
+}
+
+type AwsConn struct {
// these need to be set before running Init()
- region string
- logger *log.Logger
+ Region string
+ Logger *log.Logger
// these are used internally
sess *session.Session
@@ -32,17 +36,17 @@ type awsConn struct {
wipstorageid string
}
-func (a *awsConn) Init() error {
- if a.region == "" {
- return errors.New("No region set")
+func (a *AwsConn) Init() error {
+ if a.Region == "" {
+ return errors.New("No Region set")
}
- if a.logger == nil {
+ if a.Logger == nil {
return errors.New("No logger set")
}
var err error
a.sess, err = session.NewSession(&aws.Config{
- Region: aws.String(a.region),
+ Region: aws.String(a.Region),
})
if err != nil {
return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err))
@@ -52,7 +56,7 @@ func (a *awsConn) Init() error {
a.downloader = s3manager.NewDownloader(a.sess)
a.uploader = s3manager.NewUploader(a.sess)
- a.logger.Println("Getting preprocess queue URL")
+ a.Logger.Println("Getting preprocess queue URL")
result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribepreprocess"),
})
@@ -61,7 +65,7 @@ func (a *awsConn) Init() error {
}
a.prequrl = *result.QueueUrl
- a.logger.Println("Getting OCR queue URL")
+ a.Logger.Println("Getting OCR queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeocr"),
})
@@ -70,7 +74,7 @@ func (a *awsConn) Init() error {
}
a.ocrqurl = *result.QueueUrl
- a.logger.Println("Getting analyse queue URL")
+ a.Logger.Println("Getting analyse queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("rescribeanalyse"),
})
@@ -84,7 +88,7 @@ func (a *awsConn) Init() error {
return nil
}
-func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
+func (a *AwsConn) CheckQueue(url string) (Qmsg, error) {
msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(HeartbeatTime * 2),
@@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) {
if len(msgResult.Messages) > 0 {
msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body}
- a.logger.Println("Message received:", msg.Body)
+ a.Logger.Println("Message received:", msg.Body)
return msg, nil
} else {
return Qmsg{}, nil
}
}
-func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
+func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error {
for _ = range t.C {
duration := int64(HeartbeatTime * 2)
_, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
@@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string)
return nil
}
-func (a *awsConn) PreQueueId() string {
+func (a *AwsConn) PreQueueId() string {
return a.prequrl
}
-func (a *awsConn) OCRQueueId() string {
+func (a *AwsConn) OCRQueueId() string {
return a.ocrqurl
}
-func (a *awsConn) AnalyseQueueId() string {
+func (a *AwsConn) AnalyseQueueId() string {
return a.analysequrl
}
-func (a *awsConn) WIPStorageId() string {
+func (a *AwsConn) WIPStorageId() string {
return a.wipstorageid
}
-func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
+func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) {
var names []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
@@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
return names, err
}
-func (a *awsConn) AddToQueue(url string, msg string) error {
+func (a *AwsConn) AddToQueue(url string, msg string) error {
_, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{
MessageBody: &msg,
QueueUrl: &url,
@@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error {
return err
}
-func (a *awsConn) DelFromQueue(url string, handle string) error {
+func (a *AwsConn) DelFromQueue(url string, handle string) error {
_, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &url,
ReceiptHandle: &handle,
@@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error {
return err
}
-func (a *awsConn) Download(bucket string, key string, path string) error {
+func (a *AwsConn) Download(bucket string, key string, path string) error {
f, err := os.Create(path)
if err != nil {
return err
@@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) Upload(bucket string, key string, path string) error {
+func (a *AwsConn) Upload(bucket string, key string, path string) error {
file, err := os.Open(path)
if err != nil {
log.Fatalln("Failed to open file", path, err)
@@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error {
return err
}
-func (a *awsConn) Logger() *log.Logger {
- return a.logger
+func (a *AwsConn) GetLogger() *log.Logger {
+ return a.Logger
}
diff --git a/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index b7b01dd..9fa7159 100644
--- a/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -15,6 +15,7 @@ import (
"strings"
"time"
+ "rescribe.xyz/go.git/bookpipeline"
"rescribe.xyz/go.git/lib/hocr"
"rescribe.xyz/go.git/preproc"
)
@@ -36,6 +37,9 @@ one is found this general process is followed:
`
+const PauseBetweenChecks = 3 * time.Minute
+const HeartbeatTime = 60
+
// null writer to enable non-verbose logging to be discarded
type NullWriter bool
@@ -43,14 +47,12 @@ func (w NullWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
-const PauseBetweenChecks = 3 * time.Minute
-
type Clouder interface {
Init() error
ListObjects(bucket string, prefix string) ([]string, error)
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
- CheckQueue(url string) (Qmsg, error)
+ CheckQueue(url string) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error
@@ -62,11 +64,7 @@ type Pipeliner interface {
OCRQueueId() string
AnalyseQueueId() string
WIPStorageId() string
- Logger() *log.Logger
-}
-
-type Qmsg struct {
- Handle, Body string
+ GetLogger() *log.Logger
}
func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) {
@@ -131,14 +129,9 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger
}
}
-type Conf struct {
- path, code string
- conf float64
-}
-
func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
- confs := make(map[string][]*Conf)
- bestconfs := make(map[string]*Conf)
+ confs := make(map[string][]*bookpipeline.Conf)
+ bestconfs := make(map[string]*bookpipeline.Conf)
savedir := ""
for path := range toanalyse {
@@ -155,10 +148,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
base := filepath.Base(path)
codestart := strings.Index(base, "_bin")
name := base[0:codestart]
- var c Conf
- c.path = path
- c.code = base[codestart:]
- c.conf = avg
+ var c bookpipeline.Conf
+ c.Path = path
+ c.Code = base[codestart:]
+ c.Conf = avg
confs[name] = append(confs[name], &c)
}
@@ -177,11 +170,11 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
for base, conf := range confs {
var best float64
for _, c := range conf {
- if c.conf > best {
- best = c.conf
+ if c.Conf > best {
+ best = c.Conf
bestconfs[base] = c
}
- _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.path, c.conf)
+ _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
if err != nil {
close(up)
errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err))
@@ -201,7 +194,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
}
defer f.Close()
for _, conf := range bestconfs {
- _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.path))
+ _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
}
up <- fn
@@ -214,7 +207,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
return
}
defer f.Close()
- err = graph(bestconfs, filepath.Base(savedir), f)
+ err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
if err != nil {
close(up)
errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err))
@@ -222,12 +215,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
}
up <- fn
- // TODO: generate a general report.txt with statistics etc for the book, send to up
-
close(up)
}
-func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
+func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
bookname := msg.Body
t := time.NewTicker(HeartbeatTime * time.Second)
@@ -248,10 +239,10 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc)
- go process(processc, upc, errc, conn.Logger())
+ go process(processc, upc, errc, conn.GetLogger())
go up(upc, done, conn, bookname, errc)
- conn.Logger().Println("Getting list of objects to download")
+ conn.GetLogger().Println("Getting list of objects to download")
objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
t.Stop()
@@ -261,7 +252,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
var todl []string
for _, n := range objs {
if !match.MatchString(n) {
- conn.Logger().Println("Skipping item that doesn't match target", n)
+ conn.GetLogger().Println("Skipping item that doesn't match target", n)
continue
}
todl = append(todl, n)
@@ -281,7 +272,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
}
if toQueue != "" {
- conn.Logger().Println("Sending", bookname, "to queue")
+ conn.GetLogger().Println("Sending", bookname, "to queue")
err = conn.AddToQueue(toQueue, bookname)
if err != nil {
t.Stop()
@@ -292,7 +283,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string
t.Stop()
- conn.Logger().Println("Deleting original message from queue")
+ conn.GetLogger().Println("Deleting original message from queue")
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {
_ = os.RemoveAll(d)
@@ -329,7 +320,7 @@ func main() {
ocredPattern := regexp.MustCompile(`.hocr$`)
var conn Pipeliner
- conn = &awsConn{region: "eu-west-2", logger: verboselog}
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
verboselog.Println("Setting up AWS session")
err := conn.Init()
diff --git a/bookpipeline/graph.go b/bookpipeline/graph.go
index 27ffd39..a4fdee0 100644
--- a/bookpipeline/graph.go
+++ b/bookpipeline/graph.go
@@ -1,4 +1,4 @@
-package main
+package bookpipeline
import (
"fmt"
@@ -14,26 +14,31 @@ import (
const maxticks = 20
const cutoff = 70
+type Conf struct {
+ Path, Code string
+ Conf float64
+}
+
type GraphConf struct {
- pgnum, conf float64
+ Pgnum, Conf float64
}
-func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
+func Graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
// Organise confs to sort them by page
var graphconf []GraphConf
for _, conf := range confs {
- name := filepath.Base(conf.path)
+ name := filepath.Base(conf.Path)
numend := strings.Index(name, "_")
pgnum, err := strconv.ParseFloat(name[0:numend], 64)
if err != nil {
continue
}
var c GraphConf
- c.pgnum = pgnum
- c.conf = conf.conf
+ c.Pgnum = pgnum
+ c.Conf = conf.Conf
graphconf = append(graphconf, c)
}
- sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].pgnum < graphconf[j].pgnum })
+ sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Pgnum < graphconf[j].Pgnum })
// Create main xvalues and yvalues, annotations and ticks
var xvalues, yvalues []float64
@@ -43,13 +48,13 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
tickevery := len(graphconf) / maxticks
for _, c := range graphconf {
i = i + 1
- xvalues = append(xvalues, c.pgnum)
- yvalues = append(yvalues, c.conf)
- if c.conf < cutoff {
- annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.pgnum), XValue: c.pgnum, YValue: c.conf})
+ xvalues = append(xvalues, c.Pgnum)
+ yvalues = append(yvalues, c.Conf)
+ if c.Conf < cutoff {
+ annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.Pgnum), XValue: c.Pgnum, YValue: c.Conf})
}
if tickevery % i == 0 {
- ticks = append(ticks, chart.Tick{c.pgnum, fmt.Sprintf("%.0f", c.pgnum)})
+ ticks = append(ticks, chart.Tick{c.Pgnum, fmt.Sprintf("%.0f", c.Pgnum)})
}
}
mainSeries := chart.ContinuousSeries{
@@ -73,9 +78,9 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) {
}
// Create lines marking top and bottom 10% confidence
- sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].conf < graphconf[j].conf })
- lowconf := graphconf[int(len(graphconf) / 10)].conf
- highconf := graphconf[int((len(graphconf) / 10) * 9)].conf
+ sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Conf < graphconf[j].Conf })
+ lowconf := graphconf[int(len(graphconf) / 10)].Conf
+ highconf := graphconf[int((len(graphconf) / 10) * 9)].Conf
yvalues = []float64{}
for _ = range graphconf {
yvalues = append(yvalues, lowconf)