// Copyright 2019 Nick White. // Use of this source code is governed by the GPLv3 // license that can be found in the LICENSE file. // lspipeline-ng lists useful things related to the book pipeline. package main import ( "flag" "fmt" "log" "os/exec" "sort" "strings" "sync" "time" "rescribe.xyz/bookpipeline" ) const usage = `Usage: lspipeline-ng [-i key] [-n num] [-nobooks] Lists useful things related to the pipeline. - Instances running - Messages in each queue - Books not completed - Books done - Last n lines of bookpipeline logs from each running instance The -ng version does concurrent requests for book status to speed that process up significantly. ` type LsPipeliner interface { Init() error PreQueueId() string WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string GetQueueDetails(url string) (string, string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error) ListObjectPrefixes(bucket string) ([]string, error) WIPStorageId() string } // NullWriter is used so non-verbose logging may be discarded type NullWriter bool func (w NullWriter) Write(p []byte) (n int, err error) { return len(p), nil } type queueDetails struct { name, numAvailable, numInProgress string } func getInstances(conn LsPipeliner, detailsc chan bookpipeline.InstanceDetails) { details, err := conn.GetInstanceDetails() if err != nil { log.Println("Error getting instance details:", err) } for _, d := range details { detailsc <- d } close(detailsc) } func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { queues := []struct{ name, id string }{ {"preprocess", conn.PreQueueId()}, {"wipeonly", conn.WipeQueueId()}, {"ocrpage", conn.OCRPageQueueId()}, {"analyse", conn.AnalyseQueueId()}, } for _, q := range queues { avail, inprog, err := conn.GetQueueDetails(q.id) if err != nil { log.Println("Error getting queue details:", err) } var qd queueDetails qd.name = q.name qd.numAvailable = avail qd.numInProgress = inprog qdetails <- qd } close(qdetails) } type ObjMetas []bookpipeline.ObjMeta // used by sort.Sort func (o ObjMetas) Len() int { return len(o) } // used by sort.Sort func (o ObjMetas) Swap(i, j int) { o[i], o[j] = o[j], o[i] } // used by sort.Sort func (o ObjMetas) Less(i, j int) bool { return o[i].Date.Before(o[j].Date) } // getBookDetails determines whether a book is done and what date // it was completed, or if it has not finished, the date of any // book file. func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { // First try to get the graph.png file from the book, which marks // it as done obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") if err == nil { return obj.Date, true, nil } // Otherwise get any file from the book to get a date to sort by obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) if err != nil { return time.Time{}, false, err } return obj.Date, false, nil } // getBookDetailsChan gets the details for a book putting it into either the // done or inprogress channels as appropriate, or sending an error to errc // on failure. func getBookDetailsChan(conn LsPipeliner, wg *sync.WaitGroup, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { fmt.Printf("getting book details for %s\n", key) date, isdone, err := getBookDetails(conn, key) if err != nil { wg.Done() errc <- err return } meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} if isdone { done <- meta } else { inprogress <- meta } wg.Done() } // getBookStatus returns a list of in progress and done books. // It determines this by finding all prefixes, and splitting them // into two lists, those which have a 'graph.png' file (the done // list), and those which do not (the inprogress list). They are // sorted according to the date of the graph.png file, or the date // of a random file with the prefix if no graph.png was found. // It spins up many goroutines to do query the book status and // dates, as it is far faster to do concurrently. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) if err != nil { log.Println("Error getting object prefixes:", err) return } donec := make(chan bookpipeline.ObjMeta, 100) inprogressc := make(chan bookpipeline.ObjMeta, 100) errc := make(chan error) // This is a bit messy, but it works. We get the book details in blocks of // 30 simultaneous requests, using wait groups to pause once each 20 have // been sent. go func() { var wg sync.WaitGroup for i, p := range prefixes { wg.Add(1) go getBookDetailsChan(conn, &wg, p, donec, inprogressc, errc) if i % 30 == 0 { wg.Wait() } } }() var inprogressmeta, donemeta ObjMetas // there will be exactly as many sends to donec or inprogressc // as there are prefixes for range prefixes { select { case i := <-donec: donemeta = append(donemeta, i) case i := <-inprogressc: inprogressmeta = append(inprogressmeta, i) case err = <-errc: return inprogress, done, err } } sort.Sort(donemeta) sort.Sort(inprogressmeta) for _, i := range donemeta { done = append(done, i.Name) } for _, i := range inprogressmeta { inprogress = append(inprogress, i.Name) } return } // getBookStatusChan runs getBookStatus and sends its results to // channels for the done and receive arrays. func getBookStatusChan(conn LsPipeliner, inprogressc chan string, donec chan string) { inprogress, done, err := getBookStatus(conn) if err != nil { log.Println("Error getting book status:", err) close(inprogressc) close(donec) return } for _, i := range inprogress { inprogressc <- i } close(inprogressc) for _, i := range done { donec <- i } close(donec) } func getRecentSSHLogs(ip string, id string, n int) (string, error) { addr := fmt.Sprintf("%s@%s", "admin", ip) logcmd := fmt.Sprintf("journalctl -n %d -u bookpipeline", n) var cmd *exec.Cmd if id == "" { cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", addr, logcmd) } else { cmd = exec.Command("ssh", "-o", "StrictHostKeyChecking no", "-i", id, addr, logcmd) } out, err := cmd.Output() if err != nil { return "", err } return string(out), nil } func getRecentSSHLogsChan(ips []string, id string, lognum int, logs chan string) { for _, ip := range ips { sshlog, err := getRecentSSHLogs(ip, id, lognum) if err != nil { log.Printf("Error getting SSH logs for %s: %s\n", ip, err) continue } logs <- fmt.Sprintf("%s\n%s", ip, sshlog) } close(logs) } func main() { keyfile := flag.String("i", "", "private key file for SSH") lognum := flag.Int("n", 5, "number of lines to include in SSH logs") nobooks := flag.Bool("nobooks", false, "disable listing books completed and not completed (which takes some time)") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() } flag.Parse() var verboselog *log.Logger var n NullWriter verboselog = log.New(n, "", 0) var conn LsPipeliner conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} err := conn.Init() if err != nil { log.Fatalln("Failed to set up cloud connection:", err) } instances := make(chan bookpipeline.InstanceDetails, 100) queues := make(chan queueDetails) inprogress := make(chan string, 100) done := make(chan string, 100) logs := make(chan string, 10) go getInstances(conn, instances) go getQueueDetails(conn, queues) if !*nobooks { go getBookStatusChan(conn, inprogress, done) } var ips []string fmt.Println("# Instances") for i := range instances { fmt.Printf("ID: %s, Type: %s, LaunchTime: %s, State: %s", i.Id, i.Type, i.LaunchTime, i.State) if i.Name != "" { fmt.Printf(", Name: %s", i.Name) } if i.Ip != "" { fmt.Printf(", IP: %s", i.Ip) if i.State == "running" && i.Name != "workhorse" { ips = append(ips, i.Ip) } } if i.Spot != "" { fmt.Printf(", SpotRequest: %s", i.Spot) } fmt.Printf("\n") } go getRecentSSHLogsChan(ips, *keyfile, *lognum, logs) fmt.Println("\n# Queues") for i := range queues { fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress) } if len(ips) > 0 { fmt.Println("\n# Recent logs") for i := range logs { fmt.Printf("\n%s", i) } } if !*nobooks { fmt.Println("\n# Books not completed") for i := range inprogress { fmt.Println(i) } fmt.Println("\n# Books done") for i := range done { fmt.Println(i) } } }