diff options
-rw-r--r-- | bookpipeline/aws.go | 56 | ||||
-rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go (renamed from bookpipeline/main.go) | 57 | ||||
-rw-r--r-- | bookpipeline/graph.go | 35 |
3 files changed, 74 insertions, 74 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 761031d..7409434 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -1,4 +1,4 @@ -package main +package bookpipeline import ( "errors" @@ -17,10 +17,14 @@ import ( const PreprocPattern = `_bin[0-9].[0-9].png` const HeartbeatTime = 60 -type awsConn struct { +type Qmsg struct { + Handle, Body string +} + +type AwsConn struct { // these need to be set before running Init() - region string - logger *log.Logger + Region string + Logger *log.Logger // these are used internally sess *session.Session @@ -32,17 +36,17 @@ type awsConn struct { wipstorageid string } -func (a *awsConn) Init() error { - if a.region == "" { - return errors.New("No region set") +func (a *AwsConn) Init() error { + if a.Region == "" { + return errors.New("No Region set") } - if a.logger == nil { + if a.Logger == nil { return errors.New("No logger set") } var err error a.sess, err = session.NewSession(&aws.Config{ - Region: aws.String(a.region), + Region: aws.String(a.Region), }) if err != nil { return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) @@ -52,7 +56,7 @@ func (a *awsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.logger.Println("Getting preprocess queue URL") + a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribepreprocess"), }) @@ -61,7 +65,7 @@ func (a *awsConn) Init() error { } a.prequrl = *result.QueueUrl - a.logger.Println("Getting OCR queue URL") + a.Logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeocr"), }) @@ -70,7 +74,7 @@ func (a *awsConn) Init() error { } a.ocrqurl = *result.QueueUrl - a.logger.Println("Getting analyse queue URL") + a.Logger.Println("Getting analyse queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeanalyse"), }) @@ -84,7 +88,7 @@ func (a *awsConn) Init() error { return nil } -func (a *awsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string) (Qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), VisibilityTimeout: aws.Int64(HeartbeatTime * 2), @@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) { if len(msgResult.Messages) > 0 { msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} - a.logger.Println("Message received:", msg.Body) + a.Logger.Println("Message received:", msg.Body) return msg, nil } else { return Qmsg{}, nil } } -func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { +func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ @@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) return nil } -func (a *awsConn) PreQueueId() string { +func (a *AwsConn) PreQueueId() string { return a.prequrl } -func (a *awsConn) OCRQueueId() string { +func (a *AwsConn) OCRQueueId() string { return a.ocrqurl } -func (a *awsConn) AnalyseQueueId() string { +func (a *AwsConn) AnalyseQueueId() string { return a.analysequrl } -func (a *awsConn) WIPStorageId() string { +func (a *AwsConn) WIPStorageId() string { return a.wipstorageid } -func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { +func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) { var names []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), @@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { return names, err } -func (a *awsConn) AddToQueue(url string, msg string) error { +func (a *AwsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, QueueUrl: &url, @@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error { return err } -func (a *awsConn) DelFromQueue(url string, handle string) error { +func (a *AwsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, ReceiptHandle: &handle, @@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error { return err } -func (a *awsConn) Download(bucket string, key string, path string) error { +func (a *AwsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { return err @@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error { return err } -func (a *awsConn) Upload(bucket string, key string, path string) error { +func (a *AwsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { log.Fatalln("Failed to open file", path, err) @@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error { return err } -func (a *awsConn) Logger() *log.Logger { - return a.logger +func (a *AwsConn) GetLogger() *log.Logger { + return a.Logger } diff --git a/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index b7b01dd..9fa7159 100644 --- a/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "rescribe.xyz/go.git/bookpipeline" "rescribe.xyz/go.git/lib/hocr" "rescribe.xyz/go.git/preproc" ) @@ -36,6 +37,9 @@ one is found this general process is followed: ` +const PauseBetweenChecks = 3 * time.Minute +const HeartbeatTime = 60 + // null writer to enable non-verbose logging to be discarded type NullWriter bool @@ -43,14 +47,12 @@ func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } -const PauseBetweenChecks = 3 * time.Minute - type Clouder interface { Init() error ListObjects(bucket string, prefix string) ([]string, error) Download(bucket string, key string, fn string) error Upload(bucket string, key string, path string) error - CheckQueue(url string) (Qmsg, error) + CheckQueue(url string) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error @@ -62,11 +64,7 @@ type Pipeliner interface { OCRQueueId() string AnalyseQueueId() string WIPStorageId() string - Logger() *log.Logger -} - -type Qmsg struct { - Handle, Body string + GetLogger() *log.Logger } func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { @@ -131,14 +129,9 @@ func ocr(training string) func(chan string, chan string, chan error, *log.Logger } } -type Conf struct { - path, code string - conf float64 -} - func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*Conf) - bestconfs := make(map[string]*Conf) + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) savedir := "" for path := range toanalyse { @@ -155,10 +148,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log base := filepath.Base(path) codestart := strings.Index(base, "_bin") name := base[0:codestart] - var c Conf - c.path = path - c.code = base[codestart:] - c.conf = avg + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg confs[name] = append(confs[name], &c) } @@ -177,11 +170,11 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log for base, conf := range confs { var best float64 for _, c := range conf { - if c.conf > best { - best = c.conf + if c.Conf > best { + best = c.Conf bestconfs[base] = c } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.path, c.conf) + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) if err != nil { close(up) errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err)) @@ -201,7 +194,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log } defer f.Close() for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.path)) + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) } up <- fn @@ -214,7 +207,7 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log return } defer f.Close() - err = graph(bestconfs, filepath.Base(savedir), f) + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) if err != nil { close(up) errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err)) @@ -222,12 +215,10 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log } up <- fn - // TODO: generate a general report.txt with statistics etc for the book, send to up - close(up) } -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { +func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { bookname := msg.Body t := time.NewTicker(HeartbeatTime * time.Second) @@ -248,10 +239,10 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc) - go process(processc, upc, errc, conn.Logger()) + go process(processc, upc, errc, conn.GetLogger()) go up(upc, done, conn, bookname, errc) - conn.Logger().Println("Getting list of objects to download") + conn.GetLogger().Println("Getting list of objects to download") objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) if err != nil { t.Stop() @@ -261,7 +252,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string var todl []string for _, n := range objs { if !match.MatchString(n) { - conn.Logger().Println("Skipping item that doesn't match target", n) + conn.GetLogger().Println("Skipping item that doesn't match target", n) continue } todl = append(todl, n) @@ -281,7 +272,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string } if toQueue != "" { - conn.Logger().Println("Sending", bookname, "to queue") + conn.GetLogger().Println("Sending", bookname, "to queue") err = conn.AddToQueue(toQueue, bookname) if err != nil { t.Stop() @@ -292,7 +283,7 @@ func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string t.Stop() - conn.Logger().Println("Deleting original message from queue") + conn.GetLogger().Println("Deleting original message from queue") err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { _ = os.RemoveAll(d) @@ -329,7 +320,7 @@ func main() { ocredPattern := regexp.MustCompile(`.hocr$`) var conn Pipeliner - conn = &awsConn{region: "eu-west-2", logger: verboselog} + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} verboselog.Println("Setting up AWS session") err := conn.Init() diff --git a/bookpipeline/graph.go b/bookpipeline/graph.go index 27ffd39..a4fdee0 100644 --- a/bookpipeline/graph.go +++ b/bookpipeline/graph.go @@ -1,4 +1,4 @@ -package main +package bookpipeline import ( "fmt" @@ -14,26 +14,31 @@ import ( const maxticks = 20 const cutoff = 70 +type Conf struct { + Path, Code string + Conf float64 +} + type GraphConf struct { - pgnum, conf float64 + Pgnum, Conf float64 } -func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { +func Graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { // Organise confs to sort them by page var graphconf []GraphConf for _, conf := range confs { - name := filepath.Base(conf.path) + name := filepath.Base(conf.Path) numend := strings.Index(name, "_") pgnum, err := strconv.ParseFloat(name[0:numend], 64) if err != nil { continue } var c GraphConf - c.pgnum = pgnum - c.conf = conf.conf + c.Pgnum = pgnum + c.Conf = conf.Conf graphconf = append(graphconf, c) } - sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].pgnum < graphconf[j].pgnum }) + sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Pgnum < graphconf[j].Pgnum }) // Create main xvalues and yvalues, annotations and ticks var xvalues, yvalues []float64 @@ -43,13 +48,13 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { tickevery := len(graphconf) / maxticks for _, c := range graphconf { i = i + 1 - xvalues = append(xvalues, c.pgnum) - yvalues = append(yvalues, c.conf) - if c.conf < cutoff { - annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.pgnum), XValue: c.pgnum, YValue: c.conf}) + xvalues = append(xvalues, c.Pgnum) + yvalues = append(yvalues, c.Conf) + if c.Conf < cutoff { + annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.Pgnum), XValue: c.Pgnum, YValue: c.Conf}) } if tickevery % i == 0 { - ticks = append(ticks, chart.Tick{c.pgnum, fmt.Sprintf("%.0f", c.pgnum)}) + ticks = append(ticks, chart.Tick{c.Pgnum, fmt.Sprintf("%.0f", c.Pgnum)}) } } mainSeries := chart.ContinuousSeries{ @@ -73,9 +78,9 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { } // Create lines marking top and bottom 10% confidence - sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].conf < graphconf[j].conf }) - lowconf := graphconf[int(len(graphconf) / 10)].conf - highconf := graphconf[int((len(graphconf) / 10) * 9)].conf + sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Conf < graphconf[j].Conf }) + lowconf := graphconf[int(len(graphconf) / 10)].Conf + highconf := graphconf[int((len(graphconf) / 10) * 9)].Conf yvalues = []float64{} for _ = range graphconf { yvalues = append(yvalues, lowconf) |