diff options
-rw-r--r-- | cmd/bookpipeline/main.go | 46 |
1 files changed, 31 insertions, 15 deletions
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 7a7a277..5a6606d 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -27,7 +27,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. When one is found this general process is followed: @@ -48,7 +48,6 @@ the contents: {smtpserver} {port} {username} {password} {from} {to} ` const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute const LogSaveTime = 1 * time.Minute const HeartbeatSeconds = 60 @@ -719,6 +718,12 @@ func stopTimer(t *time.Timer) { } } +func resetTimer(t *time.Timer, d time.Duration) { + if d > 0 { + t.Reset(d) + } +} + // TODO: rather than relying on journald, would be nicer to save the logs // ourselves maybe, so that we weren't relying on a particular systemd // setup. this can be done by having the conn.Log also append line @@ -770,7 +775,8 @@ func main() { nowipe := flag.Bool("nw", false, "disable wipeonly") noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") - autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") + autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop") conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { @@ -823,7 +829,7 @@ func main() { var checkWipeQueue <-chan time.Time var checkOCRPageQueue <-chan time.Time var checkAnalyseQueue <-chan time.Time - var shutdownIfQuiet *time.Timer + var stopIfQuiet *time.Timer var savelognow *time.Ticker if !*nopreproc { checkPreQueue = time.After(0) @@ -837,7 +843,12 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) + var quietTime = time.Duration(*autostop) * time.Second + stopIfQuiet = time.NewTimer(quietTime) + if quietTime == 0 { + stopIfQuiet.Stop() + } + savelognow = time.NewTicker(LogSaveTime) if *conntype == "local" { savelognow.Stop() @@ -857,9 +868,9 @@ func main() { continue } conn.Log("Message received on preprocess queue, processing", msg.Body) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during preprocess", err) } @@ -874,10 +885,10 @@ func main() { conn.Log("No message received on wipeonly queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on wipeonly queue, processing", msg.Body) err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during wipe", err) } @@ -894,10 +905,10 @@ func main() { // Have OCRPageQueue checked immediately after completion, as chances are high that // there will be more pages that should be done without delay checkOCRPageQueue = time.After(0) - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on OCR Page queue, processing", msg.Body) err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during OCR Page process", err) } @@ -912,10 +923,10 @@ func main() { conn.Log("No message received on analyse queue, sleeping") continue } - stopTimer(shutdownIfQuiet) + stopTimer(stopIfQuiet) conn.Log("Message received on analyse queue, processing", msg.Body) err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") - shutdownIfQuiet.Reset(TimeBeforeShutdown) + resetTimer(stopIfQuiet, quietTime) if err != nil { conn.Log("Error during analysis", err) } @@ -925,10 +936,15 @@ func main() { if err != nil { conn.Log("Error saving logs", err) } - case <-shutdownIfQuiet.C: - if !*autoshutdown { + case <-stopIfQuiet.C: + if quietTime == 0 { continue } + if !*autoshutdown { + conn.Log("Stopping pipeline") + _ = savelogs(conn, starttime, hostname) + return + } conn.Log("Shutting down") _ = savelogs(conn, starttime, hostname) cmd := exec.Command("sudo", "systemctl", "poweroff") |