diff options
author | Nick White <git@njw.name> | 2019-10-08 12:52:33 +0100 |
---|---|---|
committer | Nick White <git@njw.name> | 2019-10-08 12:52:33 +0100 |
commit | 7482157a03ed3e9d7f45e54a126b391001f34948 (patch) | |
tree | 52f87b9ca159fe4c04a0349de95ea9de82692b3c /cmd/lspipeline | |
parent | d43c11bf653bfe3c1ad1ed277f1ec08bf155cf98 (diff) |
Separate out bookpipeline from catch-all go.git repo, and rename to rescribe.xyz/bookpipeline
The dependencies from the go.git repo will follow in due course.
Diffstat (limited to 'cmd/lspipeline')
-rw-r--r-- | cmd/lspipeline/main.go | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go new file mode 100644 index 0000000..0e1ebb0 --- /dev/null +++ b/cmd/lspipeline/main.go @@ -0,0 +1,250 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os/exec" + "strings" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: lspipeline [-i key] [-n num] + +Lists useful things related to the pipeline. + +- Instances running +- Messages in each queue +- Books not completed +- Books done +- Last n lines of bookpipeline logs from each running instance +` + +type LsPipeliner interface { + Init() error + PreQueueId() string + WipeQueueId() string + OCRQueueId() string + AnalyseQueueId() string + GetQueueDetails(url string) (string, string, error) + GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) + ListObjects(bucket string, prefix string) ([]string, error) + WIPStorageId() string +} + +// NullWriter is used so non-verbose logging may be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + +type queueDetails struct { + name, numAvailable, numInProgress string +} + +func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails) { + details, err := conn.GetInstanceDetails() + if err != nil { + log.Println("Error getting instance details:", err) + } + for _, d := range details { + detailsc <- d + } + close(detailsc) +} + +func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { + queues := []struct{ name, id string }{ + {"preprocess", conn.PreQueueId()}, + {"wipeonly", conn.WipeQueueId()}, + {"ocr", conn.OCRQueueId()}, + {"analyse", conn.AnalyseQueueId()}, + } + for _, q := range queues { + avail, inprog, err := conn.GetQueueDetails(q.id) + if err != nil { + log.Println("Error getting queue details:", err) + } + var qd queueDetails + qd.name = q.name + qd.numAvailable = avail + qd.numInProgress = inprog + qdetails <- qd + } + close(qdetails) +} + +// getBookStatus returns a list of in progress and done books. +// It determines this by listing all objects, and splitting the +// prefixes into two lists, those which have a 'graph.png' file, +// which are classed as done, and those which are not. +func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { + allfiles, err := conn.ListObjects(conn.WIPStorageId(), "") + if err != nil { + log.Println("Error getting list of objects:", err) + return inprogress, done, err + } + for _, f := range allfiles { + parts := strings.Split(f, "/") + if parts[1] != "graph.png" { + continue + } + prefix := parts[0] + found := false + for _, i := range done { + if i == prefix { + found = true + continue + } + } + if !found { + done = append(done, prefix) + } + } + + for _, f := range allfiles { + parts := strings.Split(f, "/") + prefix := parts[0] + found := false + for _, i := range done { + if i == prefix { + found = true + continue + } + } + for _, i := range inprogress { + if i == prefix { + found = true + continue + } + } + if !found { + inprogress = append(inprogress, prefix) + } + } + + return inprogress, done, err +} + +func getBookStatusChan(conn LsPipeliner, inprogressc chan string, donec chan string) { + inprogress, done, err := getBookStatus(conn) + if err != nil { + log.Println("Error getting book status:", err) + close(inprogressc) + close(donec) + return + } + for _, i := range inprogress { + inprogressc <- i + } + close(inprogressc) + for _, i := range done { + donec <- i + } + close(donec) +} + +func getRecentSSHLogs(ip string, id string, n int) (string, error) { + addr := fmt.Sprintf("%s@%s", "admin", ip) + logcmd := fmt.Sprintf("journalctl -n %d -u bookpipeline", n) + var cmd *exec.Cmd + if id == "" { + cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", addr, logcmd) + } else { + cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", "-i", id, addr, logcmd) + } + out, err := cmd.Output() + if err != nil { + return "", err + } + return string(out), nil +} + +func getRecentSSHLogsChan(ips []string, id string, lognum int, logs chan string) { + for _, ip := range ips { + sshlog, err := getRecentSSHLogs(ip, id, lognum) + if err != nil { + log.Printf("Error getting SSH logs for %s: %s\n", ip, err) + continue + } + logs <- fmt.Sprintf("%s\n%s", ip, sshlog) + } + close(logs) +} + +func main() { + keyfile := flag.String("i", "", "private key file for SSH") + lognum := flag.Int("n", 5, "number of lines to include in SSH logs") + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + var verboselog *log.Logger + var n NullWriter + verboselog = log.New(n, "", 0) + + var conn LsPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + err := conn.Init() + if err != nil { + log.Fatalln("Failed to set up cloud connection:", err) + } + + instances := make(chan bookpipeline.InstanceDetails, 100) + queues := make(chan queueDetails) + inprogress := make(chan string, 100) + done := make(chan string, 100) + logs := make(chan string, 10) + + go getInstances(conn, instances) + go getQueueDetails(conn, queues) + go getBookStatusChan(conn, inprogress, done) + + var ips []string + + fmt.Println("# Instances") + for i := range instances { + fmt.Printf("ID: %s, Type: %s, LaunchTime: %s, State: %s", i.Id, i.Type, i.LaunchTime, i.State) + if i.Name != "" { + fmt.Printf(", Name: %s", i.Name) + } + if i.Ip != "" { + fmt.Printf(", IP: %s", i.Ip) + if i.State == "running" && i.Name != "workhorse" { + ips = append(ips, i.Ip) + } + } + if i.Spot != "" { + fmt.Printf(", SpotRequest: %s", i.Spot) + } + fmt.Printf("\n") + } + + go getRecentSSHLogsChan(ips, *keyfile, *lognum, logs) + + fmt.Println("\n# Queues") + for i := range queues { + fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress) + } + + fmt.Println("\n# Books not completed") + for i := range inprogress { + fmt.Println(i) + } + + fmt.Println("\n# Books done") + for i := range done { + fmt.Println(i) + } + + if len(ips) > 0 { + fmt.Println("\n# Recent logs") + for i := range logs { + fmt.Printf("\n%s", i) + } + } +} |