summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'cmd')
-rw-r--r--cmd/bookpipeline/main.go46
1 files changed, 31 insertions, 15 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 7a7a277..5a6606d 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -27,7 +27,7 @@ import (
"rescribe.xyz/utils/pkg/hocr"
)
-const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
+const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]
Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.
When one is found this general process is followed:
@@ -48,7 +48,6 @@ the contents: {smtpserver} {port} {username} {password} {from} {to}
`
const PauseBetweenChecks = 3 * time.Minute
-const TimeBeforeShutdown = 5 * time.Minute
const LogSaveTime = 1 * time.Minute
const HeartbeatSeconds = 60
@@ -719,6 +718,12 @@ func stopTimer(t *time.Timer) {
}
}
+func resetTimer(t *time.Timer, d time.Duration) {
+ if d > 0 {
+ t.Reset(d)
+ }
+}
+
// TODO: rather than relying on journald, would be nicer to save the logs
// ourselves maybe, so that we weren't relying on a particular systemd
// setup. this can be done by having the conn.Log also append line
@@ -770,7 +775,8 @@ func main() {
nowipe := flag.Bool("nw", false, "disable wipeonly")
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
- autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes")
+ autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)")
+ autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop")
conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
flag.Usage = func() {
@@ -823,7 +829,7 @@ func main() {
var checkWipeQueue <-chan time.Time
var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
- var shutdownIfQuiet *time.Timer
+ var stopIfQuiet *time.Timer
var savelognow *time.Ticker
if !*nopreproc {
checkPreQueue = time.After(0)
@@ -837,7 +843,12 @@ func main() {
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
- shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown)
+ var quietTime = time.Duration(*autostop) * time.Second
+ stopIfQuiet = time.NewTimer(quietTime)
+ if quietTime == 0 {
+ stopIfQuiet.Stop()
+ }
+
savelognow = time.NewTicker(LogSaveTime)
if *conntype == "local" {
savelognow.Stop()
@@ -857,9 +868,9 @@ func main() {
continue
}
conn.Log("Message received on preprocess queue, processing", msg.Body)
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during preprocess", err)
}
@@ -874,10 +885,10 @@ func main() {
conn.Log("No message received on wipeonly queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on wipeonly queue, processing", msg.Body)
err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during wipe", err)
}
@@ -894,10 +905,10 @@ func main() {
// Have OCRPageQueue checked immediately after completion, as chances are high that
// there will be more pages that should be done without delay
checkOCRPageQueue = time.After(0)
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on OCR Page queue, processing", msg.Body)
err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during OCR Page process", err)
}
@@ -912,10 +923,10 @@ func main() {
conn.Log("No message received on analyse queue, sleeping")
continue
}
- stopTimer(shutdownIfQuiet)
+ stopTimer(stopIfQuiet)
conn.Log("Message received on analyse queue, processing", msg.Body)
err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
+ resetTimer(stopIfQuiet, quietTime)
if err != nil {
conn.Log("Error during analysis", err)
}
@@ -925,10 +936,15 @@ func main() {
if err != nil {
conn.Log("Error saving logs", err)
}
- case <-shutdownIfQuiet.C:
- if !*autoshutdown {
+ case <-stopIfQuiet.C:
+ if quietTime == 0 {
continue
}
+ if !*autoshutdown {
+ conn.Log("Stopping pipeline")
+ _ = savelogs(conn, starttime, hostname)
+ return
+ }
conn.Log("Shutting down")
_ = savelogs(conn, starttime, hostname)
cmd := exec.Command("sudo", "systemctl", "poweroff")