From 60af66d9ba77a61e61ef71d02c8e30198c433c87 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 19 May 2020 15:23:55 +0100 Subject: Add getandpurgequeue debugging tool --- aws.go | 29 +++++++++++++++ cmd/getandpurgequeue/main.go | 85 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 cmd/getandpurgequeue/main.go diff --git a/aws.go b/aws.go index 59f1a7e..da89146 100644 --- a/aws.go +++ b/aws.go @@ -149,6 +149,35 @@ func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error) { } } +func (a *AwsConn) LogAndPurgeQueue(url string) error { + for { + msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(300), + QueueUrl: &url, + }) + if err != nil { + return err + } + + if len(msgResult.Messages) > 0 { + for _, m := range msgResult.Messages { + a.Logger.Println(*m.Body) + _, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &url, + ReceiptHandle: m.ReceiptHandle, + }) + if err != nil { + return err + } + } + } else { + break + } + } + return nil +} + // QueueHeartbeat updates the visibility timeout of a message. This // ensures that the message remains "in flight", meaning that it // cannot be seen by other processes, but if this process fails the diff --git a/cmd/getandpurgequeue/main.go b/cmd/getandpurgequeue/main.go new file mode 100644 index 0000000..33aef60 --- /dev/null +++ b/cmd/getandpurgequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// getandpurgequeue gets and deletes all messages from a queue. This can +// be useful for debugging queue issues. +package main + +import ( + "flag" + "fmt" + "log" + + "rescribe.xyz/bookpipeline" +) + +const usage = `Usage: getandpurgequeue qname + +getandpurgequeue gets and deletes all messages from a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { + Init() error + LogAndPurgeQueue(url string) error + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), usage) + flag.PrintDefaults() + } + flag.Parse() + + if flag.NArg() != 1 { + flag.Usage() + return + } + + var conn QueuePipeliner + conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + + err := conn.Init() + if err != nil { + log.Fatalln("Error setting up cloud connection:", err) + } + + qdetails := []struct { + id, name string + }{ + {conn.PreQueueId(), "preprocess"}, + {conn.WipeQueueId(), "wipeonly"}, + {conn.OCRPageQueueId(), "ocrpage"}, + {conn.AnalyseQueueId(), "analyse"}, + } + + qname := flag.Arg(0) + + var qid string + for i, n := range qdetails { + if n.name == qname { + qid = qdetails[i].id + break + } + } + if qid == "" { + log.Fatalln("Error, no queue named", qname) + } + + err = conn.LogAndPurgeQueue(qid) + if err != nil { + log.Fatalln("Error getting and purging queue", qname, ":", err) + } +} -- cgit v1.2.1-24-ge1ad