summaryrefslogtreecommitdiff
path: root/bookpipeline/cmd/bookpipeline/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'bookpipeline/cmd/bookpipeline/main.go')
-rw-r--r--bookpipeline/cmd/bookpipeline/main.go26
1 files changed, 22 insertions, 4 deletions
diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go
index 7ffacf8..1c2592b 100644
--- a/bookpipeline/cmd/bookpipeline/main.go
+++ b/bookpipeline/cmd/bookpipeline/main.go
@@ -54,7 +54,7 @@ type Clouder interface {
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
- QueueHeartbeat(msgHandle string, qurl string, duration int64) error
+ QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
}
type Pipeliner interface {
@@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log
close(up)
}
-func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) {
+func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+ currentmsg := msg
for range t.C {
- err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2)
+ m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
errc <- err
t.Stop()
return
}
+ if m.Id != "" {
+ currentmsg = m
+ // TODO: maybe handle communicating new msg more gracefully than this
+ for range msgc {
+ } // throw away any old msgc
+ msgc <- m
+ }
}
}
func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
dl := make(chan string)
+ msgc := make(chan bookpipeline.Qmsg)
processc := make(chan string)
upc := make(chan string)
done := make(chan bool)
@@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
}
t := time.NewTicker(HeartbeatTime * time.Second)
- go heartbeat(conn, t, msg.Handle, fromQueue, errc)
+ go heartbeat(conn, t, msg, fromQueue, msgc, errc)
// these functions will do their jobs when their channels have data
go download(dl, processc, conn, d, errc, conn.GetLogger())
@@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
t.Stop()
+ // check whether we're using a newer msg handle
+ select {
+ case m, ok := <-msgc :
+ if ok {
+ msg = m
+ }
+ default:
+ }
+
conn.GetLogger().Println("Deleting original message from queue", fromQueue)
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {