diff options
| -rw-r--r-- | aws.go | 58 | ||||
| -rw-r--r-- | cmd/logwholequeue/main.go | 85 | ||||
| -rw-r--r-- | cmd/trimqueue/main.go | 84 | 
3 files changed, 227 insertions, 0 deletions
@@ -9,6 +9,7 @@ import (  	"fmt"  	"log"  	"os" +	"strings"  	"time"  	"github.com/aws/aws-sdk-go/aws" @@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {  	return nil  } +// LogQueue prints the body of all messages in a queue to the log +func (a *AwsConn) LogQueue(url string) error { +	for { +		msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +			MaxNumberOfMessages: aws.Int64(10), +			VisibilityTimeout:   aws.Int64(300), +			QueueUrl:            &url, +		}) +		if err != nil { +			return err +		} + +		if len(msgResult.Messages) > 0 { +			for _, m := range msgResult.Messages { +				a.Logger.Println(*m.Body) +			} +		} else { +			break +		} +	} +	return nil +} + +// RemovePrefixesFromQueue removes any messages in a queue whose +// body starts with the specified prefix. +func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error { +	for { +		msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +			MaxNumberOfMessages: aws.Int64(10), +			VisibilityTimeout:   aws.Int64(300), +			QueueUrl:            &url, +		}) +		if err != nil { +			return err +		} + +		if len(msgResult.Messages) > 0 { +			for _, m := range msgResult.Messages { +				if !strings.HasPrefix(*m.Body, prefix) { +					continue +				} +				a.Logger.Printf("Removing %s from queue\n", *m.Body) +				_, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ +					QueueUrl:      &url, +					ReceiptHandle: m.ReceiptHandle, +				}) +				if err != nil { +					return err +				} +			} +		} else { +			break +		} +	} +	return nil +} +  // QueueHeartbeat updates the visibility timeout of a message. This  // ensures that the message remains "in flight", meaning that it  // cannot be seen by other processes, but if this process fails the diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	LogQueue(url string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 1 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.LogQueue(qid) +	if err != nil { +		log.Fatalln("Error getting queue", qname, ":", err) +	} +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	RemovePrefixesFromQueue(url string, prefix string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 2 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) +	if err != nil { +		log.Fatalln("Error removing prefixes from queue", qname, ":", err) +	} +}  | 
