From 48d894897894a010db1661259fa375e1be36e659 Mon Sep 17 00:00:00 2001 From: Nick White Date: Wed, 28 Aug 2019 18:12:20 +0100 Subject: Move booktopipeline and mkpipeline into bookpipeline/cmd --- bookpipeline/cmd/booktopipeline/main.go | 117 ++++++++++++++++++++++++++++++++ bookpipeline/cmd/mkpipeline/main.go | 74 ++++++++++++++++++++ booktopipeline/main.go | 116 ------------------------------- mkpipeline/main.go | 72 -------------------- 4 files changed, 191 insertions(+), 188 deletions(-) create mode 100644 bookpipeline/cmd/booktopipeline/main.go create mode 100644 bookpipeline/cmd/mkpipeline/main.go delete mode 100644 booktopipeline/main.go delete mode 100644 mkpipeline/main.go diff --git a/bookpipeline/cmd/booktopipeline/main.go b/bookpipeline/cmd/booktopipeline/main.go new file mode 100644 index 0000000..40ed35b --- /dev/null +++ b/bookpipeline/cmd/booktopipeline/main.go @@ -0,0 +1,117 @@ +package main +// TODO: use bookpipeline package to do aws stuff +// TODO: have logs go somewhere useful, like email + +import ( + "flag" + "log" + "os" + "path/filepath" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/service/sqs" +) + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool +func (w NullWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} +var verboselog *log.Logger + +type fileWalk chan string + +func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + f <- path + } + return nil +} + +func main() { + verbose := flag.Bool("v", false, "Verbose") + flag.Parse() + + if flag.NArg() < 1 { + log.Fatal("Usage: booktopipeline [-v] bookdir [bookname]\n\nUploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' SQS queue\nIf bookname is omitted the last part of the bookdir is used\n") + } + + bookdir := 
flag.Arg(0) + var bookname string + if flag.NArg() > 1 { + bookname = flag.Arg(1) + } else { + bookname = filepath.Base(bookdir) + } + + if *verbose { + verboselog = log.New(os.Stdout, "", log.LstdFlags) + } else { + var n NullWriter + verboselog = log.New(n, "", log.LstdFlags) + } + + verboselog.Println("Setting up AWS session") + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("eu-west-2"), + }) + if err != nil { + log.Fatalln("Error: failed to set up aws session:", err) + } + sqssvc := sqs.New(sess) + uploader := s3manager.NewUploader(sess) + + qname := "rescribepreprocess" + verboselog.Println("Getting Queue URL for", qname) + result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(qname), + }) + if err != nil { + log.Fatalln("Error getting queue URL for", qname, ":", err) + } + qurl := *result.QueueUrl + + // concurrent walking upload based on example at + // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html + verboselog.Println("Walking", bookdir) + walker := make(fileWalk) + go func() { + err = filepath.Walk(bookdir, walker.Walk) + if err != nil { + log.Fatalln("Filesystem walk failed:", err) + } + close(walker) + }() + + for path := range walker { + verboselog.Println("Uploading", path) + name := filepath.Base(path) + file, err := os.Open(path) + if err != nil { + log.Fatalln("Open file", path, "failed:", err) + } + defer file.Close() + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String("rescribeinprogress"), + Key: aws.String(filepath.Join(bookname, name)), + Body: file, + }) + if err != nil { + log.Fatalln("Failed to upload", path, err) + } + } + + verboselog.Println("Sending message", bookname, "to queue", qurl) + _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(bookname), + QueueUrl: &qurl, + }) + if err != nil { + log.Fatalln("Error adding book to queue:", err) + } +} diff --git a/bookpipeline/cmd/mkpipeline/main.go
b/bookpipeline/cmd/mkpipeline/main.go new file mode 100644 index 0000000..b15d160 --- /dev/null +++ b/bookpipeline/cmd/mkpipeline/main.go @@ -0,0 +1,74 @@ +package main + +// TODO: use the bookpipeline package for aws stuff + +import ( + "log" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/sqs" +) + +func main() { + if len(os.Args) != 1 { + log.Fatal("Usage: mkpipeline\n\nSets up necessary S3 buckets and SQS queues for our AWS pipeline\n") + } + + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("eu-west-2"), + }) + if err != nil { + log.Fatalf("Error: failed to set up aws session: %v\n", err) + } + s3svc := s3.New(sess) + sqssvc := sqs.New(sess) + + prefix := "rescribe" + buckets := []string{"inprogress", "done"} + queues := []string{"preprocess", "ocr", "analyse"} + + for _, bucket := range buckets { + bname := prefix + bucket + log.Printf("Creating bucket %s\n", bname) + _, err = s3svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bname), + }) + if err != nil { + aerr, ok := err.(awserr.Error) + if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { + log.Printf("Bucket %s already exists\n", bname) + } else { + log.Fatalf("Error creating bucket %s: %v\n", bname, err) + } + } + } + + for _, queue := range queues { + qname := prefix + queue + log.Printf("Creating queue %s\n", qname) + _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(qname), + Attributes: map[string]*string{ + "VisibilityTimeout": aws.String("120"), // 2 minutes + "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs + "ReceiveMessageWaitTimeSeconds": aws.String("20"), + }, + }) + if err != nil { + aerr, ok := err.(awserr.Error) + // Note the QueueAlreadyExists code is only emitted if an existing queue + // has 
different attributes than the one that was being created. SQS just + // quietly ignores the CreateQueue request if it is identical to an + // existing queue. + if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { + log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname) + } else { + log.Fatalf("Error creating queue %s: %v\n", qname, err) + } + } + } +} diff --git a/booktopipeline/main.go b/booktopipeline/main.go deleted file mode 100644 index 7c4e004..0000000 --- a/booktopipeline/main.go +++ /dev/null @@ -1,116 +0,0 @@ -package main -// TODO: have logs go somewhere useful, like email - -import ( - "flag" - "log" - "os" - "path/filepath" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/aws/aws-sdk-go/service/sqs" -) - -// null writer to enable non-verbose logging to be discarded -type NullWriter bool -func (w NullWriter) Write(p []byte) (n int, err error) { - return len(p), nil -} -var verboselog *log.Logger - -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - f <- path - } - return nil -} - -func main() { - verbose := flag.Bool("v", false, "Verbose") - flag.Parse() - - if flag.NArg() < 1 { - log.Fatal("Usage: booktopipeline [-v] bookdir [bookname]\n\nUploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' SQS queue\nIf bookname is omitted the last part of the bookdir is used\n") - } - - bookdir := flag.Arg(0) - var bookname string - if flag.NArg() > 2 { - bookname = flag.Arg(1) - } else { - bookname = filepath.Base(bookdir) - } - - if *verbose { - verboselog = log.New(os.Stdout, "", log.LstdFlags) - } else { - var n NullWriter - verboselog = log.New(n, "", log.LstdFlags) - } - - verboselog.Println("Setting up AWS session") - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("eu-west-2"), - 
}) - if err != nil { - log.Fatalln("Error: failed to set up aws session:", err) - } - sqssvc := sqs.New(sess) - uploader := s3manager.NewUploader(sess) - - qname := "rescribepreprocess" - verboselog.Println("Getting Queue URL for", qname) - result, err := sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(qname), - }) - if err != nil { - log.Fatalln("Error getting queue URL for", qname, ":", err) - } - qurl := *result.QueueUrl - - // concurrent walking upload based on example at - // https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sdk-utilities.html - verboselog.Println("Walking", bookdir) - walker := make(fileWalk) - go func() { - err = filepath.Walk(bookdir, walker.Walk) - if err != nil { - log.Fatalln("Filesystem walk failed:", err) - } - close(walker) - }() - - for path := range walker { - verboselog.Println("Uploading", path) - name := filepath.Base(path) - file, err := os.Open(path) - if err != nil { - log.Fatalln("Open file", path, "failed:", err) - } - defer file.Close() - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String("rescribeinprogress"), - Key: aws.String(filepath.Join(bookname, name)), - Body: file, - }) - if err != nil { - log.Fatalln("Failed to upload", path, err) - } - } - - verboselog.Println("Sending message", bookname, "to queue", qurl) - _, err = sqssvc.SendMessage(&sqs.SendMessageInput{ - MessageBody: aws.String(bookname), - QueueUrl: &qurl, - }) - if err != nil { - log.Fatalln("Error adding book to queue:", err) - } -} diff --git a/mkpipeline/main.go b/mkpipeline/main.go deleted file mode 100644 index 572ef76..0000000 --- a/mkpipeline/main.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "log" - "os" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/sqs" -) - -func main() { - if len(os.Args) != 1 { - log.Fatal("Usage: mkpipeline\n\nSets up 
necessary S3 buckets and SQS queues for our AWS pipeline\n") - } - - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("eu-west-2"), - }) - if err != nil { - log.Fatalf("Error: failed to set up aws session: %v\n", err) - } - s3svc := s3.New(sess) - sqssvc := sqs.New(sess) - - prefix := "rescribe" - buckets := []string{"inprogress", "done"} - queues := []string{"preprocess", "ocr", "analyse"} - - for _, bucket := range buckets { - bname := prefix + bucket - log.Printf("Creating bucket %s\n", bname) - _, err = s3svc.CreateBucket(&s3.CreateBucketInput{ - Bucket: aws.String(bname), - }) - if err != nil { - aerr, ok := err.(awserr.Error) - if ok && (aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou) { - log.Printf("Bucket %s already exists\n", bname) - } else { - log.Fatalf("Error creating bucket %s: %v\n", bname, err) - } - } - } - - for _, queue := range queues { - qname := prefix + queue - log.Printf("Creating queue %s\n", qname) - _, err = sqssvc.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(qname), - Attributes: map[string]*string{ - "VisibilityTimeout": aws.String("120"), // 2 minutes - "MessageRetentionPeriod": aws.String("1209600"), // 14 days; max allowed by sqs - "ReceiveMessageWaitTimeSeconds": aws.String("20"), - }, - }) - if err != nil { - aerr, ok := err.(awserr.Error) - // Note the QueueAlreadyExists code is only emitted if an existing queue - // has different attributes than the one that was being created. SQS just - // quietly ignores the CreateQueue request if it is identical to an - // existing queue. - if ok && aerr.Code() == sqs.ErrCodeQueueNameExists { - log.Fatalf("Error: Queue %s already exists but has different attributes\n", qname) - } else { - log.Fatalf("Error creating queue %s: %v\n", qname, err) - } - } - } -} -- cgit v1.2.1-24-ge1ad