diff options
Diffstat (limited to 'bookpipeline/cmd/bookpipeline/main.go')
-rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index 7ffacf8..1c2592b 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -54,7 +54,7 @@ type Clouder interface { CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error - QueueHeartbeat(msgHandle string, qurl string, duration int64) error + QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) } type Pipeliner interface { @@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log close(up) } -func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { + currentmsg := msg for range t.C { - err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2) + m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2) if err != nil { errc <- err t.Stop() return } + if m.Id != "" { + currentmsg = m + // TODO: maybe handle communicating new msg more gracefully than this + for range msgc { + } // throw away any old msgc + msgc <- m + } } } func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { dl := make(chan string) + msgc := make(chan bookpipeline.Qmsg) processc := make(chan string) upc := make(chan string) done := make(chan bool) @@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string } t := time.NewTicker(HeartbeatTime * time.Second) - go heartbeat(conn, t, msg.Handle, fromQueue, errc) + go heartbeat(conn, t, msg, fromQueue, msgc, errc) // these functions will do their jobs when their channels have data go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string t.Stop() + // check whether we're using a newer msg handle + select { + case m, ok := <-msgc : + if ok { + msg = m + } + default: + } + conn.GetLogger().Println("Deleting original message from queue", fromQueue) err = conn.DelFromQueue(fromQueue, msg.Handle) if err != nil { |