summaryrefslogtreecommitdiff
path: root/bookpipeline/cmd/lspipeline
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/cmd/lspipeline')
-rw-r--r--bookpipeline/cmd/lspipeline/main.go64
1 files changed, 56 insertions, 8 deletions
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 3972769..8794b38 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -1,6 +1,7 @@
package main
import (
+ "errors"
"flag"
"fmt"
"log"
@@ -10,7 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
//"github.com/aws/aws-sdk-go/service/s3"
- //"github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/service/sqs"
)
const usage = `Usage: lspipeline
@@ -18,7 +19,7 @@ const usage = `Usage: lspipeline
Lists useful things related to the pipeline.
- Instances running
-- Messages in each queue (ApproximateNumberOfMessages and ApproximateNumberOfMessagesNotVisible from GetQueueAttributes)
+- Messages in each queue
- Books not completed (from S3 without a best file)
- Books completed (from S3 with a best file)
- Last 5 lines of bookpipeline logs from each running instance (with -v)
@@ -28,11 +29,15 @@ type instanceDetails struct {
id, name, ip, spot, iType, state, launchTime string
}
-func ec2getInstances(ec2svc *ec2.EC2, instances chan instanceDetails) {
- err := ec2svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances))
- if err != nil {
- close(instances)
- log.Println("Error with ec2 DescribeInstancePages call:", err)
+type queueDetails struct {
+ name, numAvailable, numInProgress string
+}
+
+func ec2getInstances(svc *ec2.EC2, instances chan instanceDetails) {
+ err := svc.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, parseInstances(instances))
+ if err != nil {
+ close(instances)
+ log.Println("Error with ec2 DescribeInstancePages call:", err)
}
}
@@ -68,6 +73,42 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu
}
}
+func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) {
+ for _, q := range qnames {
+ avail, inprog, err := getQueueDetails(q, svc)
+ if err != nil {
+ log.Println("Error getting queue details:", err)
+ }
+ var qd queueDetails
+ qd.name = q
+ qd.numAvailable = avail
+ qd.numInProgress = inprog
+ qdetails <- qd
+ }
+ close(qdetails)
+}
+
+func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) {
+ result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(qname),
+ })
+ if err != nil {
+ return "", "", err
+ }
+ prequrl := *result.QueueUrl
+
+ numAvailable := "ApproximateNumberOfMessages"
+ numInProgress := "ApproximateNumberOfMessagesNotVisible"
+ attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
+ AttributeNames: []*string{&numAvailable, &numInProgress},
+ QueueUrl: &prequrl,
+ })
+ if err != nil {
+ return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err))
+ }
+ return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil
+}
+
func main() {
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), usage)
@@ -83,11 +124,13 @@ func main() {
}
ec2svc := ec2.New(sess)
//s3svc := s3.New(sess)
- //sqssvc := sqs.New(sess)
+ sqssvc := sqs.New(sess)
instances := make(chan instanceDetails, 100)
+ queues := make(chan queueDetails)
go ec2getInstances(ec2svc, instances)
+ go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"})
fmt.Println("# Instances")
for i := range instances {
@@ -104,5 +147,10 @@ func main() {
fmt.Printf("\n")
}
+ fmt.Println("\n# Queues")
+ for i := range queues {
+ fmt.Printf("%s: %s available, %s in progress\n", i.name, i.numAvailable, i.numInProgress)
+ }
+
// TODO: See remaining items in the usage statement
}