From 933aa3bf976f66d2a184922aff613c2b22fdca1b Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 23 Sep 2019 16:38:34 +0100 Subject: Move the sqs stuff out to aws.go --- bookpipeline/aws.go | 15 ++++++++ bookpipeline/cmd/lspipeline/main.go | 76 ++++++++++++++++++++++--------------- 2 files changed, 60 insertions(+), 31 deletions(-) diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go index 4d32f3d..689f1b2 100644 --- a/bookpipeline/aws.go +++ b/bookpipeline/aws.go @@ -165,6 +165,21 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e return Qmsg{}, nil } +// GetQueueDetails gets the number of in progress and available +// messages for a queue. These are returned as strings. +func (a *AwsConn) GetQueueDetails(url string) (string, string, error) { + numAvailable := "ApproximateNumberOfMessages" + numInProgress := "ApproximateNumberOfMessagesNotVisible" + attrs, err := a.sqssvc.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + AttributeNames: []*string{&numAvailable, &numInProgress}, + QueueUrl: &url, + }) + if err != nil { + return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err)) + } + return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil +} + func (a *AwsConn) PreQueueId() string { return a.prequrl } diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go index 8794b38..d49b933 100644 --- a/bookpipeline/cmd/lspipeline/main.go +++ b/bookpipeline/cmd/lspipeline/main.go @@ -1,20 +1,21 @@ package main import ( - "errors" "flag" "fmt" "log" + "os" + + "rescribe.xyz/go.git/bookpipeline" // TODO: abstract out the aws stuff into aws.go in due course "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" //"github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/sqs" ) -const usage = `Usage: lspipeline +const usage = `Usage: lspipeline [-v] Lists useful things related to the pipeline. @@ -25,6 +26,21 @@ Lists useful things related to the pipeline. - Last 5 lines of bookpipeline logs from each running instance (with -v) ` +type LsPipeliner interface { + Init() error + PreQueueId() string + OCRQueueId() string + AnalyseQueueId() string + GetQueueDetails(url string) (string, string, error) +} + +// NullWriter is used so non-verbose logging may be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} + type instanceDetails struct { id, name, ip, spot, iType, state, launchTime string } @@ -73,14 +89,19 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu } } -func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) { - for _, q := range qnames { - avail, inprog, err := getQueueDetails(q, svc) +func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) { + queues := []struct{ name, id string }{ + {"preprocess", conn.PreQueueId()}, + {"ocr", conn.OCRQueueId()}, + {"analyse", conn.AnalyseQueueId()}, + } + for _, q := range queues { + avail, inprog, err := conn.GetQueueDetails(q.id) if err != nil { log.Println("Error getting queue details:", err) } var qd queueDetails - qd.name = q + qd.name = q.name qd.numAvailable = avail qd.numInProgress = inprog qdetails <- qd @@ -88,34 +109,29 @@ func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []strin close(qdetails) } -func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) { - result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(qname), - }) - if err != nil { - return "", "", err - } - prequrl := *result.QueueUrl - - numAvailable := "ApproximateNumberOfMessages" - numInProgress := "ApproximateNumberOfMessagesNotVisible" - attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{ - AttributeNames: []*string{&numAvailable, &numInProgress}, - QueueUrl: &prequrl, - }) - if err != nil { - return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err)) - } - return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil -} - func main() { + verbose := flag.Bool("v", false, "verbose") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() } flag.Parse() + var verboselog *log.Logger + if *verbose { + verboselog = log.New(os.Stdout, "", 0) + } else { + var n NullWriter + verboselog = log.New(n, "", 0) + } + + var conn LsPipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + err := conn.Init() + if err != nil { + log.Fatalln("Failed to set up cloud connection:", err) + } + sess, err := session.NewSession(&aws.Config{ Region: aws.String("eu-west-2"), }) @@ -123,14 +139,12 @@ func main() { log.Fatalln("Failed to set up aws session", err) } ec2svc := ec2.New(sess) - //s3svc := s3.New(sess) - sqssvc := sqs.New(sess) instances := make(chan instanceDetails, 100) queues := make(chan queueDetails) go ec2getInstances(ec2svc, instances) - go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"}) + go getQueueDetails(conn, queues) fmt.Println("# Instances") for i := range instances { -- cgit v1.2.1-24-ge1ad