From 107b1d28add39d1e252beec72b83f9fd69ee5cdb Mon Sep 17 00:00:00 2001 From: Nick White Date: Thu, 19 Sep 2019 17:46:07 +0100 Subject: Add queue listing to lspipeline --- bookpipeline/cmd/lspipeline/main.go | 64 ++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go index 3972769..8794b38 100644 --- a/bookpipeline/cmd/lspipeline/main.go +++ b/bookpipeline/cmd/lspipeline/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "flag" "fmt" "log" @@ -10,7 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" //"github.com/aws/aws-sdk-go/service/s3" - //"github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sqs" ) const usage = `Usage: lspipeline @@ -18,7 +19,7 @@ const usage = `Usage: lspipeline Lists useful things related to the pipeline. - Instances running -- Messages in each queue (ApproximateNumberOfMessages and ApproximateNumberOfMessagesNotVisible from GetQueueAttributes) +- Messages in each queue - Books not completed (from S3 without a best file) - Books completed (from S3 with a best file) - Last 5 lines of bookpipeline logs from each running instance (with -v) @@ -28,11 +29,15 @@ type instanceDetails struct { id, name, ip, spot, iType, state, launchTime string } -func ec2getInstances(ec2svc *ec2.EC2, instances chan instanceDetails) { - err := ec2svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances)) - if err != nil { - close(instances) - log.Println("Error with ec2 DescribeInstancePages call:", err) +type queueDetails struct { + name, numAvailable, numInProgress string +} + +func ec2getInstances(svc *ec2.EC2, instances chan instanceDetails) { + err := svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances)) + if err != nil { + close(instances) + log.Println("Error with ec2 DescribeInstancePages call:", err) } } @@ -68,6 +73,42 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu } } +func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) { + for _, q := range qnames { + avail, inprog, err := getQueueDetails(q, svc) + if err != nil { + log.Println("Error getting queue details:", err) + } + var qd queueDetails + qd.name = q + qd.numAvailable = avail + qd.numInProgress = inprog + qdetails <- qd + } + close(qdetails) +} + +func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) { + result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(qname), + }) + if err != nil { + return "", "", err + } + prequrl := *result.QueueUrl + + numAvailable := "ApproximateNumberOfMessages" + numInProgress := "ApproximateNumberOfMessagesNotVisible" + attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + AttributeNames: []*string{&numAvailable, &numInProgress}, + QueueUrl: &prequrl, + }) + if err != nil { + return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err)) + } + return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil +} + func main() { flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -83,11 +124,13 @@ func main() { } ec2svc := ec2.New(sess) //s3svc := s3.New(sess) - //sqssvc := sqs.New(sess) + sqssvc := sqs.New(sess) instances := make(chan instanceDetails, 100) + queues := make(chan queueDetails) go ec2getInstances(ec2svc, instances) + go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"}) fmt.Println("# Instances") for i := range instances { @@ -104,5 +147,10 @@ func main() { fmt.Printf("\n") } + fmt.Println("\n# Queues") + for i := range queues { + fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress) + } + // TODO: See remaining items in the usage statement } -- cgit v1.2.1-24-ge1ad