summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/pipeline/pipeline.go9
-rw-r--r--internal/pipeline/put.go5
2 files changed, 10 insertions, 4 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 639bba1..40ed02c 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -63,6 +63,7 @@ type Queuer interface {
DelFromQueue(url string, handle string) error
Log(v ...interface{})
OCRPageQueueId() string
+ PreNoWipeQueueId() string
PreQueueId() string
QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
WipeQueueId() string
@@ -72,6 +73,7 @@ type UploadQueuer interface {
Log(v ...interface{})
Upload(bucket string, key string, path string) error
WIPStorageId() string
+ PreNoWipeQueueId() string
PreQueueId() string
WipeQueueId() string
OCRPageQueueId() string
@@ -93,6 +95,7 @@ type Pipeliner interface {
ListObjects(bucket string, prefix string) ([]string, error)
Log(v ...interface{})
OCRPageQueueId() string
+ PreNoWipeQueueId() string
PreQueueId() string
QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
Upload(bucket string, key string, path string) error
@@ -239,7 +242,7 @@ func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue stri
done <- true
}
-func Preprocess(thresholds []float64) func(context.Context, chan string, chan string, chan error, *log.Logger) {
+func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) {
return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) {
for path := range pre {
select {
@@ -251,7 +254,7 @@ func Preprocess(thresholds []float64) func(context.Context, chan string, chan st
default:
}
logger.Println("Preprocessing", path)
- done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30)
+ done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30)
if err != nil {
for range pre {
} // consume the rest of the receiving channel so it isn't blocked
@@ -822,7 +825,7 @@ func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, pro
// complete, and will fill the ocrpage queue with parts which succeeded
// on each run, so in that case it's better to delete the message from
// the queue and notify us.
- if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
+ if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() {
conn.Log("Deleting message from queue due to a bad error", fromQueue)
err2 := conn.DelFromQueue(fromQueue, msg.Handle)
if err2 != nil {
diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go
index aba9e0e..47729b5 100644
--- a/internal/pipeline/put.go
+++ b/internal/pipeline/put.go
@@ -83,7 +83,10 @@ func CheckImages(ctx context.Context, dir string) error {
// DetectQueueType detects which queue to use based on the preponderance
// of files of a particular extension in a directory
-func DetectQueueType(dir string, conn Queuer) string {
+func DetectQueueType(dir string, conn Queuer, nowipe bool) string {
+ if nowipe {
+ return conn.PreNoWipeQueueId()
+ }
pngdirs, _ := filepath.Glob(dir + "/*.png")
jpgdirs, _ := filepath.Glob(dir + "/*.jpg")
pngcount := len(pngdirs)