diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/pipeline/pipeline.go | 9 | ||||
-rw-r--r-- | internal/pipeline/put.go | 5 |
2 files changed, 10 insertions, 4 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 639bba1..40ed02c 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -63,6 +63,7 @@ type Queuer interface { DelFromQueue(url string, handle string) error Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) WipeQueueId() string @@ -72,6 +73,7 @@ type UploadQueuer interface { Log(v ...interface{}) Upload(bucket string, key string, path string) error WIPStorageId() string + PreNoWipeQueueId() string PreQueueId() string WipeQueueId() string OCRPageQueueId() string @@ -93,6 +95,7 @@ type Pipeliner interface { ListObjects(bucket string, prefix string) ([]string, error) Log(v ...interface{}) OCRPageQueueId() string + PreNoWipeQueueId() string PreQueueId() string QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) Upload(bucket string, key string, path string) error @@ -239,7 +242,7 @@ func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue stri done <- true } -func Preprocess(thresholds []float64) func(context.Context, chan string, chan string, chan error, *log.Logger) { +func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) { return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) { for path := range pre { select { @@ -251,7 +254,7 @@ func Preprocess(thresholds []float64) func(context.Context, chan string, chan st default: } logger.Println("Preprocessing", path) - done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) + done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30) if err != nil { for range pre { } // consume the rest of the receiving channel so it isn't blocked @@ -822,7 +825,7 @@ func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, pro // complete, and will fill the ocrpage queue with parts which succeeded // on each run, so in that case it's better to delete the message from // the queue and notify us. - if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { + if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() { conn.Log("Deleting message from queue due to a bad error", fromQueue) err2 := conn.DelFromQueue(fromQueue, msg.Handle) if err2 != nil { diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index aba9e0e..47729b5 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -83,7 +83,10 @@ func CheckImages(ctx context.Context, dir string) error { // DetectQueueType detects which queue to use based on the preponderance // of files of a particular extension in a directory -func DetectQueueType(dir string, conn Queuer) string { +func DetectQueueType(dir string, conn Queuer, nowipe bool) string { + if nowipe { + return conn.PreNoWipeQueueId() + } pngdirs, _ := filepath.Glob(dir + "/*.png") jpgdirs, _ := filepath.Glob(dir + "/*.jpg") pngcount := len(pngdirs) |