diff options
author | Nick White <git@njw.name> | 2020-11-17 16:37:54 +0000 |
---|---|---|
committer | Nick White <git@njw.name> | 2020-11-17 16:37:54 +0000 |
commit | 2717c5ed21a082a7f24833f3d57b303fd22bd4e5 (patch) | |
tree | e2f58418cc222eb46e679dbabfc1ccecbb1e9387 /aws.go | |
parent | f71fd636f151e5cb7eafb2ae6c21c1c188d43fdd (diff) |
Add trimqueue and logwholequeue utilities which can help deal with weird queue states
Diffstat (limited to 'aws.go')
-rw-r--r-- | aws.go | 58 |
1 files changed, 58 insertions, 0 deletions
@@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error { return nil } +// LogQueue prints the body of all messages in a queue to the log +func (a *AwsConn) LogQueue(url string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + a.Logger.Println(*m.Body) + } + } else { + break + } + } + return nil +} + +// RemovePrefixesFromQueue removes any messages in a queue whose +// body starts with the specified prefix. +func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + if !strings.HasPrefix(*m.Body, prefix) { + continue + } + a.Logger.Printf("Removing %s from queue\n", *m.Body) + _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &url, + ReceiptHandle: m.ReceiptHandle, + }) + if err != nil { + return err + } + } + } else { + break + } + } + return nil +} + // QueueHeartbeat updates the visibility timeout of a message. This // ensures that the message remains "in flight", meaning that it // cannot be seen by other processes, but if this process fails the |