author    Nick White <git@njw.name>    2019-10-08 12:52:33 +0100
committer Nick White <git@njw.name>    2019-10-08 12:52:33 +0100
commit    7482157a03ed3e9d7f45e54a126b391001f34948 (patch)
tree      52f87b9ca159fe4c04a0349de95ea9de82692b3c /cmd
parent    d43c11bf653bfe3c1ad1ed277f1ec08bf155cf98 (diff)
Separate out bookpipeline from catch-all go.git repo, and rename to rescribe.xyz/bookpipeline
The dependencies from the go.git repo will follow in due course.
Diffstat (limited to 'cmd')
-rw-r--r--  cmd/bookpipeline/main.go      486
-rw-r--r--  cmd/booktopipeline/main.go    140
-rw-r--r--  cmd/confgraph/main.go          71
-rw-r--r--  cmd/getpipelinebook/main.go   122
-rw-r--r--  cmd/lspipeline/main.go        250
-rw-r--r--  cmd/mkpipeline/main.go         79
6 files changed, 1148 insertions, 0 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
new file mode 100644
index 0000000..f445547
--- /dev/null
+++ b/cmd/bookpipeline/main.go
@@ -0,0 +1,486 @@
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "time"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/go.git/lib/hocr"
+ "rescribe.xyz/go.git/preproc"
+)
+
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training]
+
+Watches the preprocess, ocr and analyse queues for book names. When
+one is found this general process is followed:
+
+- The book name is hidden from the queue, and a 'heartbeat' is
+ started which keeps it hidden (this will time out after 2 minutes
+ if the program is terminated)
+- The necessary files from bookname/ are downloaded
+- The files are processed
+- The resulting files are uploaded to bookname/
+- The heartbeat is stopped
+- The book name is removed from the queue it was taken from, and
+ added to the next queue for future processing
+
+`
+
+const PauseBetweenChecks = 3 * time.Minute
+const HeartbeatTime = 60
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type Clouder interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
+}
+
+type Pipeliner interface {
+ Clouder
+ PreQueueId() string
+ WipeQueueId() string
+ OCRQueueId() string
+ AnalyseQueueId() string
+ WIPStorageId() string
+ GetLogger() *log.Logger
+}
+
+func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+ for key := range dl {
+ fn := filepath.Join(dir, filepath.Base(key))
+ logger.Println("Downloading", key)
+ err := conn.Download(conn.WIPStorageId(), key, fn)
+ if err != nil {
+ for range dl {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(process)
+ errc <- err
+ return
+ }
+ process <- fn
+ }
+ close(process)
+}
+
+func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+ for path := range c {
+ name := filepath.Base(path)
+ key := filepath.Join(bookname, name)
+ logger.Println("Uploading", key)
+ err := conn.Upload(conn.WIPStorageId(), key, path)
+ if err != nil {
+ for range c {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ }
+
+ done <- true
+}
+
+func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range pre {
+ logger.Println("Preprocessing", path)
+ done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30)
+ if err != nil {
+ for range pre {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(up)
+ errc <- err
+ return
+ }
+ for _, p := range done {
+ up <- p
+ }
+ }
+ close(up)
+}
+
+func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range towipe {
+ logger.Println("Wiping", path)
+ s := strings.Split(path, ".")
+ base := strings.Join(s[:len(s)-1], "")
+ outpath := base + "_bin0.0.png"
+ err := preproc.WipeFile(path, outpath, 5, 0.03, 30)
+ if err != nil {
+ for range towipe {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(up)
+ errc <- err
+ return
+ }
+ up <- outpath
+ }
+ close(up)
+}
+
+func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
+ return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range toocr {
+ logger.Println("OCRing", path)
+ name := strings.Replace(path, ".png", "", 1)
+ cmd := exec.Command("tesseract", "-l", training, path, name, "hocr")
+ err := cmd.Run()
+ if err != nil {
+ for range toocr {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err))
+ return
+ }
+ up <- name + ".hocr"
+ }
+ close(up)
+ }
+}
+
+func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
+ confs := make(map[string][]*bookpipeline.Conf)
+ bestconfs := make(map[string]*bookpipeline.Conf)
+ savedir := ""
+
+ for path := range toanalyse {
+ if savedir == "" {
+ savedir = filepath.Dir(path)
+ }
+ logger.Println("Calculating confidence for", path)
+ avg, err := hocr.GetAvgConf(path)
+ if err != nil && err.Error() == "No words found" {
+ continue
+ }
+ if err != nil {
+ for range toanalyse {
+ } // consume the rest of the receiving channel so it isn't blocked
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error retrieving confidence for %s: %s", path, err))
+ return
+ }
+ base := filepath.Base(path)
+ codestart := strings.Index(base, "_bin")
+ name := base[0:codestart]
+ var c bookpipeline.Conf
+ c.Path = path
+ c.Code = base[codestart:]
+ c.Conf = avg
+ confs[name] = append(confs[name], &c)
+
+ }
+
+ fn := filepath.Join(savedir, "conf")
+ logger.Println("Saving confidences in file", fn)
+ f, err := os.Create(fn)
+ if err != nil {
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+ return
+ }
+ defer f.Close()
+
+ logger.Println("Finding best confidence for each page, and saving all confidences")
+ for base, conf := range confs {
+ var best float64
+ for _, c := range conf {
+ if c.Conf > best {
+ best = c.Conf
+ bestconfs[base] = c
+ }
+ _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
+ if err != nil {
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err))
+ return
+ }
+ }
+ }
+ up <- fn
+
+ logger.Println("Creating best file listing the best file for each page")
+ fn = filepath.Join(savedir, "best")
+ f, err = os.Create(fn)
+ if err != nil {
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+ return
+ }
+ defer f.Close()
+ for _, conf := range bestconfs {
+ _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
+ }
+ up <- fn
+
+ logger.Println("Creating graph")
+ fn = filepath.Join(savedir, "graph.png")
+ f, err = os.Create(fn)
+ if err != nil {
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err))
+ return
+ }
+ defer f.Close()
+ err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
+ if err != nil {
+ close(up)
+ errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err))
+ return
+ }
+ up <- fn
+
+ close(up)
+}
+
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+ currentmsg := msg
+ for range t.C {
+ m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
+ if err != nil {
+ errc <- err
+ t.Stop()
+ return
+ }
+ if m.Id != "" {
+ conn.GetLogger().Println("Replaced message handle as visibilitytimeout limit was reached")
+ currentmsg = m
+ // TODO: maybe handle communicating new msg more gracefully than this
+ for range msgc {
+ } // throw away any old msgc
+ msgc <- m
+ }
+ }
+}
+
+func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
+ dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
+ processc := make(chan string)
+ upc := make(chan string)
+ done := make(chan bool)
+ errc := make(chan error)
+
+ bookname := msg.Body
+
+ d := filepath.Join(os.TempDir(), bookname)
+ err := os.MkdirAll(d, 0755)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err))
+ }
+
+ t := time.NewTicker(HeartbeatTime * time.Second)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
+
+ // these functions will do their jobs when their channels have data
+ go download(dl, processc, conn, d, errc, conn.GetLogger())
+ go process(processc, upc, errc, conn.GetLogger())
+ go up(upc, done, conn, bookname, errc, conn.GetLogger())
+
+ conn.GetLogger().Println("Getting list of objects to download")
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err))
+ }
+ var todl []string
+ for _, n := range objs {
+ if !match.MatchString(n) {
+ conn.GetLogger().Println("Skipping item that doesn't match target", n)
+ continue
+ }
+ todl = append(todl, n)
+ }
+ for _, a := range todl {
+ dl <- a
+ }
+ close(dl)
+
+ // wait for either the done or errc channel to be sent to
+ select {
+ case err = <-errc:
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return err
+ case <-done:
+ }
+
+ if toQueue != "" {
+ conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+ err = conn.AddToQueue(toQueue, bookname)
+ if err != nil {
+ t.Stop()
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err))
+ }
+ }
+
+ t.Stop()
+
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc:
+ if ok {
+ msg = m
+ conn.GetLogger().Println("Using new message handle to delete message from old queue")
+ }
+ default:
+ conn.GetLogger().Println("Using original message handle to delete message from old queue")
+ }
+
+ conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+ err = conn.DelFromQueue(fromQueue, msg.Handle)
+ if err != nil {
+ _ = os.RemoveAll(d)
+ return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err))
+ }
+
+ err = os.RemoveAll(d)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err))
+ }
+
+ return nil
+}
+
+func main() {
+ verbose := flag.Bool("v", false, "verbose")
+ training := flag.String("t", "rescribealphav5", "tesseract training file to use")
+ nopreproc := flag.Bool("np", false, "disable preprocessing")
+ nowipe := flag.Bool("nw", false, "disable wipeonly")
+ noocr := flag.Bool("no", false, "disable ocr")
+ noanalyse := flag.Bool("na", false, "disable analysis")
+
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", 0)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+ }
+
+ origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming
+ wipePattern := regexp.MustCompile(`[0-9]{4}.png$`)
+ preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
+ ocredPattern := regexp.MustCompile(`.hocr$`)
+
+ var conn Pipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+
+ verboselog.Println("Setting up AWS session")
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+ verboselog.Println("Finished setting up AWS session")
+
+ var checkPreQueue <-chan time.Time
+ var checkWipeQueue <-chan time.Time
+ var checkOCRQueue <-chan time.Time
+ var checkAnalyseQueue <-chan time.Time
+ if !*nopreproc {
+ checkPreQueue = time.After(0)
+ }
+ if !*nowipe {
+ checkWipeQueue = time.After(0)
+ }
+ if !*noocr {
+ checkOCRQueue = time.After(0)
+ }
+ if !*noanalyse {
+ checkAnalyseQueue = time.After(0)
+ }
+
+ for {
+ select {
+ case <-checkPreQueue:
+ msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime*2)
+ checkPreQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking preprocess queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on preprocess queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on preprocess queue, processing", msg.Body)
+ err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId())
+ if err != nil {
+ log.Println("Error during preprocess", err)
+ }
+ case <-checkWipeQueue:
+ msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2)
+ checkWipeQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking wipeonly queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on wipeonly queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on wipeonly queue, processing", msg.Body)
+ err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId())
+ if err != nil {
+ log.Println("Error during wipe", err)
+ }
+ case <-checkOCRQueue:
+ msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)
+ checkOCRQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking OCR queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on OCR queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on OCR queue, processing", msg.Body)
+ err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
+ if err != nil {
+ log.Println("Error during OCR process", err)
+ }
+ case <-checkAnalyseQueue:
+ msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime*2)
+ checkAnalyseQueue = time.After(PauseBetweenChecks)
+ if err != nil {
+ log.Println("Error checking analyse queue", err)
+ continue
+ }
+ if msg.Handle == "" {
+ verboselog.Println("No message received on analyse queue, sleeping")
+ continue
+ }
+ verboselog.Println("Message received on analyse queue, processing", msg.Body)
+ err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "")
+ if err != nil {
+ log.Println("Error during analysis", err)
+ }
+ }
+ }
+}
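
The stages in bookpipeline/main.go are wired together with unbuffered channels: download feeds the processing stage, which feeds up, and a stage that hits an error drains its input so the upstream sender never blocks, closes its output so downstream stages can wind down, and reports on errc. A minimal, self-contained sketch of that drain-and-close idiom (hypothetical stage and item names, no AWS involved; the buffered error channel is a simplification to keep the demo deterministic):

package main

import (
	"errors"
	"fmt"
)

// stage reads items from in and passes processed results to out. On failure
// it reports on errc, drains in so the sender is never blocked, and closes
// out so downstream consumers can finish; this mirrors the drain-and-close
// idiom used by the download, preprocess, wipe, ocr and analyse stages above.
func stage(in chan string, out chan string, errc chan error) {
	for item := range in {
		if item == "bad" {
			errc <- errors.New("cannot process " + item)
			for range in {
			} // consume the rest of the channel so the sender isn't blocked
			close(out)
			return
		}
		out <- item + "-processed"
	}
	close(out)
}

func main() {
	in := make(chan string)
	out := make(chan string)
	errc := make(chan error, 1) // buffered so the failing stage can report and exit

	go stage(in, out, errc)

	go func() {
		for _, i := range []string{"one", "two", "bad", "three"} {
			in <- i
		}
		close(in)
	}()

	for item := range out {
		fmt.Println("finished:", item)
	}

	select {
	case err := <-errc:
		fmt.Println("pipeline failed:", err)
	default:
		fmt.Println("pipeline finished cleanly")
	}
}
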
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
new file mode 100644
index 0000000..6d9f146
--- /dev/null
+++ b/cmd/booktopipeline/main.go
@@ -0,0 +1,140 @@
+package main
+
+// TODO: use bookpipeline package to do aws stuff
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/aws/aws-sdk-go/service/sqs"
+)
+
+const usage = `Usage: booktopipeline [-prebinarised] [-v] bookdir [bookname]
+
+Uploads the book in bookdir to the S3 'inprogress' bucket and adds it
+to the 'preprocess' SQS queue, or the 'wipeonly' queue if the
+prebinarised flag is set.
+
+If bookname is omitted the last part of the bookdir is used.
+`
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+var verboselog *log.Logger
+
+type fileWalk chan string
+
+func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.IsDir() {
+ f <- path
+ }
+ return nil
+}
+
+func main() {
+ verbose := flag.Bool("v", false, "Verbose")
+ wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe")
+
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+ if flag.NArg() < 1 {
+ flag.Usage()
+ return
+ }
+
+ bookdir := flag.Arg(0)
+ var bookname string
+ if flag.NArg() > 1 {
+ bookname = flag.Arg(1)
+ } else {
+ bookname = filepath.Base(bookdir)
+ }
+
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", log.LstdFlags)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", log.LstdFlags)
+ }
+
+ verboselog.Println("Setting up AWS session")
+ sess, err := session.NewSession(&aws.Config{
+ Region: aws.String("eu-west-2"),
+ })
+ if err != nil {
+ log.Fatalln("Error: failed to set up aws session:", err)
+ }
+ sqssvc := sqs.New(sess)
+ uploader := s3manager.NewUploader(sess)
+
+ var qname string
+ if *wipeonly {
+ qname = "rescribewipeonly"
+ } else {
+ qname = "rescribepreprocess"
+ }
+ verboselog.Println("Getting Queue URL for", qname)
+ result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(qname),
+ })
+ if err != nil {
+ log.Fatalln("Error getting queue URL for", qname, ":", err)
+ }
+ qurl := *result.QueueUrl
+
+ // concurrent walking upload based on example at
+ // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html
+ verboselog.Println("Walking", bookdir)
+ walker := make(fileWalk)
+ go func() {
+ err = filepath.Walk(bookdir, walker.Walk)
+ if err != nil {
+ log.Fatalln("Filesystem walk failed:", err)
+ }
+ close(walker)
+ }()
+
+ for path := range walker {
+ verboselog.Println("Uploading", path)
+ name := filepath.Base(path)
+ file, err := os.Open(path)
+ if err != nil {
+ log.Fatalln("Open file", path, "failed:", err)
+ }
+ defer file.Close()
+ _, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String("rescribeinprogress"),
+ Key: aws.String(filepath.Join(bookname, name)),
+ Body: file,
+ })
+ if err != nil {
+ log.Fatalln("Failed to upload", path, err)
+ }
+ }
+
+ verboselog.Println("Sending message", bookname, "to queue", qurl)
+ _, err = sqssvc.SendMessage(&sqs.SendMessageInput{
+ MessageBody: aws.String(bookname),
+ QueueUrl: &qurl,
+ })
+ if err != nil {
+ log.Fatalln("Error adding book to queue:", err)
+ }
+}
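
booktopipeline still drives S3 and SQS through the AWS SDK directly; the TODO at the top of the file notes that it should move to the bookpipeline package. A hedged sketch of what the upload-and-enqueue step might look like via AwsConn, using only methods the other commands in this commit already rely on (the helper name and example arguments are hypothetical):

package main

import (
	"log"
	"os"
	"path/filepath"

	"rescribe.xyz/bookpipeline"
)

// upload walks bookdir, uploads each file to the work-in-progress bucket
// under bookname/, and adds bookname to the preprocess queue (or the
// wipeonly queue if the book is already binarised).
func upload(bookdir string, bookname string, wipeonly bool) error {
	conn := &bookpipeline.AwsConn{Region: "eu-west-2", Logger: log.New(os.Stdout, "", log.LstdFlags)}
	if err := conn.Init(); err != nil {
		return err
	}

	err := filepath.Walk(bookdir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() {
			return err
		}
		key := filepath.Join(bookname, filepath.Base(path))
		return conn.Upload(conn.WIPStorageId(), key, path)
	})
	if err != nil {
		return err
	}

	qurl := conn.PreQueueId()
	if wipeonly {
		qurl = conn.WipeQueueId()
	}
	return conn.AddToQueue(qurl, bookname)
}

func main() {
	// Example invocation with hypothetical arguments.
	if err := upload("/path/to/bookdir", "examplebook", false); err != nil {
		log.Fatalln("Upload failed:", err)
	}
}
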
diff --git a/cmd/confgraph/main.go b/cmd/confgraph/main.go
new file mode 100644
index 0000000..474c0a2
--- /dev/null
+++ b/cmd/confgraph/main.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "rescribe.xyz/bookpipeline"
+ "rescribe.xyz/go.git/lib/hocr"
+)
+
+func walker(confs *[]*bookpipeline.Conf) filepath.WalkFunc {
+ return func(path string, info os.FileInfo, err error) error {
+ if info.IsDir() {
+ return nil
+ }
+ if !strings.HasSuffix(path, ".hocr") {
+ return nil
+ }
+ avg, err := hocr.GetAvgConf(path)
+ if err != nil {
+ return err
+ }
+ c := bookpipeline.Conf{
+ Conf: avg,
+ Path: path,
+ }
+ *confs = append(*confs, &c)
+ return nil
+ }
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintln(flag.CommandLine.Output(), "Usage: bookpipeline hocrdir graph.png")
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 2 {
+ flag.Usage()
+ return
+ }
+
+ var confs []*bookpipeline.Conf
+ err := filepath.Walk(flag.Arg(0), walker(&confs))
+ if err != nil {
+ log.Fatalln("Failed to walk", flag.Arg(0), err)
+ }
+
+ // Structure to fit what bookpipeline.Graph needs
+ // TODO: probably reorganise bookpipeline to just need []*Conf
+ cconfs := make(map[string]*bookpipeline.Conf)
+ for _, c := range confs {
+ cconfs[c.Path] = c
+ }
+
+ fn := flag.Arg(1)
+ f, err := os.Create(fn)
+ if err != nil {
+ log.Fatalln("Error creating file", fn, err)
+ }
+ defer f.Close()
+ err = bookpipeline.Graph(cconfs, filepath.Base(flag.Arg(0)), f)
+ if err != nil {
+ log.Fatalln("Error creating graph", err)
+ }
+}
diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go
new file mode 100644
index 0000000..9e900bf
--- /dev/null
+++ b/cmd/getpipelinebook/main.go
@@ -0,0 +1,122 @@
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = "Usage: getpipelinebook [-a] [-v] bookname\n\nDownloads the pipeline results for a book.\n"
+
+// null writer to enable non-verbose logging to be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type Pipeliner interface {
+ Init() error
+ ListObjects(bucket string, prefix string) ([]string, error)
+ Download(bucket string, key string, fn string) error
+ Upload(bucket string, key string, path string) error
+ CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
+ AddToQueue(url string, msg string) error
+ DelFromQueue(url string, handle string) error
+ WIPStorageId() string
+}
+
+func main() {
+ all := flag.Bool("a", false, "Get all files for book, not just hOCR and analysis files")
+ verbose := flag.Bool("v", false, "Verbose")
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() < 1 {
+ flag.Usage()
+ return
+ }
+
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", log.LstdFlags)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", log.LstdFlags)
+ }
+
+ var conn Pipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+
+ verboselog.Println("Setting up AWS session")
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+ verboselog.Println("Finished setting up AWS session")
+
+ bookname := flag.Arg(0)
+
+ err = os.MkdirAll(bookname, 0755)
+ if err != nil {
+ log.Fatalln("Failed to create directory", bookname, err)
+ }
+
+ if *all {
+ verboselog.Println("Downloading all files for", bookname)
+ objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
+ if err != nil {
+ log.Fatalln("Failed to get list of files for book", bookname, err)
+ }
+ for _, i := range objs {
+ verboselog.Println("Downloading", i)
+ err = conn.Download(conn.WIPStorageId(), i, i)
+ if err != nil {
+ log.Fatalln("Failed to download file", i, err)
+ }
+ }
+ return
+ }
+
+ verboselog.Println("Downloading best file")
+ fn := filepath.Join(bookname, "best")
+ err = conn.Download(conn.WIPStorageId(), fn, fn)
+ if err != nil {
+ log.Fatalln("Failed to download 'best' file", err)
+ }
+ f, err := os.Open(fn)
+ if err != nil {
+ log.Fatalln("Failed to open best file", err)
+ }
+ defer f.Close()
+
+ verboselog.Println("Downloading HOCR files")
+ s := bufio.NewScanner(f)
+ for s.Scan() {
+ fn = filepath.Join(bookname, s.Text())
+ verboselog.Println("Downloading file", fn)
+ err = conn.Download(conn.WIPStorageId(), fn, fn)
+ if err != nil {
+ log.Fatalln("Failed to download file", fn, err)
+ }
+ }
+
+ analyses := []string{"conf", "graph.png"}
+ verboselog.Println("Downloading analysis files")
+ for _, a := range analyses {
+ fn = filepath.Join(bookname, a)
+ verboselog.Println("Downloading file", fn)
+ err = conn.Download(conn.WIPStorageId(), fn, fn)
+ if err != nil {
+ log.Fatalln("Failed to download file", fn, err)
+ }
+ }
+}
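
getpipelinebook downloads the conf file that the analyse stage writes, but leaves parsing it to the user. The format is simple, one tab-separated path and average-confidence pair per processed page image, so a consumer could read it like this (a hypothetical helper, assuming the format written by analyse in bookpipeline/main.go):

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
)

// readConf parses a "conf" file as written by bookpipeline's analyse stage:
// one "<path>\t<average confidence>" line per processed page image.
func readConf(fn string) (map[string]float64, error) {
	confs := make(map[string]float64)
	f, err := os.Open(fn)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	for s.Scan() {
		parts := strings.Split(s.Text(), "\t")
		if len(parts) != 2 {
			continue
		}
		c, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 64)
		if err != nil {
			continue
		}
		confs[parts[0]] = c
	}
	return confs, s.Err()
}

func main() {
	confs, err := readConf("examplebook/conf")
	if err != nil {
		log.Fatalln("Failed to read conf file:", err)
	}
	for path, c := range confs {
		fmt.Printf("%s: %.1f\n", path, c)
	}
}
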
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
new file mode 100644
index 0000000..0e1ebb0
--- /dev/null
+++ b/cmd/lspipeline/main.go
@@ -0,0 +1,250 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os/exec"
+ "strings"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: lspipeline [-i key] [-n num]
+
+Lists useful things related to the pipeline.
+
+- Instances running
+- Messages in each queue
+- Books not completed
+- Books done
+- Last n lines of bookpipeline logs from each running instance
+`
+
+type LsPipeliner interface {
+ Init() error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRQueueId() string
+ AnalyseQueueId() string
+ GetQueueDetails(url string) (string, string, error)
+ GetInstanceDetails() ([]bookpipeline.InstanceDetails, error)
+ ListObjects(bucket string, prefix string) ([]string, error)
+ WIPStorageId() string
+}
+
+// NullWriter is used so non-verbose logging may be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
+type queueDetails struct {
+ name, numAvailable, numInProgress string
+}
+
+func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails) {
+ details, err := conn.GetInstanceDetails()
+ if err != nil {
+ log.Println("Error getting instance details:", err)
+ }
+ for _, d := range details {
+ detailsc <- d
+ }
+ close(detailsc)
+}
+
+func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
+ queues := []struct{ name, id string }{
+ {"preprocess", conn.PreQueueId()},
+ {"wipeonly", conn.WipeQueueId()},
+ {"ocr", conn.OCRQueueId()},
+ {"analyse", conn.AnalyseQueueId()},
+ }
+ for _, q := range queues {
+ avail, inprog, err := conn.GetQueueDetails(q.id)
+ if err != nil {
+ log.Println("Error getting queue details:", err)
+ }
+ var qd queueDetails
+ qd.name = q.name
+ qd.numAvailable = avail
+ qd.numInProgress = inprog
+ qdetails <- qd
+ }
+ close(qdetails)
+}
+
+// getBookStatus returns a list of in progress and done books.
+// It determines this by listing all objects, and splitting the
+// prefixes into two lists, those which have a 'graph.png' file,
+// which are classed as done, and those which are not.
+func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {
+ allfiles, err := conn.ListObjects(conn.WIPStorageId(), "")
+ if err != nil {
+ log.Println("Error getting list of objects:", err)
+ return inprogress, done, err
+ }
+ for _, f := range allfiles {
+ parts := strings.Split(f, "/")
+ if parts[1] != "graph.png" {
+ continue
+ }
+ prefix := parts[0]
+ found := false
+ for _, i := range done {
+ if i == prefix {
+ found = true
+ continue
+ }
+ }
+ if !found {
+ done = append(done, prefix)
+ }
+ }
+
+ for _, f := range allfiles {
+ parts := strings.Split(f, "/")
+ prefix := parts[0]
+ found := false
+ for _, i := range done {
+ if i == prefix {
+ found = true
+ continue
+ }
+ }
+ for _, i := range inprogress {
+ if i == prefix {
+ found = true
+ continue
+ }
+ }
+ if !found {
+ inprogress = append(inprogress, prefix)
+ }
+ }
+
+ return inprogress, done, err
+}
+
+func getBookStatusChan(conn LsPipeliner, inprogressc chan string, donec chan string) {
+ inprogress, done, err := getBookStatus(conn)
+ if err != nil {
+ log.Println("Error getting book status:", err)
+ close(inprogressc)
+ close(donec)
+ return
+ }
+ for _, i := range inprogress {
+ inprogressc <- i
+ }
+ close(inprogressc)
+ for _, i := range done {
+ donec <- i
+ }
+ close(donec)
+}
+
+func getRecentSSHLogs(ip string, id string, n int) (string, error) {
+ addr := fmt.Sprintf("%s@%s", "admin", ip)
+ logcmd := fmt.Sprintf("journalctl -n %d -u bookpipeline", n)
+ var cmd *exec.Cmd
+ if id == "" {
+ cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", addr, logcmd)
+ } else {
+ cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", "-i", id, addr, logcmd)
+ }
+ out, err := cmd.Output()
+ if err != nil {
+ return "", err
+ }
+ return string(out), nil
+}
+
+func getRecentSSHLogsChan(ips []string, id string, lognum int, logs chan string) {
+ for _, ip := range ips {
+ sshlog, err := getRecentSSHLogs(ip, id, lognum)
+ if err != nil {
+ log.Printf("Error getting SSH logs for %s: %s\n", ip, err)
+ continue
+ }
+ logs <- fmt.Sprintf("%s\n%s", ip, sshlog)
+ }
+ close(logs)
+}
+
+func main() {
+ keyfile := flag.String("i", "", "private key file for SSH")
+ lognum := flag.Int("n", 5, "number of lines to include in SSH logs")
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ var verboselog *log.Logger
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+
+ var conn LsPipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Failed to set up cloud connection:", err)
+ }
+
+ instances := make(chan bookpipeline.InstanceDetails, 100)
+ queues := make(chan queueDetails)
+ inprogress := make(chan string, 100)
+ done := make(chan string, 100)
+ logs := make(chan string, 10)
+
+ go getInstances(conn, instances)
+ go getQueueDetails(conn, queues)
+ go getBookStatusChan(conn, inprogress, done)
+
+ var ips []string
+
+ fmt.Println("# Instances")
+ for i := range instances {
+ fmt.Printf("ID: %s, Type: %s, LaunchTime: %s, State: %s", i.Id, i.Type, i.LaunchTime, i.State)
+ if i.Name != "" {
+ fmt.Printf(", Name: %s", i.Name)
+ }
+ if i.Ip != "" {
+ fmt.Printf(", IP: %s", i.Ip)
+ if i.State == "running" && i.Name != "workhorse" {
+ ips = append(ips, i.Ip)
+ }
+ }
+ if i.Spot != "" {
+ fmt.Printf(", SpotRequest: %s", i.Spot)
+ }
+ fmt.Printf("\n")
+ }
+
+ go getRecentSSHLogsChan(ips, *keyfile, *lognum, logs)
+
+ fmt.Println("\n# Queues")
+ for i := range queues {
+ fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress)
+ }
+
+ fmt.Println("\n# Books not completed")
+ for i := range inprogress {
+ fmt.Println(i)
+ }
+
+ fmt.Println("\n# Books done")
+ for i := range done {
+ fmt.Println(i)
+ }
+
+ if len(ips) > 0 {
+ fmt.Println("\n# Recent logs")
+ for i := range logs {
+ fmt.Printf("\n%s", i)
+ }
+ }
+}
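
getBookStatus classes a book prefix as done when a graph.png object exists under it, and as in progress otherwise, by repeatedly scanning the done and inprogress slices. The same rule can be expressed with a set in a single pass over the object keys; a sketch with hypothetical sample keys:

package main

import (
	"fmt"
	"strings"
)

// classify splits object keys of the form "bookname/filename" into books
// that have a graph.png (done) and those that do not (in progress),
// the same rule lspipeline's getBookStatus applies to the WIP bucket.
func classify(keys []string) (inprogress []string, done []string) {
	hasGraph := make(map[string]bool)
	seen := make(map[string]bool)
	var order []string

	for _, k := range keys {
		parts := strings.SplitN(k, "/", 2)
		if len(parts) != 2 {
			continue
		}
		prefix := parts[0]
		if !seen[prefix] {
			seen[prefix] = true
			order = append(order, prefix)
		}
		if parts[1] == "graph.png" {
			hasGraph[prefix] = true
		}
	}

	for _, prefix := range order {
		if hasGraph[prefix] {
			done = append(done, prefix)
		} else {
			inprogress = append(inprogress, prefix)
		}
	}
	return inprogress, done
}

func main() {
	keys := []string{
		"examplebook/0001.jpg",
		"examplebook/0001_bin0.2.hocr",
		"examplebook/graph.png",
		"otherbook/0001.jpg",
	}
	inprogress, done := classify(keys)
	fmt.Println("in progress:", inprogress)
	fmt.Println("done:", done)
}
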
diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go
new file mode 100644
index 0000000..e37a56d
--- /dev/null
+++ b/cmd/mkpipeline/main.go
@@ -0,0 +1,79 @@
+package main
+
+// TODO: use the bookpipeline package for aws stuff
+// TODO: set up iam role and policy needed for ec2 instances to access this stuff;
+// see arn:aws:iam::557852942063:policy/pipelinestorageandqueue
+// and arn:aws:iam::557852942063:role/pipeliner
+// TODO: set up launch template for ec2 instances
+// NOTE: potentially use json templates to define things, ala aws cli
+
+import (
+ "log"
+ "os"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/sqs"
+)
+
+func main() {
+ if len(os.Args) != 1 {
+ log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n")
+ }
+
+ sess, err := session.NewSession(&aws.Config{
+ Region: aws.String("eu-west-2"),
+ })
+ if err != nil {
+ log.Fatalf("Error: failed to set up aws session: %v\n", err)
+ }
+ s3svc := s3.New(sess)
+ sqssvc := sqs.New(sess)
+
+ prefix := "rescribe"
+ buckets := []string{"inprogress", "done"}
+ queues := []string{"preprocess", "wipeonly", "ocr", "analyse"}
+
+ for _, bucket := range buckets {
+ bname := prefix + bucket
+ log.Printf("Creating bucket %s\n", bname)
+ _, err = s3svc.CreateBucket(&s3.CreateBucketInput{
+ Bucket: aws.String(bname),
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) {
+ log.Printf("Bucket %s already exists\n", bname)
+ } else {
+ log.Fatalf("Error creating bucket %s: %v\n", bname, err)
+ }
+ }
+ }
+
+ for _, queue := range queues {
+ qname := prefix + queue
+ log.Printf("Creating queue %s\n", qname)
+ _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{
+ QueueName: aws.String(qname),
+ Attributes: map[string]*string{
+ "VisibilityTimeout": aws.String("120"), // 2 minutes
+ "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs
+ "ReceiveMessageWaitTimeSeconds": aws.String("20"),
+ },
+ })
+ if err != nil {
+ aerr, ok := err.(awserr.Error)
+ // Note the QueueAlreadyExists code is only emitted if an existing queue
+ // has different attributes than the one that was being created. SQS just
+ // quietly ignores the CreateQueue request if it is identical to an
+ // existing queue.
+ if ok && aerr.Code() == sqs.ErrCodeQueueNameExists {
+ log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname)
+ } else {
+ log.Fatalf("Error creating queue %s: %v\n", qname, err)
+ }
+ }
+ }
+}
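
The 120 second VisibilityTimeout that mkpipeline sets on each queue is not arbitrary: bookpipeline checks queues and extends message visibility by HeartbeatTime*2 (120 seconds) while its heartbeat ticker fires every HeartbeatTime (60 seconds), so a claimed book only reappears on a queue if the worker stops heartbeating for two minutes, as the bookpipeline usage text describes. A small sketch of that timing contract (constants copied from the two programs):

package main

import (
	"fmt"
	"time"
)

const (
	// HeartbeatTime matches the constant in cmd/bookpipeline/main.go.
	HeartbeatTime = 60
	// VisibilityTimeout matches the queue attribute set by mkpipeline, in seconds.
	VisibilityTimeout = 120
)

func main() {
	interval := HeartbeatTime * time.Second
	extension := time.Duration(HeartbeatTime*2) * time.Second

	fmt.Printf("queue visibility timeout: %v\n", VisibilityTimeout*time.Second)
	fmt.Printf("heartbeat interval: %v, visibility extended by %v each beat\n", interval, extension)
	fmt.Printf("slack remaining if a single heartbeat is missed: %v\n", extension-interval)
}
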