From 74b89d5f2cd968e58be9a28f1dbce7a1ebda581e Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 28 Aug 2019 18:09:06 +0100 Subject: Split out bookpipeline to cmd/ --- bookpipeline/aws.go | 56 ++--- bookpipeline/cmd/bookpipeline/main.go | 388 +++++++++++++++++++++++++++++++++ bookpipeline/graph.go | 35 +-- bookpipeline/main.go | 397 ---------------------------------- 4 files changed, 438 insertions(+), 438 deletions(-) create mode 100644 bookpipeline/cmd/bookpipeline/main.go delete mode 100644 bookpipeline/main.go diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 761031d..7409434 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -1,4 +1,4 @@ -package main +package bookpipeline import ( "errors" @@ -17,10 +17,14 @@ import ( const PreprocPattern = `_bin[0-9].[0-9].png` const HeartbeatTime = 60 -type awsConn struct { +type Qmsg struct { + Handle, Body string +} + +type AwsConn struct { // these need to be set before running Init() - region string - logger *log.Logger + Region string + Logger *log.Logger // these are used internally sess *session.Session @@ -32,17 +36,17 @@ type awsConn struct { wipstorageid string } -func (a *awsConn) Init() error { - if a.region == "" { - return errors.New("No region set") +func (a *AwsConn) Init() error { + if a.Region == "" { + return errors.New("No Region set") } - if a.logger == nil { + if a.Logger == nil { return errors.New("No logger set") } var err error a.sess, err = session.NewSession(&aws.Config{ - Region: aws.String(a.region), + Region: aws.String(a.Region), }) if err != nil { return errors.New(fmt.Sprintf("Failed to set up aws session: %s", err)) @@ -52,7 +56,7 @@ func (a *awsConn) Init() error { a.downloader = s3manager.NewDownloader(a.sess) a.uploader = s3manager.NewUploader(a.sess) - a.logger.Println("Getting preprocess queue URL") + a.Logger.Println("Getting preprocess queue URL") result, err := a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribepreprocess"), }) @@ -61,7 +65,7 @@ func (a *awsConn) Init() error { } a.prequrl = *result.QueueUrl - a.logger.Println("Getting OCR queue URL") + a.Logger.Println("Getting OCR queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeocr"), }) @@ -70,7 +74,7 @@ func (a *awsConn) Init() error { } a.ocrqurl = *result.QueueUrl - a.logger.Println("Getting analyse queue URL") + a.Logger.Println("Getting analyse queue URL") result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("rescribeanalyse"), }) @@ -84,7 +88,7 @@ func (a *awsConn) Init() error { return nil } -func (a *awsConn) CheckQueue(url string) (Qmsg, error) { +func (a *AwsConn) CheckQueue(url string) (Qmsg, error) { msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ MaxNumberOfMessages: aws.Int64(1), VisibilityTimeout: aws.Int64(HeartbeatTime * 2), @@ -97,14 +101,14 @@ func (a *awsConn) CheckQueue(url string) (Qmsg, error) { if len(msgResult.Messages) > 0 { msg := Qmsg{Handle: *msgResult.Messages[0].ReceiptHandle, Body: *msgResult.Messages[0].Body} - a.logger.Println("Message received:", msg.Body) + a.Logger.Println("Message received:", msg.Body) return msg, nil } else { return Qmsg{}, nil } } -func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { +func (a *AwsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error { for _ = range t.C { duration := int64(HeartbeatTime * 2) _, err := a.sqssvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ @@ -119,23 +123,23 @@ func (a *awsConn) QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) return nil } -func (a *awsConn) PreQueueId() string { +func (a *AwsConn) PreQueueId() string { return a.prequrl } -func (a *awsConn) OCRQueueId() string { +func (a *AwsConn) OCRQueueId() string { return a.ocrqurl } -func (a *awsConn) AnalyseQueueId() string { +func (a *AwsConn) AnalyseQueueId() string { return a.analysequrl } -func (a *awsConn) WIPStorageId() string { +func (a *AwsConn) WIPStorageId() string { return a.wipstorageid } -func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { +func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) { var names []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), @@ -149,7 +153,7 @@ func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { return names, err } -func (a *awsConn) AddToQueue(url string, msg string) error { +func (a *AwsConn) AddToQueue(url string, msg string) error { _, err := a.sqssvc.SendMessage(&sqs.SendMessageInput{ MessageBody: &msg, QueueUrl: &url, @@ -157,7 +161,7 @@ func (a *awsConn) AddToQueue(url string, msg string) error { return err } -func (a *awsConn) DelFromQueue(url string, handle string) error { +func (a *AwsConn) DelFromQueue(url string, handle string) error { _, err := a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &url, ReceiptHandle: &handle, @@ -165,7 +169,7 @@ func (a *awsConn) DelFromQueue(url string, handle string) error { return err } -func (a *awsConn) Download(bucket string, key string, path string) error { +func (a *AwsConn) Download(bucket string, key string, path string) error { f, err := os.Create(path) if err != nil { return err @@ -180,7 +184,7 @@ func (a *awsConn) Download(bucket string, key string, path string) error { return err } -func (a *awsConn) Upload(bucket string, key string, path string) error { +func (a *AwsConn) Upload(bucket string, key string, path string) error { file, err := os.Open(path) if err != nil { log.Fatalln("Failed to open file", path, err) @@ -195,6 +199,6 @@ func (a *awsConn) Upload(bucket string, key string, path string) error { return err } -func (a *awsConn) Logger() *log.Logger { - return a.logger +func (a *AwsConn) GetLogger() *log.Logger { + return a.Logger } diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go new file mode 100644 index 0000000..9fa7159 --- /dev/null +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -0,0 +1,388 @@ +package main + +// TODO: have logs go somewhere useful, like email +// TODO: check if images are prebinarised and if so skip multiple binarisation + +import ( + "errors" + "flag" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + "rescribe.xyz/go.git/bookpipeline" + "rescribe.xyz/go.git/lib/hocr" + "rescribe.xyz/go.git/preproc" +) + +const usage = `Usage: bookpipeline [-v] [-t training] + +Watches the preprocess, ocr and analyse queues for book names. When +one is found this general process is followed: + +- The book name is hidden from the queue, and a 'heartbeat' is + started which keeps it hidden (this will time out after 2 minutes + if the program is terminated) +- The necessary files from bookname/ are downloaded +- The files are processed +- The resulting files are uploaded to bookname/ +- The heartbeat is stopped +- The book name is removed from the queue it was taken from, and + added to the next queue for future processing + +` + +const PauseBetweenChecks = 3 * time.Minute +const HeartbeatTime = 60 + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error +} + +type Pipeliner interface { + Clouder + PreQueueId() string + OCRQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + close(process) + errc <- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) { + for path := range c { + name := filepath.Base(path) + key := filepath.Join(bookname, name) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + errc <- err + return + } + } + + done <- true +} + +func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) + if err != nil { + close(up) + errc <- err + return + } + for _, p := range done { + up <- p + } + } + close(up) +} + +func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func (toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") + err := cmd.Run() + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err)) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error retreiving confidence for %s: %s", path, err)) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err)) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err)) + return + } + up <- fn + + close(up) +} + +func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + bookname := msg.Body + + t := time.NewTicker(HeartbeatTime * time.Second) + go conn.QueueHeartbeat(t, msg.Handle, fromQueue) + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + t.Stop() + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) + } + + dl := make(chan string) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc) + + conn.GetLogger().Println("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.GetLogger().Println("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if toQueue != "" { + conn.GetLogger().Println("Sending", bookname, "to queue") + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) + } + } + + t.Stop() + + conn.GetLogger().Println("Deleting original message from queue") + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) + } + + err = os.RemoveAll(d) + if err != nil { + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) + } + + return nil +} + +func main() { + verbose := flag.Bool("v", false, "verbose") + training := flag.String("t", "rescribealphav5", "tesseract training file to use") + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", log.LstdFlags) + } else { + var n NullWriter + verboselog = log.New(n, "", log.LstdFlags) + } + + origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + ocredPattern := regexp.MustCompile(`.hocr$`) + + var conn Pipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + verboselog.Println("Setting up AWS session") + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + verboselog.Println("Finished setting up AWS session") + + var checkPreQueue <-chan time.Time + var checkOCRQueue <-chan time.Time + var checkAnalyseQueue <-chan time.Time + checkPreQueue = time.After(0) + checkOCRQueue = time.After(0) + checkAnalyseQueue = time.After(0) + + for { + select { + case <-checkPreQueue: + msg, err := conn.CheckQueue(conn.PreQueueId()) + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking preprocess queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on preprocess queue, sleeping") + continue + } + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) + if err != nil { + log.Println("Error during preprocess", err) + } + case <-checkOCRQueue: + msg, err := conn.CheckQueue(conn.OCRQueueId()) + checkOCRQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking OCR queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on OCR queue, sleeping") + continue + } + err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) + if err != nil { + log.Println("Error during OCR process", err) + } + case <-checkAnalyseQueue: + msg, err := conn.CheckQueue(conn.AnalyseQueueId()) + checkAnalyseQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking analyse queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on analyse queue, sleeping") + continue + } + err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "") + if err != nil { + log.Println("Error during analysis", err) + } + } + } +} diff --git a/bookpipeline/graph.go b/bookpipeline/graph.go index 27ffd39..a4fdee0 100644 --- a/bookpipeline/graph.go +++ b/bookpipeline/graph.go @@ -1,4 +1,4 @@ -package main +package bookpipeline import ( "fmt" @@ -14,26 +14,31 @@ import ( const maxticks = 20 const cutoff = 70 +type Conf struct { + Path, Code string + Conf float64 +} + type GraphConf struct { - pgnum, conf float64 + Pgnum, Conf float64 } -func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { +func Graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { // Organise confs to sort them by page var graphconf []GraphConf for _, conf := range confs { - name := filepath.Base(conf.path) + name := filepath.Base(conf.Path) numend := strings.Index(name, "_") pgnum, err := strconv.ParseFloat(name[0:numend], 64) if err != nil { continue } var c GraphConf - c.pgnum = pgnum - c.conf = conf.conf + c.Pgnum = pgnum + c.Conf = conf.Conf graphconf = append(graphconf, c) } - sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].pgnum < graphconf[j].pgnum }) + sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Pgnum < graphconf[j].Pgnum }) // Create main xvalues and yvalues, annotations and ticks var xvalues, yvalues []float64 @@ -43,13 +48,13 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { tickevery := len(graphconf) / maxticks for _, c := range graphconf { i = i + 1 - xvalues = append(xvalues, c.pgnum) - yvalues = append(yvalues, c.conf) - if c.conf < cutoff { - annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.pgnum), XValue: c.pgnum, YValue: c.conf}) + xvalues = append(xvalues, c.Pgnum) + yvalues = append(yvalues, c.Conf) + if c.Conf < cutoff { + annotations = append(annotations, chart.Value2{Label: fmt.Sprintf("%.0f", c.Pgnum), XValue: c.Pgnum, YValue: c.Conf}) } if tickevery % i == 0 { - ticks = append(ticks, chart.Tick{c.pgnum, fmt.Sprintf("%.0f", c.pgnum)}) + ticks = append(ticks, chart.Tick{c.Pgnum, fmt.Sprintf("%.0f", c.Pgnum)}) } } mainSeries := chart.ContinuousSeries{ @@ -73,9 +78,9 @@ func graph(confs map[string]*Conf, bookname string, w io.Writer) (error) { } // Create lines marking top and bottom 10% confidence - sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].conf < graphconf[j].conf }) - lowconf := graphconf[int(len(graphconf) / 10)].conf - highconf := graphconf[int((len(graphconf) / 10) * 9)].conf + sort.Slice(graphconf, func(i, j int) bool { return graphconf[i].Conf < graphconf[j].Conf }) + lowconf := graphconf[int(len(graphconf) / 10)].Conf + highconf := graphconf[int((len(graphconf) / 10) * 9)].Conf yvalues = []float64{} for _ = range graphconf { yvalues = append(yvalues, lowconf) diff --git a/bookpipeline/main.go b/bookpipeline/main.go deleted file mode 100644 index b7b01dd..0000000 --- a/bookpipeline/main.go +++ /dev/null @@ -1,397 +0,0 @@ -package main - -// TODO: have logs go somewhere useful, like email -// TODO: check if images are prebinarised and if so skip multiple binarisation - -import ( - "errors" - "flag" - "fmt" - "log" - "os" - "os/exec" - "path/filepath" - "regexp" - "strings" - "time" - - "rescribe.xyz/go.git/lib/hocr" - "rescribe.xyz/go.git/preproc" -) - -const usage = `Usage: bookpipeline [-v] [-t training] - -Watches the preprocess, ocr and analyse queues for book names. When -one is found this general process is followed: - -- The book name is hidden from the queue, and a 'heartbeat' is - started which keeps it hidden (this will time out after 2 minutes - if the program is terminated) -- The necessary files from bookname/ are downloaded -- The files are processed -- The resulting files are uploaded to bookname/ -- The heartbeat is stopped -- The book name is removed from the queue it was taken from, and - added to the next queue for future processing - -` - -// null writer to enable non-verbose logging to be discarded -type NullWriter bool - -func (w NullWriter) Write(p []byte) (n int, err error) { - return len(p), nil -} - -const PauseBetweenChecks = 3 * time.Minute - -type Clouder interface { - Init() error - ListObjects(bucket string, prefix string) ([]string, error) - Download(bucket string, key string, fn string) error - Upload(bucket string, key string, path string) error - CheckQueue(url string) (Qmsg, error) - AddToQueue(url string, msg string) error - DelFromQueue(url string, handle string) error - QueueHeartbeat(t *time.Ticker, msgHandle string, qurl string) error -} - -type Pipeliner interface { - Clouder - PreQueueId() string - OCRQueueId() string - AnalyseQueueId() string - WIPStorageId() string - Logger() *log.Logger -} - -type Qmsg struct { - Handle, Body string -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error) { - for key := range dl { - fn := filepath.Join(dir, filepath.Base(key)) - err := conn.Download(conn.WIPStorageId(), key, fn) - if err != nil { - close(process) - errc <- err - return - } - process <- fn - } - close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error) { - for path := range c { - name := filepath.Base(path) - key := filepath.Join(bookname, name) - err := conn.Upload(conn.WIPStorageId(), key, path) - if err != nil { - errc <- err - return - } - } - - done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range pre { - logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) - if err != nil { - close(up) - errc <- err - return - } - for _, p := range done { - up <- p - } - } - close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { - return func (toocr chan string, up chan string, errc chan error, logger *log.Logger) { - for path := range toocr { - logger.Println("OCRing", path) - name := strings.Replace(path, ".png", "", 1) - cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") - err := cmd.Run() - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err)) - return - } - up <- name + ".hocr" - } - close(up) - } -} - -type Conf struct { - path, code string - conf float64 -} - -func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { - confs := make(map[string][]*Conf) - bestconfs := make(map[string]*Conf) - savedir := "" - - for path := range toanalyse { - if savedir == "" { - savedir = filepath.Dir(path) - } - logger.Println("Calculating confidence for", path) - avg, err := hocr.GetAvgConf(path) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error retreiving confidence for %s: %s", path, err)) - return - } - base := filepath.Base(path) - codestart := strings.Index(base, "_bin") - name := base[0:codestart] - var c Conf - c.path = path - c.code = base[codestart:] - c.conf = avg - confs[name] = append(confs[name], &c) - - } - - fn := filepath.Join(savedir, "conf") - logger.Println("Saving confidences in file", fn) - f, err := os.Create(fn) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) - return - } - defer f.Close() - - logger.Println("Finding best confidence for each page, and saving all confidences") - for base, conf := range confs { - var best float64 - for _, c := range conf { - if c.conf > best { - best = c.conf - bestconfs[base] = c - } - _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.path, c.conf) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err)) - return - } - } - } - up <- fn - - logger.Println("Creating best file listing the best file for each page") - fn = filepath.Join(savedir, "best") - f, err = os.Create(fn) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) - return - } - defer f.Close() - for _, conf := range bestconfs { - _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.path)) - } - up <- fn - - logger.Println("Creating graph") - fn = filepath.Join(savedir, "graph.png") - f, err = os.Create(fn) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) - return - } - defer f.Close() - err = graph(bestconfs, filepath.Base(savedir), f) - if err != nil { - close(up) - errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err)) - return - } - up <- fn - - // TODO: generate a general report.txt with statistics etc for the book, send to up - - close(up) -} - -func processBook(msg Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { - bookname := msg.Body - - t := time.NewTicker(HeartbeatTime * time.Second) - go conn.QueueHeartbeat(t, msg.Handle, fromQueue) - - d := filepath.Join(os.TempDir(), bookname) - err := os.MkdirAll(d, 0755) - if err != nil { - t.Stop() - return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) - } - - dl := make(chan string) - processc := make(chan string) - upc := make(chan string) - done := make(chan bool) - errc := make(chan error) - - // these functions will do their jobs when their channels have data - go download(dl, processc, conn, d, errc) - go process(processc, upc, errc, conn.Logger()) - go up(upc, done, conn, bookname, errc) - - conn.Logger().Println("Getting list of objects to download") - objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) - } - var todl []string - for _, n := range objs { - if !match.MatchString(n) { - conn.Logger().Println("Skipping item that doesn't match target", n) - continue - } - todl = append(todl, n) - } - for _, a := range todl { - dl <- a - } - close(dl) - - // wait for either the done or errc channel to be sent to - select { - case err = <-errc: - t.Stop() - _ = os.RemoveAll(d) - return err - case <-done: - } - - if toQueue != "" { - conn.Logger().Println("Sending", bookname, "to queue") - err = conn.AddToQueue(toQueue, bookname) - if err != nil { - t.Stop() - _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) - } - } - - t.Stop() - - conn.Logger().Println("Deleting original message from queue") - err = conn.DelFromQueue(fromQueue, msg.Handle) - if err != nil { - _ = os.RemoveAll(d) - return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) - } - - err = os.RemoveAll(d) - if err != nil { - return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) - } - - return nil -} - -func main() { - verbose := flag.Bool("v", false, "verbose") - training := flag.String("t", "rescribealphav5", "tesseract training file to use") - flag.Usage = func() { - fmt.Fprintf(flag.CommandLine.Output(), usage) - flag.PrintDefaults() - } - flag.Parse() - - var verboselog *log.Logger - if *verbose { - verboselog = log.New(os.Stdout, "", log.LstdFlags) - } else { - var n NullWriter - verboselog = log.New(n, "", log.LstdFlags) - } - - origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming - preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - ocredPattern := regexp.MustCompile(`.hocr$`) - - var conn Pipeliner - conn = &awsConn{region: "eu-west-2", logger: verboselog} - - verboselog.Println("Setting up AWS session") - err := conn.Init() - if err != nil { - log.Fatalln("Error setting up cloud connection:", err) - } - verboselog.Println("Finished setting up AWS session") - - var checkPreQueue <-chan time.Time - var checkOCRQueue <-chan time.Time - var checkAnalyseQueue <-chan time.Time - checkPreQueue = time.After(0) - checkOCRQueue = time.After(0) - checkAnalyseQueue = time.After(0) - - for { - select { - case <-checkPreQueue: - msg, err := conn.CheckQueue(conn.PreQueueId()) - checkPreQueue = time.After(PauseBetweenChecks) - if err != nil { - log.Println("Error checking preprocess queue", err) - continue - } - if msg.Handle == "" { - verboselog.Println("No message received on preprocess queue, sleeping") - continue - } - err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) - if err != nil { - log.Println("Error during preprocess", err) - } - case <-checkOCRQueue: - msg, err := conn.CheckQueue(conn.OCRQueueId()) - checkOCRQueue = time.After(PauseBetweenChecks) - if err != nil { - log.Println("Error checking OCR queue", err) - continue - } - if msg.Handle == "" { - verboselog.Println("No message received on OCR queue, sleeping") - continue - } - err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) - if err != nil { - log.Println("Error during OCR process", err) - } - case <-checkAnalyseQueue: - msg, err := conn.CheckQueue(conn.AnalyseQueueId()) - checkAnalyseQueue = time.After(PauseBetweenChecks) - if err != nil { - log.Println("Error checking analyse queue", err) - continue - } - if msg.Handle == "" { - verboselog.Println("No message received on analyse queue, sleeping") - continue - } - err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "") - if err != nil { - log.Println("Error during analysis", err) - } - } - } -} -- cgit v1.2.1-24-ge1ad