diff options
author | Nick White <git@njw.name> | 2019-09-11 15:41:23 +0100 |
---|---|---|
committer | Nick White <git@njw.name> | 2019-09-11 15:41:23 +0100 |
commit | 7d627f29e76225c99e5ab062a59cfddfbe8fa43c (patch) | |
tree | a81db1a129c3405d3a2f2d9cf3af13684bb37faf /bookpipeline/cmd | |
parent | cde53a94fd0eeb65209e549d3e13486dd2681b7e (diff) |
Work around the SQS limit of 12 hours of visibility timeout
This is done by checking for the error that is emitted in such a
case, and if it's found trying several times to find the message
back in the queue, and returning the message with an updated
handle back to the caller to use in the future.
Diffstat (limited to 'bookpipeline/cmd')
-rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index 7ffacf8..1c2592b 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -54,7 +54,7 @@ type Clouder interface { CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error - QueueHeartbeat(msgHandle string, qurl string, duration int64) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) } type Pipeliner interface { @@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log close(up) } -func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg for range t.C { - err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2) + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2) if err != nil { errc <- err t.Stop() return } + if m.Id != "" { + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } } } func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) upc := make(chan string) done := make(chan bool) @@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string } t := time.NewTicker(HeartbeatTime * time.Second) - go heartbeat(conn, t, msg.Handle, fromQueue, errc) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string t.Stop() + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc : + if ok { + msg = m + } + default: + } + conn.GetLogger().Println("Deleting original message from queue", fromQueue) err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { |