diff options
Diffstat (limited to 'bookpipeline/cmd')
| -rw-r--r-- | bookpipeline/cmd/bookpipeline/main.go | 26 | 
1 files changed, 22 insertions, 4 deletions
| diff --git a/bookpipeline/cmd/bookpipeline/main.go b/bookpipeline/cmd/bookpipeline/main.go index 7ffacf8..1c2592b 100644 --- a/bookpipeline/cmd/bookpipeline/main.go +++ b/bookpipeline/cmd/bookpipeline/main.go @@ -54,7 +54,7 @@ type Clouder interface {  	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)  	AddToQueue(url string, msg string) error  	DelFromQueue(url string, handle string) error -	QueueHeartbeat(msgHandle string, qurl string, duration int64) error +	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)  }  type Pipeliner interface { @@ -232,19 +232,28 @@ func analyse(toanalyse chan string, up chan string, errc chan error, logger *log  	close(up)  } -func heartbeat(conn Pipeliner, t *time.Ticker, msg string, queue string, errc chan error) { +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { +	currentmsg := msg  	for range t.C { -		err := conn.QueueHeartbeat(msg, queue, HeartbeatTime*2) +		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)  		if err != nil {  			errc <- err  			t.Stop()  			return  		} +		if m.Id != "" { +			currentmsg = m +			// TODO: maybe handle communicating new msg more gracefully than this +			for range msgc { +			} // throw away any old msgc +			msgc <- m +		}  	}  }  func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {  	dl := make(chan string) +	msgc := make(chan bookpipeline.Qmsg)  	processc := make(chan string)  	upc := make(chan string)  	done := make(chan bool) @@ -259,7 +268,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	}  	t := time.NewTicker(HeartbeatTime * time.Second) -	go heartbeat(conn, t, msg.Handle, fromQueue, errc) +	go heartbeat(conn, t, msg, fromQueue, msgc, errc)  	// these functions will do their jobs when their channels have data  	go download(dl, processc, conn, d, errc, conn.GetLogger()) @@ -307,6 +316,15 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	t.Stop() +	// check whether we're using a newer msg handle +	select { +	case m, ok := <-msgc : +		if ok { +			msg = m +		} +	default: +	} +  	conn.GetLogger().Println("Deleting original message from queue", fromQueue)  	err = conn.DelFromQueue(fromQueue, msg.Handle)  	if err != nil { | 
