diff options
| -rw-r--r-- | bookpipeline/cmd/lspipeline/main.go | 64 | 
1 files changed, 56 insertions, 8 deletions
| diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go index 3972769..8794b38 100644 --- a/bookpipeline/cmd/lspipeline/main.go +++ b/bookpipeline/cmd/lspipeline/main.go @@ -1,6 +1,7 @@  package main  import ( +	"errors"  	"flag"  	"fmt"  	"log" @@ -10,7 +11,7 @@ import (  	"github.com/aws/aws-sdk-go/aws/session"  	"github.com/aws/aws-sdk-go/service/ec2"  	//"github.com/aws/aws-sdk-go/service/s3" -	//"github.com/aws/aws-sdk-go/service/sqs" +	"github.com/aws/aws-sdk-go/service/sqs"  )  const usage = `Usage: lspipeline @@ -18,7 +19,7 @@ const usage = `Usage: lspipeline  Lists useful things related to the pipeline.  - Instances running -- Messages in each queue (ApproximateNumberOfMessages and ApproximateNumberOfMessagesNotVisible from GetQueueAttributes) +- Messages in each queue  - Books not completed (from S3 without a best file)  - Books completed (from S3 with a best file)  - Last 5 lines of bookpipeline logs from each running instance (with -v) @@ -28,11 +29,15 @@ type instanceDetails struct {  	id, name, ip, spot, iType, state, launchTime string  } -func ec2getInstances(ec2svc *ec2.EC2, instances chan instanceDetails) { -	err := ec2svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances)) -        if err != nil { -                close(instances) -                log.Println("Error with ec2 DescribeInstancePages call:", err) +type queueDetails struct { +	name, numAvailable, numInProgress string +} + +func ec2getInstances(svc *ec2.EC2, instances chan instanceDetails) { +	err := svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances)) +	if err != nil { +		close(instances) +		log.Println("Error with ec2 DescribeInstancePages call:", err)  	}  } @@ -68,6 +73,42 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu  	}  } +func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) { +	for _, q := range qnames { +		avail, inprog, err := getQueueDetails(q, svc) +		if err != nil { +			log.Println("Error getting queue details:", err) +		} +		var qd queueDetails +		qd.name = q +		qd.numAvailable = avail +		qd.numInProgress = inprog +		qdetails <- qd +	} +	close(qdetails) +} + +func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) { +	result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ +		QueueName: aws.String(qname), +	}) +	if err != nil { +		return "", "", err +	} +	prequrl := *result.QueueUrl + +	numAvailable := "ApproximateNumberOfMessages" +	numInProgress := "ApproximateNumberOfMessagesNotVisible" +	attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{ +		AttributeNames: []*string{&numAvailable, &numInProgress}, +		QueueUrl: &prequrl, +	}) +	if err != nil { +		return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err)) +	} +	return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil +} +  func main() {  	flag.Usage = func() {  		fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -83,11 +124,13 @@ func main() {  	}  	ec2svc := ec2.New(sess)  	//s3svc := s3.New(sess) -	//sqssvc := sqs.New(sess) +	sqssvc := sqs.New(sess)  	instances := make(chan instanceDetails, 100) +	queues := make(chan queueDetails)  	go ec2getInstances(ec2svc, instances) +	go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"})  	fmt.Println("# Instances")  	for i := range instances { @@ -104,5 +147,10 @@ func main() {  		fmt.Printf("\n")  	} +	fmt.Println("\n# Queues") +	for i := range queues { +		fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress) +	} +  	// TODO: See remaining items in the usage statement  } | 
