From 48d894897894a010db1661259fa375e1be36e659 Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Wed, 28 Aug 2019 18:12:20 +0100
Subject: Move booktopipeline and mkpipeline into bookpipeline/cmd

---
 bookpipeline/cmd/booktopipeline/main.go | 117 ++++++++++++++++++++++++++++++++
 bookpipeline/cmd/mkpipeline/main.go     |  74 ++++++++++++++++++++
 2 files changed, 191 insertions(+)
 create mode 100644 bookpipeline/cmd/booktopipeline/main.go
 create mode 100644 bookpipeline/cmd/mkpipeline/main.go

(limited to 'bookpipeline/cmd')

diff --git a/bookpipeline/cmd/booktopipeline/main.go b/bookpipeline/cmd/booktopipeline/main.go
new file mode 100644
index 0000000..40ed35b
--- /dev/null
+++ b/bookpipeline/cmd/booktopipeline/main.go
@@ -0,0 +1,117 @@
+package main
+// TODO: use bookpipeline package to do aws stuff
+// TODO: have logs go somewhere useful, like email
+
+import (
+	"flag"
+	"log"
+	"os"
+	"path/filepath"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/aws/aws-sdk-go/service/sqs"
+)
+
+// NullWriter is a writer that reports success while discarding its input;
+// verboselog is backed by it when verbose output has not been requested.
+type NullWriter bool
+func (NullWriter) Write(b []byte) (int, error) { return len(b), nil }
+
+var verboselog *log.Logger
+
+// fileWalk receives the path of every non-directory entry passed to Walk.
+type fileWalk chan string
+
+// Walk is a filepath.WalkFunc: errors are returned as-is, directories are skipped.
+func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
+	if err != nil || info.IsDir() {
+		return err
+	}
+	f <- path
+	return nil
+}
+
+// main uploads every file under bookdir to the S3 bucket "rescribeinprogress",
+// keyed by bookname and file base name, then sends bookname to the preprocess queue.
+func main() {
+	verbose := flag.Bool("v", false, "Verbose")
+	flag.Parse()
+
+	if flag.NArg() < 1 {
+		log.Fatal("Usage: booktopipeline [-v] bookdir [bookname]\n\nUploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' SQS queue\nIf bookname is omitted the last part of the bookdir is used\n")
+	}
+
+	// An explicit bookname (second argument) overrides the one derived from bookdir.
+	bookdir := flag.Arg(0)
+	bookname := filepath.Base(bookdir)
+	if flag.NArg() >= 2 {
+		bookname = flag.Arg(1)
+	}
+
+	// Verbose messages are discarded unless -v was given.
+	verboselog = log.New(NullWriter(false), "", log.LstdFlags)
+	if *verbose {
+		verboselog = log.New(os.Stdout, "", log.LstdFlags)
+	}
+
+	verboselog.Println("Setting up AWS session")
+	sess, err := session.NewSession(&aws.Config{
+		Region: aws.String("eu-west-2"),
+	})
+	if err != nil {
+		log.Fatalln("Error: failed to set up aws session:", err)
+	}
+	sqssvc := sqs.New(sess)
+	uploader := s3manager.NewUploader(sess)
+
+	qname := "rescribepreprocess"
+	verboselog.Println("Getting Queue URL for", qname)
+	result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+		QueueName: aws.String(qname),
+	})
+	if err != nil {
+		log.Fatalln("Error getting queue URL for", qname, ":", err)
+	}
+	qurl := *result.QueueUrl
+
+	// concurrent walking upload based on example at
+	// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html
+	verboselog.Println("Walking", bookdir)
+	walker := make(fileWalk)
+	go func() {
+		if err := filepath.Walk(bookdir, walker.Walk); err != nil {
+			log.Fatalln("Filesystem walk failed:", err)
+		}
+		close(walker)
+	}()
+
+	for path := range walker {
+		verboselog.Println("Uploading", path)
+		name := filepath.Base(path)
+		file, err := os.Open(path)
+		if err != nil {
+			log.Fatalln("Open file", path, "failed:", err)
+		}
+		_, err = uploader.Upload(&s3manager.UploadInput{
+			Bucket: aws.String("rescribeinprogress"),
+			Key:    aws.String(filepath.Join(bookname, name)),
+			Body:   file,
+		})
+		// Close per iteration; a defer here would hold every file open until main returns.
+		file.Close()
+		if err != nil {
+			log.Fatalln("Failed to upload", path, err)
+		}
+	}
+
+	verboselog.Println("Sending message", bookname, "to queue", qurl)
+	_, err = sqssvc.SendMessage(&sqs.SendMessageInput{
+		MessageBody: aws.String(bookname),
+		QueueUrl:    &qurl,
+	})
+	if err != nil {
+		log.Fatalln("Error adding book to queue:", err)
+	}
+}
diff --git a/bookpipeline/cmd/mkpipeline/main.go b/bookpipeline/cmd/mkpipeline/main.go
new file mode 100644
index 0000000..b15d160
--- /dev/null
+++ b/bookpipeline/cmd/mkpipeline/main.go
@@ -0,0 +1,74 @@
+package main
+
+// TODO: use the bookpipeline package for aws stuff
+
+import (
+	"log"
+	"os"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/sqs"
+)
+
+// main creates the S3 buckets and SQS queues the pipeline relies on, all named
+// with the "rescribe" prefix. It accepts no arguments and exits on any failure.
+func main() {
+	if len(os.Args) != 1 {
+		log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n")
+	}
+
+	cfg := &aws.Config{Region: aws.String("eu-west-2")}
+	sess, err := session.NewSession(cfg)
+	if err != nil {
+		log.Fatalf("Error: failed to set up aws session: %v\n", err)
+	}
+	bucketAPI := s3.New(sess)
+	queueAPI := sqs.New(sess)
+
+	const prefix = "rescribe"
+
+	for _, suffix := range []string{"inprogress", "done"} {
+		name := prefix + suffix
+		log.Printf("Creating bucket %s\n", name)
+		_, err = bucketAPI.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(name)})
+		if err == nil {
+			continue
+		}
+		// A bucket that is already present is not a failure; anything else is fatal.
+		var code string
+		if aerr, ok := err.(awserr.Error); ok {
+			code = aerr.Code()
+		}
+		switch code {
+		case s3.ErrCodeBucketAlreadyExists, s3.ErrCodeBucketAlreadyOwnedByYou:
+			log.Printf("Bucket %s already exists\n", name)
+		default:
+			log.Fatalf("Error creating bucket %s: %v\n", name, err)
+		}
+	}
+
+	for _, suffix := range []string{"preprocess", "ocr", "analyse"} {
+		name := prefix + suffix
+		log.Printf("Creating queue %s\n", name)
+		_, err = queueAPI.CreateQueue(&sqs.CreateQueueInput{
+			QueueName: aws.String(name),
+			Attributes: map[string]*string{
+				"VisibilityTimeout":             aws.String("120"),     // 2 minutes
+				"MessageRetentionPeriod":        aws.String("1209600"), // 14 days; max allowed by sqs
+				"ReceiveMessageWaitTimeSeconds": aws.String("20"),
+			},
+		})
+		if err == nil {
+			continue
+		}
+		// SQS reports QueueNameExists only when the existing queue's attributes
+		// differ from those requested; an identical CreateQueue quietly succeeds.
+		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueNameExists {
+			log.Fatalf("Error: Queue %s already exists but has different attributes\n", name)
+		}
+		log.Fatalf("Error creating queue %s: %v\n", name, err)
+	}
+}
-- 
cgit v1.2.1-24-ge1ad