From 933aa3bf976f66d2a184922aff613c2b22fdca1b Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Mon, 23 Sep 2019 16:38:34 +0100
Subject: Move the sqs stuff out to aws.go

---
 bookpipeline/cmd/lspipeline/main.go | 76 ++++++++++++++++++++++---------------
 1 file changed, 45 insertions(+), 31 deletions(-)

(limited to 'bookpipeline/cmd/lspipeline')

diff --git a/bookpipeline/cmd/lspipeline/main.go b/bookpipeline/cmd/lspipeline/main.go
index 8794b38..d49b933 100644
--- a/bookpipeline/cmd/lspipeline/main.go
+++ b/bookpipeline/cmd/lspipeline/main.go
@@ -1,20 +1,21 @@
 package main
 
 import (
-	"errors"
 	"flag"
 	"fmt"
 	"log"
+	"os"
+
+	"rescribe.xyz/go.git/bookpipeline"
 
 	// TODO: abstract out the aws stuff into aws.go in due course
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/ec2"
 	//"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/sqs"
 )
 
-const usage = `Usage: lspipeline
+const usage = `Usage: lspipeline [-v]
 
 Lists useful things related to the pipeline.
 
@@ -25,6 +26,21 @@ Lists useful things related to the pipeline.
 - Last 5 lines of bookpipeline logs from each running instance (with -v)
 `
 
+type LsPipeliner interface {
+	Init() error
+	PreQueueId() string
+	OCRQueueId() string
+	AnalyseQueueId() string
+	GetQueueDetails(url string) (string, string, error)
+}
+
+// NullWriter is used so non-verbose logging may be discarded
+type NullWriter bool
+
+func (w NullWriter) Write(p []byte) (n int, err error) {
+	return len(p), nil
+}
+
 type instanceDetails struct {
 	id, name, ip, spot, iType, state, launchTime string
 }
@@ -73,14 +89,19 @@ func parseInstances(details chan instanceDetails) (func(*ec2.DescribeInstancesOu
 	}
 }
 
-func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []string) {
-	for _, q := range qnames {
-		avail, inprog, err := getQueueDetails(q, svc)
+func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
+	queues := []struct{ name, id string }{
+		{"preprocess", conn.PreQueueId()},
+		{"ocr", conn.OCRQueueId()},
+		{"analyse", conn.AnalyseQueueId()},
+	}
+	for _, q := range queues {
+		avail, inprog, err := conn.GetQueueDetails(q.id)
 		if err != nil {
 			log.Println("Error getting queue details:", err)
 		}
 		var qd queueDetails
-		qd.name = q
+		qd.name = q.name
 		qd.numAvailable = avail
 		qd.numInProgress = inprog
 		qdetails <- qd
@@ -88,34 +109,29 @@ func ec2GetQueueDetails(svc *sqs.SQS, qdetails chan queueDetails, qnames []strin
 	close(qdetails)
 }
 
-func getQueueDetails(qname string, svc *sqs.SQS) (string, string, error) {
-	result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
-		QueueName: aws.String(qname),
-	})
-	if err != nil {
-		return "", "", err
-	}
-	prequrl := *result.QueueUrl
-
-	numAvailable := "ApproximateNumberOfMessages"
-	numInProgress := "ApproximateNumberOfMessagesNotVisible"
-	attrs, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
-		AttributeNames: []*string{&numAvailable, &numInProgress},
-		QueueUrl: &prequrl,
-	})
-	if err != nil {
-		return "", "", errors.New(fmt.Sprintf("Failed to get queue attributes: %s", err))
-	}
-	return *attrs.Attributes[numAvailable], *attrs.Attributes[numInProgress], nil
-}
-
 func main() {
+	verbose := flag.Bool("v", false, "verbose")
 	flag.Usage = func() {
 		fmt.Fprintf(flag.CommandLine.Output(), usage)
 		flag.PrintDefaults()
 	}
 	flag.Parse()
 
+	var verboselog *log.Logger
+	if *verbose {
+		verboselog = log.New(os.Stdout, "", 0)
+	} else {
+		var n NullWriter
+		verboselog = log.New(n, "", 0)
+	}
+
+	var conn LsPipeliner
+	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+	err := conn.Init()
+	if err != nil {
+		log.Fatalln("Failed to set up cloud connection:", err)
+	}
+
 	sess, err := session.NewSession(&aws.Config{
 		Region: aws.String("eu-west-2"),
 	})
@@ -123,14 +139,12 @@ func main() {
 		log.Fatalln("Failed to set up aws session", err)
 	}
 	ec2svc := ec2.New(sess)
-	//s3svc := s3.New(sess)
-	sqssvc := sqs.New(sess)
 
 	instances := make(chan instanceDetails, 100)
 	queues := make(chan queueDetails)
 
 	go ec2getInstances(ec2svc, instances)
-	go ec2GetQueueDetails(sqssvc, queues, []string{"rescribepreprocess", "rescribeocr", "rescribeanalyse"})
+	go getQueueDetails(conn, queues)
 
 	fmt.Println("# Instances")
 	for i := range instances {
-- 
cgit v1.2.1-24-ge1ad