summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2020-03-23 14:16:20 +0000
committerNick White <git@njw.name>2020-03-23 14:16:20 +0000
commite05d7ed7fa25e193d0354f763f5760696ac13d5e (patch)
tree4f3f6094e71c8a6eeffca64807878379de58e2ff
parent43e1e0b378101d113ab5fe98a0efb67a0615ddc7 (diff)
Add Log() function to Pipeliner interface
This simplifies things nicely from using conn.GetLogger().Println() to conn.Log()
-rw-r--r--aws.go6
-rw-r--r--cmd/bookpipeline/main.go27
2 files changed, 20 insertions, 13 deletions
diff --git a/aws.go b/aws.go
index 359c0be..87654c0 100644
--- a/aws.go
+++ b/aws.go
@@ -457,3 +457,9 @@ func (a *AwsConn) StartInstances(n int) error {
})
return err
}
+
+// Log records an item in the with the Logger. Arguments are handled
+// as with fmt.Println.
+func (a *AwsConn) Log(v ...interface{}) {
+ a.Logger.Println(v)
+}
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index b9705e0..f4b1ad4 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -71,6 +71,7 @@ type Pipeliner interface {
AnalyseQueueId() string
WIPStorageId() string
GetLogger() *log.Logger
+ Log(v ...interface{})
}
func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
@@ -119,7 +120,7 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, b
errc <- err
return
}
- conn.GetLogger().Println("Adding", key, training, "to queue", toQueue)
+ logger.Println("Adding", key, training, "to queue", toQueue)
err = conn.AddToQueue(toQueue, key + " " + training)
if err != nil {
for range c {
@@ -389,7 +390,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatTime*2)
if err != nil {
// This is for better debugging of the heartbeat issue
- conn.GetLogger().Println("Error with heartbeat", err)
+ conn.Log("Error with heartbeat", err)
os.Exit(1)
// TODO: would be better to ensure this error stops any running
// processes, as they will ultimately fail in the case of
@@ -400,7 +401,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
return
}
if m.Id != "" {
- conn.GetLogger().Println("Replaced message handle as visibilitytimeout limit was reached")
+ conn.Log("Replaced message handle as visibilitytimeout limit was reached")
currentmsg = m
// TODO: maybe handle communicating new msg more gracefully than this
for range msgc {
@@ -489,7 +490,7 @@ func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch
}
if allOCRed(bookname, conn) && toQueue != "" {
- conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+ conn.Log("Sending", bookname, "to queue", toQueue)
err = conn.AddToQueue(toQueue, bookname)
if err != nil {
t.Stop()
@@ -505,13 +506,13 @@ func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, ch
case m, ok := <-msgc:
if ok {
msg = m
- conn.GetLogger().Println("Using new message handle to delete message from queue")
+ conn.Log("Using new message handle to delete message from queue")
}
default:
- conn.GetLogger().Println("Using original message handle to delete message from queue")
+ conn.Log("Using original message handle to delete message from queue")
}
- conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+ conn.Log("Deleting original message from queue", fromQueue)
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {
_ = os.RemoveAll(d)
@@ -560,7 +561,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
go up(upc, done, conn, bookname, errc, conn.GetLogger())
}
- conn.GetLogger().Println("Getting list of objects to download")
+ conn.Log("Getting list of objects to download")
objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
t.Stop()
@@ -570,7 +571,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
var todl []string
for _, n := range objs {
if !match.MatchString(n) {
- conn.GetLogger().Println("Skipping item that doesn't match target", n)
+ conn.Log("Skipping item that doesn't match target", n)
continue
}
todl = append(todl, n)
@@ -590,7 +591,7 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
}
if toQueue != "" && toQueue != conn.OCRPageQueueId() {
- conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)
+ conn.Log("Sending", bookname, "to queue", toQueue)
err = conn.AddToQueue(toQueue, bookname)
if err != nil {
t.Stop()
@@ -606,13 +607,13 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string
case m, ok := <-msgc:
if ok {
msg = m
- conn.GetLogger().Println("Using new message handle to delete message from queue")
+ conn.Log("Using new message handle to delete message from queue")
}
default:
- conn.GetLogger().Println("Using original message handle to delete message from queue")
+ conn.Log("Using original message handle to delete message from queue")
}
- conn.GetLogger().Println("Deleting original message from queue", fromQueue)
+ conn.Log("Deleting original message from queue", fromQueue)
err = conn.DelFromQueue(fromQueue, msg.Handle)
if err != nil {
_ = os.RemoveAll(d)