summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-09-23 16:38:34 +0100
committerNick White <git@njw.name>2019-09-23 16:38:34 +0100
commit933aa3bf976f66d2a184922aff613c2b22fdca1b (patch)
tree858425d74bf2743c6d0baa53ba1474262ac34343
parent107b1d28add39d1e252beec72b83f9fd69ee5cdb (diff)
Move the sqs stuff out to aws.go
-rw-r--r--bookpipeline/aws.go15
-rw-r--r--bookpipeline/cmd/lspipeline/main.go76
2 files changed, 60 insertions, 31 deletions
diff --git a/bookpipeline/aws.go b/bookpipeline/aws.go
index 4d32f3d..689f1b2 100644
--- a/bookpipeline/aws.go
+++ b/bookpipeline/aws.go
@@ -165,6 +165,21 @@ func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, e
return Qmsg{}, nil
}
+// GetQueueDetails gets the number of in progress and available
+// messages for a queue. These are returned as strings.
+func (a *AwsConn) GetQueueDetails(url string) (string, string, error) {
+ numAvailable := "ApproximateNumberOfMessages"
+ numInProgress := "ApproximateNumberOfMessagesNotVisible"
+ attrs, err := a.sqssvc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
+ AttributeNames: []*string{&numAvailable, &numInProgress},
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err))
+ }
+ return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil
+}
+
func (a *AwsConn) PreQueueId() string {
return a.prequrl
}
diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 8794b38..d49b933 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -1,20 +1,21 @@
package main
import (
- "errors"
"flag"
"fmt"
"log"
+ "os"
+
+ "rescribe.xyz/go.git/bookpipeline"
// TODO: abstract out the aws stuff into aws.go in due course
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
//"github.com/aws/aws-sdk-go/service/s3"
- "github.com/aws/aws-sdk-go/service/sqs"
)
-const usage = `Usage: lspipeline
+const usage = `Usage: lspipeline [-v]
Lists useful things related to the pipeline.
@@ -25,6 +26,21 @@ Lists useful things related to the pipeline.
- Last 5 lines of bookpipeline logs from each running instance (with -v)
`
+type LsPipeliner interface {
+ Init() error
+ PreQueueId() string
+ OCRQueueId() string
+ AnalyseQueueId() string
+ GetQueueDetails(url string) (string, string, error)
+}
+
+// NullWriter is used so non-verbose logging may be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
type instanceDetails struct {
id, name, ip, spot, iType, state, launchTime string
}
@@ -73,14 +89,19 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu
}
}
-func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) {
- for _, q := range qnames {
- avail, inprog, err := getQueueDetails(q, svc)
+func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
+ queues := []struct{ name, id string }{
+ {"preprocess", conn.PreQueueId()},
+ {"ocr", conn.OCRQueueId()},
+ {"analyse", conn.AnalyseQueueId()},
+ }
+ for _, q := range queues {
+ avail, inprog, err := conn.GetQueueDetails(q.id)
if err != nil {
log.Println("Error getting queue details:", err)
}
var qd queueDetails
- qd.name = q
+ qd.name = q.name
qd.numAvailable = avail
qd.numInProgress = inprog
qdetails <- qd
@@ -88,34 +109,29 @@ func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []strin
close(qdetails)
}
-func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) {
- result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String(qname),
- })
- if err != nil {
- return "", "", err
- }
- prequrl := *result.QueueUrl
-
- numAvailable := "ApproximateNumberOfMessages"
- numInProgress := "ApproximateNumberOfMessagesNotVisible"
- attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
- AttributeNames: []*string{&numAvailable, &numInProgress},
- QueueUrl: &prequrl,
- })
- if err != nil {
- return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err))
- }
- return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil
-}
-
func main() {
+ verbose := flag.Bool("v", false, "verbose")
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), usage)
flag.PrintDefaults()
}
flag.Parse()
+ var verboselog *log.Logger
+ if *verbose {
+ verboselog = log.New(os.Stdout, "", 0)
+ } else {
+ var n NullWriter
+ verboselog = log.New(n, "", 0)
+ }
+
+ var conn LsPipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Failed to set up cloud connection:", err)
+ }
+
sess, err := session.NewSession(&aws.Config{
Region: aws.String("eu-west-2"),
})
@@ -123,14 +139,12 @@ func main() {
log.Fatalln("Failed to set up aws session", err)
}
ec2svc := ec2.New(sess)
- //s3svc := s3.New(sess)
- sqssvc := sqs.New(sess)
instances := make(chan instanceDetails, 100)
queues := make(chan queueDetails)
go ec2getInstances(ec2svc, instances)
- go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"})
+ go getQueueDetails(conn, queues)
fmt.Println("# Instances")
for i := range instances {