summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--aws.go58
-rw-r--r--cmd/logwholequeue/main.go85
-rw-r--r--cmd/trimqueue/main.go84
3 files changed, 227 insertions, 0 deletions
diff --git a/aws.go b/aws.go
index 5ebc79f..6b707fe 100644
--- a/aws.go
+++ b/aws.go
@@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
+ "strings"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {
return nil
}
+// LogQueue prints the body of all messages in a queue to the log
+func (a *AwsConn) LogQueue(url string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ a.Logger.Println(*m.Body)
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
+// RemovePrefixesFromQueue removes any messages in a queue whose
+// body starts with the specified prefix.
+func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error {
+ for {
+ msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ MaxNumberOfMessages: aws.Int64(10),
+ VisibilityTimeout: aws.Int64(300),
+ QueueUrl: &url,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(msgResult.Messages) > 0 {
+ for _, m := range msgResult.Messages {
+ if !strings.HasPrefix(*m.Body, prefix) {
+ continue
+ }
+ a.Logger.Printf("Removing %s from queue\n", *m.Body)
+ _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &url,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ break
+ }
+ }
+ return nil
+}
+
// QueueHeartbeat updates the visibility timeout of a message. This
// ensures that the message remains "in flight", meaning that it
// cannot be seen by other processes, but if this process fails the
diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go
new file mode 100644
index 0000000..71e8927
--- /dev/null
+++ b/cmd/logwholequeue/main.go
@@ -0,0 +1,85 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// logwholequeue gets all messages in a queue. This can be useful
+// for debugging queue issues.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: logwholequeue qname
+
+logwholequeue gets all messages in a queue.
+
+This can be useful for debugging queue issues.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
+`
+
+type QueuePipeliner interface {
+ Init() error
+ LogQueue(url string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 1 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for i, n := range qdetails {
+ if n.name == qname {
+ qid = qdetails[i].id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.LogQueue(qid)
+ if err != nil {
+ log.Fatalln("Error getting queue", qname, ":", err)
+ }
+}
diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go
new file mode 100644
index 0000000..cf65c4d
--- /dev/null
+++ b/cmd/trimqueue/main.go
@@ -0,0 +1,84 @@
+// Copyright 2019 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+// trimqueue deletes any messages in a queue that match a specified
+// prefix.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+
+ "rescribe.xyz/bookpipeline"
+)
+
+const usage = `Usage: trimprefix qname prefix
+
+trimqueue deletes any messages in a queue that match a specified
+prefix.
+
+Valid queue names:
+- preprocess
+- wipeonly
+- ocrpage
+- analyse
+`
+
+type QueuePipeliner interface {
+ Init() error
+ RemovePrefixesFromQueue(url string, prefix string) error
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
+}
+
+func main() {
+ flag.Usage = func() {
+ fmt.Fprintf(flag.CommandLine.Output(), usage)
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+
+ if flag.NArg() != 2 {
+ flag.Usage()
+ return
+ }
+
+ var conn QueuePipeliner
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2"}
+
+ err := conn.Init()
+ if err != nil {
+ log.Fatalln("Error setting up cloud connection:", err)
+ }
+
+ qdetails := []struct {
+ id, name string
+ }{
+ {conn.PreQueueId(), "preprocess"},
+ {conn.WipeQueueId(), "wipeonly"},
+ {conn.OCRPageQueueId(), "ocrpage"},
+ {conn.AnalyseQueueId(), "analyse"},
+ }
+
+ qname := flag.Arg(0)
+
+ var qid string
+ for i, n := range qdetails {
+ if n.name == qname {
+ qid = qdetails[i].id
+ break
+ }
+ }
+ if qid == "" {
+ log.Fatalln("Error, no queue named", qname)
+ }
+
+ err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1))
+ if err != nil {
+ log.Fatalln("Error removing prefixes from queue", qname, ":", err)
+ }
+}