From 2717c5ed21a082a7f24833f3d57b303fd22bd4e5 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 17 Nov 2020 16:37:54 +0000 Subject: Add trimqueue and logwholequeue utilities which can help deal with weird queue states --- aws.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) (limited to 'aws.go') diff --git a/aws.go b/aws.go index 5ebc79f..6b707fe 100644 --- a/aws.go +++ b/aws.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error { return nil } +// LogQueue prints the body of all messages in a queue to the log +func (a *AwsConn) LogQueue(url string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + a.Logger.Println(*m.Body) + } + } else { + break + } + } + return nil +} + +// RemovePrefixesFromQueue removes any messages in a queue whose +// body starts with the specified prefix. +func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + if !strings.HasPrefix(*m.Body, prefix) { + continue + } + a.Logger.Printf("Removing %s from queue\n", *m.Body) + _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &url, + ReceiptHandle: m.ReceiptHandle, + }) + if err != nil { + return err + } + } + } else { + break + } + } + return nil +} + // QueueHeartbeat updates the visibility timeout of a message. This // ensures that the message remains "in flight", meaning that it // cannot be seen by other processes, but if this process fails the -- cgit v1.2.1-24-ge1ad From 9147e57a3a634ad303e8f1e7c456988996d5c75b Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 14 Dec 2020 17:08:14 +0000 Subject: Add rmbook tool --- aws.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'aws.go') diff --git a/aws.go b/aws.go index 6b707fe..40c452d 100644 --- a/aws.go +++ b/aws.go @@ -379,6 +379,23 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { return prefixes, err } +// Deletes a list of objects +func (a *AwsConn) DeleteObjects(bucket string, keys []string) error { + objs := []*s3.ObjectIdentifier{} + for _, v := range keys { + o := s3.ObjectIdentifier{Key: aws.String(v)} + objs = append(objs, &o) + } + _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objs, + Quiet: aws.Bool(true), + }, + }) + return err +} + // CreateBucket creates a new S3 bucket func (a *AwsConn) CreateBucket(name string) error { _, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{ -- cgit v1.2.1-24-ge1ad From 670d5c1b74f2fa4683bfe7e2d9b1baee14db9104 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 10:54:19 +0000 Subject: Stop limiting keys returned from listobjectprefixes' api usage; this speeds up the request markedly --- aws.go | 1 - 1 file changed, 1 deletion(-) (limited to 'aws.go') diff --git a/aws.go b/aws.go index 40c452d..57aadd3 100644 --- a/aws.go +++ b/aws.go @@ -369,7 +369,6 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Delimiter: aws.String("/"), - MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.CommonPrefixes { prefixes = append(prefixes, *r.Prefix) -- cgit v1.2.1-24-ge1ad From 86cc5d6c921ac05e0d08f66b205b51e1f5adb938 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 13:50:02 +0000 Subject: Speed up lspipeline by making s3 requests concurrently and only processing single results from ListObjects requests --- aws.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'aws.go') diff --git a/aws.go b/aws.go index 57aadd3..dd74a01 100644 --- a/aws.go +++ b/aws.go @@ -355,11 +355,12 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), + MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { objs = append(objs, ObjMeta{Name: *r.Key, Date: *r.LastModified}) } - return true + return false // only process the first page as that's all we need }) return objs, err } -- cgit v1.2.1-24-ge1ad From 5c3cee66a90ce6ef87e125b3bf011a6903d38083 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:56:10 +0000 Subject: Make ListObjectsWithMeta generic again and create a specialised ListObjectWithMeta for single file listing, so we can still be as fast, but do not have a misleading api --- aws.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'aws.go') diff --git a/aws.go b/aws.go index dd74a01..035b08a 100644 --- a/aws.go +++ b/aws.go @@ -355,16 +355,35 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), - MaxKeys: aws.Int64(1), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { objs = append(objs, ObjMeta{Name: *r.Key, Date: *r.LastModified}) } - return false // only process the first page as that's all we need + return true }) return objs, err } +// ListObjectWithMeta lists the name and last modified date of the +// first object with the specified prefix. +func (a *AwsConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) { + var obj ObjMeta + err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(1), + }, func(page *s3.ListObjectsV2Output, last bool) bool { + for _, r := range page.Contents { + obj = ObjMeta{Name: *r.Key, Date: *r.LastModified} + } + return false + }) + if obj.Name == "" && obj.Date.IsZero() && err == nil { + return obj, fmt.Errorf("No object could be found for %s", prefix) + } + return obj, err +} + func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { var prefixes []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ -- cgit v1.2.1-24-ge1ad From 16ea8034794ef030c969d586a7fc945bf4a2873a Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 1 Feb 2021 11:45:27 +0000 Subject: Ensure DeleteObjects can handle over 1000 files to delete; fixes rmbook for large books --- aws.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'aws.go') diff --git a/aws.go b/aws.go index 035b08a..65671fa 100644 --- a/aws.go +++ b/aws.go @@ -401,9 +401,25 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) { // Deletes a list of objects func (a *AwsConn) DeleteObjects(bucket string, keys []string) error { objs := []*s3.ObjectIdentifier{} - for _, v := range keys { + for i, v := range keys { o := s3.ObjectIdentifier{Key: aws.String(v)} objs = append(objs, &o) + // s3.DeleteObjects can only take up to 1000 keys at a time, + // so if necessary delete those collected so far and empty + // the objs queue + if i % 1000 == 1 { + _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objs, + Quiet: aws.Bool(true), + }, + }) + if err != nil { + return err + } + objs = []*s3.ObjectIdentifier{} + } } _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{ Bucket: aws.String(bucket), -- cgit v1.2.1-24-ge1ad