summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2020-04-07 12:18:38 +0100
committerNick White <git@njw.name>2020-04-07 12:18:38 +0100
commit9fb0842f07320d47509865c689747d2c82379f3d (patch)
treec9f0d382cf57052d2765d7d933e1f4bb5859342d
parentaf63449be98f1964b67981b2aedda0d2ea70fe6d (diff)
Remove unused OCR queue (was superceded by the ocrpage queue some time ago)
-rw-r--r--aws.go41
-rw-r--r--cloudsettings.go1
-rw-r--r--cmd/addtoqueue/main.go3
-rw-r--r--cmd/bookpipeline/main.go31
-rw-r--r--cmd/lspipeline/main.go2
-rw-r--r--cmd/unstickocr/main.go118
6 files changed, 16 insertions, 180 deletions
diff --git a/aws.go b/aws.go
index 5f1befa..2015d9c 100644
--- a/aws.go
+++ b/aws.go
@@ -40,16 +40,14 @@ type AwsConn struct {
Region string
Logger *log.Logger
- // these are used internally
- sess *session.Session
- ec2svc *ec2.EC2
- s3svc *s3.S3
- sqssvc *sqs.SQS
- downloader *s3manager.Downloader
- uploader *s3manager.Uploader
- wipequrl, prequrl, ocrqurl, analysequrl string
- ocrpgqurl string
- wipstorageid string
+ sess *session.Session
+ ec2svc *ec2.EC2
+ s3svc *s3.S3
+ sqssvc *sqs.SQS
+ downloader *s3manager.Downloader
+ uploader *s3manager.Uploader
+ wipequrl, prequrl, ocrpgqurl, analysequrl string
+ wipstorageid string
}
// MinimalInit does the bare minimum to initialise aws services
@@ -105,14 +103,14 @@ func (a *AwsConn) Init() error {
}
a.wipequrl = *result.QueueUrl
- a.Logger.Println("Getting OCR queue URL")
+ a.Logger.Println("Getting OCR Page queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String(queueOcr),
+ QueueName: aws.String(queueOcrPage),
})
if err != nil {
- return errors.New(fmt.Sprintf("Error getting OCR queue URL: %s", err))
+ return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
}
- a.ocrqurl = *result.QueueUrl
+ a.ocrpgqurl = *result.QueueUrl
a.Logger.Println("Getting analyse queue URL")
result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
@@ -123,15 +121,6 @@ func (a *AwsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
- a.Logger.Println("Getting OCR Page queue URL")
- result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
- QueueName: aws.String(queueOcrPage),
- })
- if err != nil {
- return errors.New(fmt.Sprintf("Error getting OCR Page queue URL: %s", err))
- }
- a.ocrpgqurl = *result.QueueUrl
-
return nil
}
@@ -245,10 +234,6 @@ func (a *AwsConn) WipeQueueId() string {
return a.wipequrl
}
-func (a *AwsConn) OCRQueueId() string {
- return a.ocrqurl
-}
-
func (a *AwsConn) OCRPageQueueId() string {
return a.ocrpgqurl
}
@@ -467,7 +452,7 @@ func (a *AwsConn) Log(v ...interface{}) {
// mkpipeline sets up necessary buckets and queues for the pipeline
func (a *AwsConn) MkPipeline() error {
buckets := []string{storageWip}
- queues := []string{queuePreProc, queueWipeOnly, queueOcr, queueAnalyse, queueOcrPage}
+ queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage}
for _, bucket := range buckets {
err := a.CreateBucket(bucket)
diff --git a/cloudsettings.go b/cloudsettings.go
index d627342..0cf1777 100644
--- a/cloudsettings.go
+++ b/cloudsettings.go
@@ -19,7 +19,6 @@ const (
const (
queuePreProc = "rescribepreprocess"
queueWipeOnly = "rescribewipeonly"
- queueOcr = "rescribeocr"
queueOcrPage = "rescribeocrpage"
queueAnalyse = "rescribeanalyse"
)
diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go
index 06edac5..c3284e9 100644
--- a/cmd/addtoqueue/main.go
+++ b/cmd/addtoqueue/main.go
@@ -21,7 +21,6 @@ This is handy to work around bugs when things are misbehaving.
Valid queue names:
- preprocess
- wipeonly
-- ocr
- ocrpage
- analyse
`
@@ -38,7 +37,6 @@ type QueuePipeliner interface {
AddToQueue(url string, msg string) error
PreQueueId() string
WipeQueueId() string
- OCRQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
}
@@ -70,7 +68,6 @@ func main() {
}{
{conn.PreQueueId(), "preprocess"},
{conn.WipeQueueId(), "wipeonly"},
- {conn.OCRQueueId(), "ocr"},
{conn.OCRPageQueueId(), "ocrpage"},
{conn.AnalyseQueueId(), "analyse"},
}
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 8d2ffcc..9059167 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -22,10 +22,10 @@ import (
"rescribe.xyz/utils/pkg/hocr"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training] [-shutdown true/false]
+const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
-Watches the preprocess, ocr and analyse queues for book names. When
-one is found this general process is followed:
+Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.
+When one is found this general process is followed:
- The book name is hidden from the queue, and a 'heartbeat' is
started which keeps it hidden (this will time out after 2 minutes
@@ -66,7 +66,6 @@ type Pipeliner interface {
Clouder
PreQueueId() string
WipeQueueId() string
- OCRQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
WIPStorageId() string
@@ -672,7 +671,6 @@ func main() {
training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)")
nopreproc := flag.Bool("np", false, "disable preprocessing")
nowipe := flag.Bool("nw", false, "disable wipeonly")
- noocr := flag.Bool("no", false, "disable ocr")
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes")
@@ -693,7 +691,6 @@ func main() {
origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`)
wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`)
- preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
ocredPattern := regexp.MustCompile(`.hocr$`)
var conn Pipeliner
@@ -711,7 +708,6 @@ func main() {
var checkPreQueue <-chan time.Time
var checkWipeQueue <-chan time.Time
- var checkOCRQueue <-chan time.Time
var checkOCRPageQueue <-chan time.Time
var checkAnalyseQueue <-chan time.Time
var shutdownIfQuiet *time.Timer
@@ -722,9 +718,6 @@ func main() {
if !*nowipe {
checkWipeQueue = time.After(0)
}
- if !*noocr {
- checkOCRQueue = time.After(0)
- }
if !*noocrpg {
checkOCRPageQueue = time.After(0)
}
@@ -794,24 +787,6 @@ func main() {
if err != nil {
conn.Log("Error during OCR Page process", err)
}
- case <-checkOCRQueue:
- msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatSeconds*2)
- checkOCRQueue = time.After(PauseBetweenChecks)
- if err != nil {
- conn.Log("Error checking OCR queue", err)
- continue
- }
- if msg.Handle == "" {
- conn.Log("No message received on OCR queue, sleeping")
- continue
- }
- stopTimer(shutdownIfQuiet)
- conn.Log("Message received on OCR queue, processing", msg.Body)
- err = processBook(msg, conn, ocr(*training), preprocessedPattern, conn.OCRQueueId(), conn.AnalyseQueueId())
- shutdownIfQuiet.Reset(TimeBeforeShutdown)
- if err != nil {
- conn.Log("Error during OCR process", err)
- }
case <-checkAnalyseQueue:
msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2)
checkAnalyseQueue = time.After(PauseBetweenChecks)
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index 659b034..ab2f8b1 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -30,7 +30,6 @@ type LsPipeliner interface {
Init() error
PreQueueId() string
WipeQueueId() string
- OCRQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
GetQueueDetails(url string) (string, string, error)
@@ -66,7 +65,6 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {
queues := []struct{ name, id string }{
{"preprocess", conn.PreQueueId()},
{"wipeonly", conn.WipeQueueId()},
- {"ocr", conn.OCRQueueId()},
{"ocrpage", conn.OCRPageQueueId()},
{"analyse", conn.AnalyseQueueId()},
}
diff --git a/cmd/unstickocr/main.go b/cmd/unstickocr/main.go
deleted file mode 100644
index 796525b..0000000
--- a/cmd/unstickocr/main.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Copyright 2019 Nick White.
-// Use of this source code is governed by the GPLv3
-// license that can be found in the LICENSE file.
-
-package main
-
-import (
- "flag"
- "fmt"
- "log"
- "os"
- "time"
-
- "rescribe.xyz/bookpipeline"
-)
-
-const usage = `Usage: unstickocr [-v] bookname
-
-unstickocr deletes a book from the OCR queue and adds it to the
-Analyse queue.
-
-This should be done automatically by the bookpipeline tool once
-the OCR job has completed, but sometimes it isn't, because of a
-nasty bug. Once that bug is squashed, this tool can be deleted.
-`
-
-// null writer to enable non-verbose logging to be discarded
-type NullWriter bool
-
-func (w NullWriter) Write(p []byte) (n int, err error) {
- return len(p), nil
-}
-
-type UnstickPipeliner interface {
- Init() error
- CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
- AddToQueue(url string, msg string) error
- DelFromQueue(url string, handle string) error
- OCRQueueId() string
- AnalyseQueueId() string
-}
-
-func main() {
- verbose := flag.Bool("v", false, "verbose")
- flag.Usage = func() {
- fmt.Fprintf(flag.CommandLine.Output(), usage)
- flag.PrintDefaults()
- }
- flag.Parse()
-
- if flag.NArg() != 1 {
- flag.Usage()
- return
- }
-
- var verboselog *log.Logger
- if *verbose {
- verboselog = log.New(os.Stdout, "", 0)
- } else {
- var n NullWriter
- verboselog = log.New(n, "", 0)
- }
-
- var conn UnstickPipeliner
- conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
-
- err := conn.Init()
- if err != nil {
- log.Fatalln("Error setting up cloud connection:", err)
- }
-
- book := flag.Arg(0)
- done := false
-
- for a := 0; a < 5; a++ {
- for i := 0; i < 10; i++ {
- verboselog.Println("Checking OCR queue for", book)
- msg, err := conn.CheckQueue(conn.OCRQueueId(), 10)
- if err != nil {
- log.Fatalln("Error checking OCR queue:", err)
- continue
- }
- if msg.Handle == "" {
- verboselog.Println("No message received on OCR queue")
- continue
- }
- if msg.Body != book {
- verboselog.Println("Message received on OCR queue is not the one we're",
- "looking for, so will try again - found", msg.Body)
- continue
- }
- err = conn.DelFromQueue(conn.OCRQueueId(), msg.Handle)
- if err != nil {
- log.Fatalln("Error deleting message from OCR queue:", err)
- }
- err = conn.AddToQueue(conn.AnalyseQueueId(), book)
- if err != nil {
- log.Fatalln("Error adding message to Analyse queue:", err)
- }
- done = true
- break
- }
- if done == true {
- break
- }
- log.Println("No message found yet, sleeping for 30 seconds to try again")
- time.Sleep(30 * time.Minute)
- }
-
- if done == true {
- fmt.Println("Succeeded moving message from OCR queue to Analyse queue.")
- } else {
- log.Fatalln("Failed to find message", book, "on OCR queue; is it still being processed?",
- "It can only be discovered and processed by this tool when it is available.",
- "Try shutting down any instance that is using it, waiting a few minutes,",
- "and rerunning this tool.")
- }
-}