From 7482157a03ed3e9d7f45e54a126b391001f34948 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 8 Oct 2019 12:52:33 +0100 Subject: Separate out bookpipeline from catch-all go.git repo, and rename to rescribe.xyz/bookpipeline The dependencies from the go.git repo will follow in due course. --- cmd/bookpipeline/main.go | 486 ++++++++++++++++++++++++++++++++++++++++++++ cmd/booktopipeline/main.go | 140 +++++++++++++ cmd/confgraph/main.go | 71 +++++++ cmd/getpipelinebook/main.go | 122 +++++++++++ cmd/lspipeline/main.go | 250 +++++++++++++++++++++++ cmd/mkpipeline/main.go | 79 +++++++ 6 files changed, 1148 insertions(+) create mode 100644 cmd/bookpipeline/main.go create mode 100644 cmd/booktopipeline/main.go create mode 100644 cmd/confgraph/main.go create mode 100644 cmd/getpipelinebook/main.go create mode 100644 cmd/lspipeline/main.go create mode 100644 cmd/mkpipeline/main.go (limited to 'cmd') diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go new file mode 100644 index 0000000..f445547 --- /dev/null +++ b/cmd/bookpipeline/main.go @@ -0,0 +1,486 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/go.git/lib/hocr" + "rescribe.xyz/go.git/preproc" +) + +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training] + +Watches the preprocess, ocr and analyse queues for book names. When +one is found this general process is followed: + +- The book name is hidden from the queue, and a 'heartbeat' is + started which keeps it hidden (this will time out after 2 minutes + if the program is terminated) +- The necessary files from bookname/ are downloaded +- The files are processed +- The resulting files are uploaded to bookname/ +- The heartbeat is stopped +- The book name is removed from the queue it was taken from, and + added to the next queue for future processing + +` + +const PauseBetweenChecks = 3 * time.Minute +const HeartbeatTime = 60 + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Clouder interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { + Clouder + PreQueueId() string + WipeQueueId() string + OCRQueueId() string + AnalyseQueueId() string + WIPStorageId() string + GetLogger() *log.Logger +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { + for key := range dl { + fn := filepath.Join(dir, filepath.Base(key)) + logger.Println("Downloading", key) + err := conn.Download(conn.WIPStorageId(), key, fn) + if err != nil { + for range dl { + } // consume the rest of the receiving channel so it isn't blocked + close(process) + errc <- err + return + } + process <- fn + } + close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { + for path := range c { + name := filepath.Base(path) + key := filepath.Join(bookname, name) + logger.Println("Uploading", key) + err := conn.Upload(conn.WIPStorageId(), key, path) + if err != nil { + for range c { + } // consume the rest of the receiving channel so it isn't blocked + errc <- err + return + } + } + + done <- true +} + +func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range pre { + logger.Println("Preprocessing", path) + done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30) + if err != nil { + for range pre { + } // consume the rest of the receiving channel so it isn't blocked + close(up) + errc <- err + return + } + for _, p := range done { + up <- p + } + } + close(up) +} + +func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range towipe { + logger.Println("Wiping", path) + s := strings.Split(path, ".") + base := strings.Join(s[:len(s)-1], "") + outpath := base + "_bin0.0.png" + err := preproc.WipeFile(path, outpath, 5, 0.03, 30) + if err != nil { + for range towipe { + } // consume the rest of the receiving channel so it isn't blocked + close(up) + errc <- err + return + } + up <- outpath + } + close(up) +} + +func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { + return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { + for path := range toocr { + logger.Println("OCRing", path) + name := strings.Replace(path, ".png", "", 1) + cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") + err := cmd.Run() + if err != nil { + for range toocr { + } // consume the rest of the receiving channel so it isn't blocked + close(up) + errc <- errors.New(fmt.Sprintf("Error ocring %s: %s", path, err)) + return + } + up <- name + ".hocr" + } + close(up) + } +} + +func analyse(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { + confs := make(map[string][]*bookpipeline.Conf) + bestconfs := make(map[string]*bookpipeline.Conf) + savedir := "" + + for path := range toanalyse { + if savedir == "" { + savedir = filepath.Dir(path) + } + logger.Println("Calculating confidence for", path) + avg, err := hocr.GetAvgConf(path) + if err != nil && err.Error() == "No words found" { + continue + } + if err != nil { + for range toanalyse { + } // consume the rest of the receiving channel so it isn't blocked + close(up) + errc <- errors.New(fmt.Sprintf("Error retreiving confidence for %s: %s", path, err)) + return + } + base := filepath.Base(path) + codestart := strings.Index(base, "_bin") + name := base[0:codestart] + var c bookpipeline.Conf + c.Path = path + c.Code = base[codestart:] + c.Conf = avg + confs[name] = append(confs[name], &c) + + } + + fn := filepath.Join(savedir, "conf") + logger.Println("Saving confidences in file", fn) + f, err := os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + + logger.Println("Finding best confidence for each page, and saving all confidences") + for base, conf := range confs { + var best float64 + for _, c := range conf { + if c.Conf > best { + best = c.Conf + bestconfs[base] = c + } + _, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error writing confidences file: %s", err)) + return + } + } + } + up <- fn + + logger.Println("Creating best file listing the best file for each page") + fn = filepath.Join(savedir, "best") + f, err = os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + for _, conf := range bestconfs { + _, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) + } + up <- fn + + logger.Println("Creating graph") + fn = filepath.Join(savedir, "graph.png") + f, err = os.Create(fn) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error creating file %s: %s", fn, err)) + return + } + defer f.Close() + err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) + if err != nil { + close(up) + errc <- errors.New(fmt.Sprintf("Error rendering graph: %s", err)) + return + } + up <- fn + + close(up) +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg + for range t.C { + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2) + if err != nil { + errc <- err + t.Stop() + return + } + if m.Id != "" { + conn.GetLogger().Println("Replaced message handle as visibilitytimeout limit was reached") + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } + } +} + +func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { + dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) + processc := make(chan string) + upc := make(chan string) + done := make(chan bool) + errc := make(chan error) + + bookname := msg.Body + + d := filepath.Join(os.TempDir(), bookname) + err := os.MkdirAll(d, 0755) + if err != nil { + return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) + } + + t := time.NewTicker(HeartbeatTime * time.Second) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) + + // these functions will do their jobs when their channels have data + go download(dl, processc, conn, d, errc, conn.GetLogger()) + go process(processc, upc, errc, conn.GetLogger()) + go up(upc, done, conn, bookname, errc, conn.GetLogger()) + + conn.GetLogger().Println("Getting list of objects to download") + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Failed to get list of files for book %s: %s", bookname, err)) + } + var todl []string + for _, n := range objs { + if !match.MatchString(n) { + conn.GetLogger().Println("Skipping item that doesn't match target", n) + continue + } + todl = append(todl, n) + } + for _, a := range todl { + dl <- a + } + close(dl) + + // wait for either the done or errc channel to be sent to + select { + case err = <-errc: + t.Stop() + _ = os.RemoveAll(d) + return err + case <-done: + } + + if toQueue != "" { + conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) + err = conn.AddToQueue(toQueue, bookname) + if err != nil { + t.Stop() + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) + } + } + + t.Stop() + + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc: + if ok { + msg = m + conn.GetLogger().Println("Using new message handle to delete message from old queue") + } + default: + conn.GetLogger().Println("Using original message handle to delete message from old queue") + } + + conn.GetLogger().Println("Deleting original message from queue", fromQueue) + err = conn.DelFromQueue(fromQueue, msg.Handle) + if err != nil { + _ = os.RemoveAll(d) + return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) + } + + err = os.RemoveAll(d) + if err != nil { + return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) + } + + return nil +} + +func main() { + verbose := flag.Bool("v", false, "verbose") + training := flag.String("t", "rescribealphav5", "tesseract training file to use") + nopreproc := flag.Bool("np", false, "disable preprocessing") + nowipe := flag.Bool("nw", false, "disable wipeonly") + noocr := flag.Bool("no", false, "disable ocr") + noanalyse := flag.Bool("na", false, "disable analysis") + + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", 0) + } else { + var n NullWriter + verboselog = log.New(n, "", 0) + } + + origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // TODO: match alternative file naming + wipePattern := regexp.MustCompile(`[0-9]{4}.png$`) + preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + ocredPattern := regexp.MustCompile(`.hocr$`) + + var conn Pipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + verboselog.Println("Setting up AWS session") + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + verboselog.Println("Finished setting up AWS session") + + var checkPreQueue <-chan time.Time + var checkWipeQueue <-chan time.Time + var checkOCRQueue <-chan time.Time + var checkAnalyseQueue <-chan time.Time + if !*nopreproc { + checkPreQueue = time.After(0) + } + if !*nowipe { + checkWipeQueue = time.After(0) + } + if !*noocr { + checkOCRQueue = time.After(0) + } + if !*noanalyse { + checkAnalyseQueue = time.After(0) + } + + for { + select { + case <-checkPreQueue: + msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatTime*2) + checkPreQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking preprocess queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on preprocess queue, sleeping") + continue + } + verboselog.Println("Message received on preprocess queue, processing", msg.Body) + err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) + if err != nil { + log.Println("Error during preprocess", err) + } + case <-checkWipeQueue: + msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatTime*2) + checkWipeQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking wipeonly queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on wipeonly queue, sleeping") + continue + } + verboselog.Println("Message received on wipeonly queue, processing", msg.Body) + err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId()) + if err != nil { + log.Println("Error during wipe", err) + } + case <-checkOCRQueue: + msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2) + checkOCRQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking OCR queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on OCR queue, sleeping") + continue + } + verboselog.Println("Message received on OCR queue, processing", msg.Body) + err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId()) + if err != nil { + log.Println("Error during OCR process", err) + } + case <-checkAnalyseQueue: + msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatTime*2) + checkAnalyseQueue = time.After(PauseBetweenChecks) + if err != nil { + log.Println("Error checking analyse queue", err) + continue + } + if msg.Handle == "" { + verboselog.Println("No message received on analyse queue, sleeping") + continue + } + verboselog.Println("Message received on analyse queue, processing", msg.Body) + err = processBook(msg, conn, analyse, ocredPattern, conn.AnalyseQueueId(), "") + if err != nil { + log.Println("Error during analysis", err) + } + } + } +} diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go new file mode 100644 index 0000000..6d9f146 --- /dev/null +++ b/cmd/booktopipeline/main.go @@ -0,0 +1,140 @@ +package main + +// TODO: use bookpipeline package to do aws stuff + +import ( + "flag" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/service/sqs" +) + +const usage = `Usage: booktopipeline [-prebinarised] [-v] bookdir [bookname] + +Uploads the book in bookdir to the S3 'inprogress' bucket and adds it +to the 'preprocess' SQS queue, or the 'wipeonly' queue if the +prebinarised flag is set. + +If bookname is omitted the last part of the bookdir is used. +` + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +var verboselog *log.Logger + +type fileWalk chan string + +func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + f <- path + } + return nil +} + +func main() { + verbose := flag.Bool("v", false, "Verbose") + wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe") + + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + if flag.NArg() < 1 { + flag.Usage() + return + } + + bookdir := flag.Arg(0) + var bookname string + if flag.NArg() > 2 { + bookname = flag.Arg(1) + } else { + bookname = filepath.Base(bookdir) + } + + if *verbose { + verboselog = log.New(os.Stdout, "", log.LstdFlags) + } else { + var n NullWriter + verboselog = log.New(n, "", log.LstdFlags) + } + + verboselog.Println("Setting up AWS session") + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("eu-west-2"), + }) + if err != nil { + log.Fatalln("Error: failed to set up aws session:", err) + } + sqssvc := sqs.New(sess) + uploader := s3manager.NewUploader(sess) + + var qname string + if *wipeonly { + qname = "rescribewipeonly" + } else { + qname = "rescribepreprocess" + } + verboselog.Println("Getting Queue URL for", qname) + result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(qname), + }) + if err != nil { + log.Fatalln("Error getting queue URL for", qname, ":", err) + } + qurl := *result.QueueUrl + + // concurrent walking upload based on example at + // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html + verboselog.Println("Walking", bookdir) + walker := make(fileWalk) + go func() { + err = filepath.Walk(bookdir, walker.Walk) + if err != nil { + log.Fatalln("Filesystem walk failed:", err) + } + close(walker) + }() + + for path := range walker { + verboselog.Println("Uploading", path) + name := filepath.Base(path) + file, err := os.Open(path) + if err != nil { + log.Fatalln("Open file", path, "failed:", err) + } + defer file.Close() + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String("rescribeinprogress"), + Key: aws.String(filepath.Join(bookname, name)), + Body: file, + }) + if err != nil { + log.Fatalln("Failed to upload", path, err) + } + } + + verboselog.Println("Sending message", bookname, "to queue", qurl) + _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(bookname), + QueueUrl: &qurl, + }) + if err != nil { + log.Fatalln("Error adding book to queue:", err) + } +} diff --git a/cmd/confgraph/main.go b/cmd/confgraph/main.go new file mode 100644 index 0000000..474c0a2 --- /dev/null +++ b/cmd/confgraph/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path/filepath" + "strings" + + "rescribe.xyz/bookpipeline" + "rescribe.xyz/go.git/lib/hocr" +) + +func walker(confs *[]*bookpipeline.Conf) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + if !strings.HasSuffix(path, ".hocr") { + return nil + } + avg, err := hocr.GetAvgConf(path) + if err != nil { + return err + } + c := bookpipeline.Conf{ + Conf: avg, + Path: path, + } + *confs = append(*confs, &c) + return nil + } +} + +func main() { + flag.Usage = func() { + fmt.Fprintln(flag.CommandLine.Output(), "Usage: bookpipeline hocrdir graph.png") + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 2 { + flag.Usage() + return + } + + var confs []*bookpipeline.Conf + err := filepath.Walk(flag.Arg(0), walker(&confs)) + if err != nil { + log.Fatalln("Failed to walk", flag.Arg(0), err) + } + + // Structure to fit what bookpipeline.Graph needs + // TODO: probably reorganise bookpipeline to just need []*Conf + cconfs := make(map[string]*bookpipeline.Conf) + for _, c := range confs { + cconfs[c.Path] = c + } + + fn := flag.Arg(1) + f, err := os.Create(fn) + if err != nil { + log.Fatalln("Error creating file", fn, err) + } + defer f.Close() + err = bookpipeline.Graph(cconfs, filepath.Base(flag.Arg(0)), f) + if err != nil { + log.Fatalln("Error creating graph", err) + } +} diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go new file mode 100644 index 0000000..9e900bf --- /dev/null +++ b/cmd/getpipelinebook/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "log" + "os" + "path/filepath" + + "rescribe.xyz/bookpipeline" +) + +const usage = "Usage: getpipelinebook [-a] [-v] bookname\n\nDownloads the pipeline results for a book.\n" + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type Pipeliner interface { + Init() error + ListObjects(bucket string, prefix string) ([]string, error) + Download(bucket string, key string, fn string) error + Upload(bucket string, key string, path string) error + CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) + AddToQueue(url string, msg string) error + DelFromQueue(url string, handle string) error + WIPStorageId() string +} + +func main() { + all := flag.Bool("a", false, "Get all files for book, not just hOCR and analysis files") + verbose := flag.Bool("v", false, "Verbose") + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() < 1 { + flag.Usage() + return + } + + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", log.LstdFlags) + } else { + var n NullWriter + verboselog = log.New(n, "", log.LstdFlags) + } + + var conn Pipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + + verboselog.Println("Setting up AWS session") + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + verboselog.Println("Finished setting up AWS session") + + bookname := flag.Arg(0) + + err = os.MkdirAll(bookname, 0755) + if err != nil { + log.Fatalln("Failed to create directory", bookname, err) + } + + if *all { + verboselog.Println("Downloading all files for", bookname) + objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) + if err != nil { + log.Fatalln("Failed to get list of files for book", bookname, err) + } + for _, i := range objs { + verboselog.Println("Downloading", i) + err = conn.Download(conn.WIPStorageId(), i, i) + if err != nil { + log.Fatalln("Failed to download file", i, err) + } + } + return + } + + verboselog.Println("Downloading best file") + fn := filepath.Join(bookname, "best") + err = conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + log.Fatalln("Failed to download 'best' file", err) + } + f, err := os.Open(fn) + if err != nil { + log.Fatalln("Failed to open best file", err) + } + defer f.Close() + + verboselog.Println("Downloading HOCR files") + s := bufio.NewScanner(f) + for s.Scan() { + fn = filepath.Join(bookname, s.Text()) + verboselog.Println("Downloading file", fn) + err = conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + log.Fatalln("Failed to download file", fn, err) + } + } + + analyses := []string{"conf", "graph.png"} + verboselog.Println("Downloading analysis files") + for _, a := range analyses { + fn = filepath.Join(bookname, a) + verboselog.Println("Downloading file", fn) + err = conn.Download(conn.WIPStorageId(), fn, fn) + if err != nil { + log.Fatalln("Failed to download file", fn, err) + } + } +} diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go new file mode 100644 index 0000000..0e1ebb0 --- /dev/null +++ b/cmd/lspipeline/main.go @@ -0,0 +1,250 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os/exec" + "strings" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: lspipeline [-i key] [-n num] + +Lists useful things related to the pipeline. + +- Instances running +- Messages in each queue +- Books not completed +- Books done +- Last n lines of bookpipeline logs from each running instance +` + +type LsPipeliner interface { + Init() error + PreQueueId() string + WipeQueueId() string + OCRQueueId() string + AnalyseQueueId() string + GetQueueDetails(url string) (string, string, error) + GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) + ListObjects(bucket string, prefix string) ([]string, error) + WIPStorageId() string +} + +// NullWriter is used so non-verbose logging may be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type queueDetails struct { + name, numAvailable, numInProgress string +} + +func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails) { + details, err := conn.GetInstanceDetails() + if err != nil { + log.Println("Error getting instance details:", err) + } + for _, d := range details { + detailsc <- d + } + close(detailsc) +} + +func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { + queues := []struct{ name, id string }{ + {"preprocess", conn.PreQueueId()}, + {"wipeonly", conn.WipeQueueId()}, + {"ocr", conn.OCRQueueId()}, + {"analyse", conn.AnalyseQueueId()}, + } + for _, q := range queues { + avail, inprog, err := conn.GetQueueDetails(q.id) + if err != nil { + log.Println("Error getting queue details:", err) + } + var qd queueDetails + qd.name = q.name + qd.numAvailable = avail + qd.numInProgress = inprog + qdetails <- qd + } + close(qdetails) +} + +// getBookStatus returns a list of in progress and done books. +// It determines this by listing all objects, and splitting the +// prefixes into two lists, those which have a 'graph.png' file, +// which are classed as done, and those which are not. +func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { + allfiles, err := conn.ListObjects(conn.WIPStorageId(), "") + if err != nil { + log.Println("Error getting list of objects:", err) + return inprogress, done, err + } + for _, f := range allfiles { + parts := strings.Split(f, "/") + if parts[1] != "graph.png" { + continue + } + prefix := parts[0] + found := false + for _, i := range done { + if i == prefix { + found = true + continue + } + } + if !found { + done = append(done, prefix) + } + } + + for _, f := range allfiles { + parts := strings.Split(f, "/") + prefix := parts[0] + found := false + for _, i := range done { + if i == prefix { + found = true + continue + } + } + for _, i := range inprogress { + if i == prefix { + found = true + continue + } + } + if !found { + inprogress = append(inprogress, prefix) + } + } + + return inprogress, done, err +} + +func getBookStatusChan(conn LsPipeliner, inprogressc chan string, donec chan string) { + inprogress, done, err := getBookStatus(conn) + if err != nil { + log.Println("Error getting book status:", err) + close(inprogressc) + close(donec) + return + } + for _, i := range inprogress { + inprogressc <- i + } + close(inprogressc) + for _, i := range done { + donec <- i + } + close(donec) +} + +func getRecentSSHLogs(ip string, id string, n int) (string, error) { + addr := fmt.Sprintf("%s@%s", "admin", ip) + logcmd := fmt.Sprintf("journalctl -n %d -u bookpipeline", n) + var cmd *exec.Cmd + if id == "" { + cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", addr, logcmd) + } else { + cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", "-i", id, addr, logcmd) + } + out, err := cmd.Output() + if err != nil { + return "", err + } + return string(out), nil +} + +func getRecentSSHLogsChan(ips []string, id string, lognum int, logs chan string) { + for _, ip := range ips { + sshlog, err := getRecentSSHLogs(ip, id, lognum) + if err != nil { + log.Printf("Error getting SSH logs for %s: %s\n", ip, err) + continue + } + logs <- fmt.Sprintf("%s\n%s", ip, sshlog) + } + close(logs) +} + +func main() { + keyfile := flag.String("i", "", "private key file for SSH") + lognum := flag.Int("n", 5, "number of lines to include in SSH logs") + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + var verboselog *log.Logger + var n NullWriter + verboselog = log.New(n, "", 0) + + var conn LsPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + err := conn.Init() + if err != nil { + log.Fatalln("Failed to set up cloud connection:", err) + } + + instances := make(chan bookpipeline.InstanceDetails, 100) + queues := make(chan queueDetails) + inprogress := make(chan string, 100) + done := make(chan string, 100) + logs := make(chan string, 10) + + go getInstances(conn, instances) + go getQueueDetails(conn, queues) + go getBookStatusChan(conn, inprogress, done) + + var ips []string + + fmt.Println("# Instances") + for i := range instances { + fmt.Printf("ID: %s, Type: %s, LaunchTime: %s, State: %s", i.Id, i.Type, i.LaunchTime, i.State) + if i.Name != "" { + fmt.Printf(", Name: %s", i.Name) + } + if i.Ip != "" { + fmt.Printf(", IP: %s", i.Ip) + if i.State == "running" && i.Name != "workhorse" { + ips = append(ips, i.Ip) + } + } + if i.Spot != "" { + fmt.Printf(", SpotRequest: %s", i.Spot) + } + fmt.Printf("\n") + } + + go getRecentSSHLogsChan(ips, *keyfile, *lognum, logs) + + fmt.Println("\n# Queues") + for i := range queues { + fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress) + } + + fmt.Println("\n# Books not completed") + for i := range inprogress { + fmt.Println(i) + } + + fmt.Println("\n# Books done") + for i := range done { + fmt.Println(i) + } + + if len(ips) > 0 { + fmt.Println("\n# Recent logs") + for i := range logs { + fmt.Printf("\n%s", i) + } + } +} diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go new file mode 100644 index 0000000..e37a56d --- /dev/null +++ b/cmd/mkpipeline/main.go @@ -0,0 +1,79 @@ +package main + +// TODO: use the bookpipeline package for aws stuff +// TODO: set up iam role and policy needed for ec2 instances to access this stuff; +// see arn:aws:iam::557852942063:policy/pipelinestorageandqueue +// and arn:aws:iam::557852942063:role/pipeliner +// TODO: set up launch template for ec2 instances +// NOTE: potentially use json templates to define things, ala aws cli + +import ( + "log" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/sqs" +) + +func main() { + if len(os.Args) != 1 { + log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n") + } + + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("eu-west-2"), + }) + if err != nil { + log.Fatalf("Error: failed to set up aws session: %v\n", err) + } + s3svc := s3.New(sess) + sqssvc := sqs.New(sess) + + prefix := "rescribe" + buckets := []string{"inprogress", "done"} + queues := []string{"preprocess", "wipeonly", "ocr", "analyse"} + + for _, bucket := range buckets { + bname := prefix + bucket + log.Printf("Creating bucket %s\n", bname) + _, err = s3svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bname), + }) + if err != nil { + aerr, ok := err.(awserr.Error) + if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { + log.Printf("Bucket %s already exists\n", bname) + } else { + log.Fatalf("Error creating bucket %s: %v\n", bname, err) + } + } + } + + for _, queue := range queues { + qname := prefix + queue + log.Printf("Creating queue %s\n", qname) + _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(qname), + Attributes: map[string]*string{ + "VisibilityTimeout": aws.String("120"), // 2 minutes + "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs + "ReceiveMessageWaitTimeSeconds": aws.String("20"), + }, + }) + if err != nil { + aerr, ok := err.(awserr.Error) + // Note the QueueAlreadyExists code is only emitted if an existing queue + // has different attributes than the one that was being created. SQS just + // quietly ignores the CreateQueue request if it is identical to an + // existing queue. + if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { + log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname) + } else { + log.Fatalf("Error creating queue %s: %v\n", qname, err) + } + } + } +} -- cgit v1.2.1-24-ge1ad