summaryrefslogtreecommitdiff
path: root/internal/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'internal/pipeline')
-rw-r--r--internal/pipeline/pipeline.go30
1 files changed, 16 insertions, 14 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index b1c3cb9..cce5c19 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -146,22 +146,24 @@ func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, b
done <- true
}
-func Preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
- for path := range pre {
- logger.Println("Preprocessing", path)
- done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
- if err != nil {
- for range pre {
- } // consume the rest of the receiving channel so it isn't blocked
- errc <- err
- return
- }
- _ = os.Remove(path)
- for _, p := range done {
- up <- p
+func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) {
+ return func(pre chan string, up chan string, errc chan error, logger *log.Logger) {
+ for path := range pre {
+ logger.Println("Preprocessing", path)
+ done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30)
+ if err != nil {
+ for range pre {
+ } // consume the rest of the receiving channel so it isn't blocked
+ errc <- err
+ return
+ }
+ _ = os.Remove(path)
+ for _, p := range done {
+ up <- p
+ }
}
+ close(up)
}
- close(up)
}
func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {