summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go112
1 files changed, 111 insertions, 1 deletions
diff --git a/aws.go b/aws.go
index 5ebc79f..65671fa 100644
--- a/aws.go
+++ b/aws.go
@@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
+ "strings"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {
return nil
}
+// LogQueue prints the body of all messages in a queue to the log
+func (a *AwsConn) LogQueue(url string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ a.Logger.Println(*m.Body)
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
+// RemovePrefixesFromQueue removes any messages in a queue whose
+// body starts with the specified prefix.
+func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ if !strings.HasPrefix(*m.Body, prefix) {
+ continue
+ }
+ a.Logger.Printf("Removing %s from queue\n", *m.Body)
+ _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &url,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
// QueueHeartbeat updates the visibility timeout of a message. This
// ensures that the message remains "in flight", meaning that it
// cannot be seen by other processes, but if this process fails the
@@ -306,12 +364,31 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta,
return objs, err
}
+// ListObjectWithMeta lists the name and last modified date of the
+// first object with the specified prefix.
+func (a *AwsConn) ListObjectWithMeta(bucket string, prefix string) (ObjMeta, error) {
+ var obj ObjMeta
+ err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
+ Bucket: aws.String(bucket),
+ Prefix: aws.String(prefix),
+ MaxKeys: aws.Int64(1),
+ }, func(page *s3.ListObjectsV2Output, last bool) bool {
+ for _, r := range page.Contents {
+ obj = ObjMeta{Name: *r.Key, Date: *r.LastModified}
+ }
+ return false
+ })
+ if obj.Name == "" && obj.Date.IsZero() && err == nil {
+ return obj, fmt.Errorf("No object could be found for %s", prefix)
+ }
+ return obj, err
+}
+
func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
var prefixes []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Delimiter: aws.String("/"),
- MaxKeys: aws.Int64(1),
}, func(page *s3.ListObjectsV2Output, last bool) bool {
for _, r := range page.CommonPrefixes {
prefixes = append(prefixes, *r.Prefix)
@@ -321,6 +398,39 @@ func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error) {
return prefixes, err
}
+// Deletes a list of objects
+func (a *AwsConn) DeleteObjects(bucket string, keys []string) error {
+ objs := []*s3.ObjectIdentifier{}
+ for i, v := range keys {
+ o := s3.ObjectIdentifier{Key: aws.String(v)}
+ objs = append(objs, &o)
+ // s3.DeleteObjects can only take up to 1000 keys at a time,
+ // so if necessary delete those collected so far and empty
+ // the objs queue
+ if i % 1000 == 1 {
+ _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{
+ Bucket: aws.String(bucket),
+ Delete: &s3.Delete{
+ Objects: objs,
+ Quiet: aws.Bool(true),
+ },
+ })
+ if err != nil {
+ return err
+ }
+ objs = []*s3.ObjectIdentifier{}
+ }
+ }
+ _, err := a.s3svc.DeleteObjects(&s3.DeleteObjectsInput{
+ Bucket: aws.String(bucket),
+ Delete: &s3.Delete{
+ Objects: objs,
+ Quiet: aws.Bool(true),
+ },
+ })
+ return err
+}
+
// CreateBucket creates a new S3 bucket
func (a *AwsConn) CreateBucket(name string) error {
_, err := a.s3svc.CreateBucket(&s3.CreateBucketInput{