diff options
Diffstat (limited to 'internal/pipeline')
| -rw-r--r-- | internal/pipeline/pipeline.go | 9 | ||||
| -rw-r--r-- | internal/pipeline/put.go | 5 | 
2 files changed, 10 insertions, 4 deletions
| diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 639bba1..40ed02c 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -63,6 +63,7 @@ type Queuer interface {  	DelFromQueue(url string, handle string) error  	Log(v ...interface{})  	OCRPageQueueId() string +	PreNoWipeQueueId() string  	PreQueueId() string  	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)  	WipeQueueId() string @@ -72,6 +73,7 @@ type UploadQueuer interface {  	Log(v ...interface{})  	Upload(bucket string, key string, path string) error  	WIPStorageId() string +	PreNoWipeQueueId() string  	PreQueueId() string  	WipeQueueId() string  	OCRPageQueueId() string @@ -93,6 +95,7 @@ type Pipeliner interface {  	ListObjects(bucket string, prefix string) ([]string, error)  	Log(v ...interface{})  	OCRPageQueueId() string +	PreNoWipeQueueId() string  	PreQueueId() string  	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)  	Upload(bucket string, key string, path string) error @@ -239,7 +242,7 @@ func upAndQueue(ctx context.Context, c chan string, done chan bool, toQueue stri  	done <- true  } -func Preprocess(thresholds []float64) func(context.Context, chan string, chan string, chan error, *log.Logger) { +func Preprocess(thresholds []float64, nowipe bool) func(context.Context, chan string, chan string, chan error, *log.Logger) {  	return func(ctx context.Context, pre chan string, up chan string, errc chan error, logger *log.Logger) {  		for path := range pre {  			select { @@ -251,7 +254,7 @@ func Preprocess(thresholds []float64) func(context.Context, chan string, chan st  			default:  			}  			logger.Println("Preprocessing", path) -			done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) +			done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, !nowipe, 5, 30, 120, 30)  			if err != nil {  				for range pre {  				} // consume the rest of the receiving channel so it isn't blocked @@ -822,7 +825,7 @@ func ProcessBook(ctx context.Context, msg bookpipeline.Qmsg, conn Pipeliner, pro  		// complete, and will fill the ocrpage queue with parts which succeeded  		// on each run, so in that case it's better to delete the message from  		// the queue and notify us. -		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { +		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() || fromQueue == conn.PreNoWipeQueueId() {  			conn.Log("Deleting message from queue due to a bad error", fromQueue)  			err2 := conn.DelFromQueue(fromQueue, msg.Handle)  			if err2 != nil { diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index aba9e0e..47729b5 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -83,7 +83,10 @@ func CheckImages(ctx context.Context, dir string) error {  // DetectQueueType detects which queue to use based on the preponderance  // of files of a particular extension in a directory -func DetectQueueType(dir string, conn Queuer) string { +func DetectQueueType(dir string, conn Queuer, nowipe bool) string { +	if nowipe { +		return conn.PreNoWipeQueueId() +	}  	pngdirs, _ := filepath.Glob(dir + "/*.png")  	jpgdirs, _ := filepath.Glob(dir + "/*.jpg")  	pngcount := len(pngdirs) | 
