summaryrefslogtreecommitdiff
path: root/aws.go
diff options
context:
space:
mode:
authorNick White <git@njw.name>2020-11-17 16:37:54 +0000
committerNick White <git@njw.name>2020-11-17 16:37:54 +0000
commit2717c5ed21a082a7f24833f3d57b303fd22bd4e5 (patch)
treee2f58418cc222eb46e679dbabfc1ccecbb1e9387 /aws.go
parentf71fd636f151e5cb7eafb2ae6c21c1c188d43fdd (diff)
Add trimqueue and logwholequeue utilities which can help deal with weird queue states
Diffstat (limited to 'aws.go')
-rw-r--r--aws.go58
1 files changed, 58 insertions, 0 deletions
diff --git a/aws.go b/aws.go
index 5ebc79f..6b707fe 100644
--- a/aws.go
+++ b/aws.go
@@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
+ "strings"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {
return nil
}
+// LogQueue prints the body of all messages in a queue to the log
+func (a *AwsConn) LogQueue(url string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ a.Logger.Println(*m.Body)
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
+// RemovePrefixesFromQueue removes any messages in a queue whose
+// body starts with the specified prefix.
+func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ if !strings.HasPrefix(*m.Body, prefix) {
+ continue
+ }
+ a.Logger.Printf("Removing %s from queue\n", *m.Body)
+ _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &url,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
// QueueHeartbeat updates the visibility timeout of a message. This
// ensures that the message remains "in flight", meaning that it
// cannot be seen by other processes, but if this process fails the