summaryrefslogtreecommitdiff
path: root/internal/pipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2021-07-13 11:49:58 +0100
committerNick White <git@njw.name>2021-07-13 11:49:58 +0100
commite22c16ca84163636d674fb0935156f466169e1a3 (patch)
tree6d29d1ebea4a89dad706df79d40c6a37df2d4b01 /internal/pipeline
parent05fbc6c65c8556d89d8b7fdb645f3ef3b273b5e9 (diff)
internal/pipeline: Reorganise interfaces so that functions only declare what they need
We were using Pipeliner as a catch-all, but it's nicer if the functions can just state that e.g. they need download functionality, so decompose things so that that's how we do things now.
Diffstat (limited to 'internal/pipeline')
-rw-r--r--internal/pipeline/get.go8
-rw-r--r--internal/pipeline/pipeline.go68
-rw-r--r--internal/pipeline/pipeline_test.go8
-rw-r--r--internal/pipeline/put.go4
4 files changed, 62 insertions, 26 deletions
diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go
index 0db5cf2..960c8f7 100644
--- a/internal/pipeline/get.go
+++ b/internal/pipeline/get.go
@@ -12,7 +12,7 @@ import (
"strings"
)
-func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) error {
+func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) error {
key := filepath.Join(name, "best")
fn := filepath.Join(dir, "best")
err := conn.Download(conn.WIPStorageId(), key, fn)
@@ -54,7 +54,7 @@ func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) e
return nil
}
-func DownloadPdfs(dir string, name string, conn Pipeliner) error {
+func DownloadPdfs(dir string, name string, conn Downloader) error {
for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} {
key := filepath.Join(name, name+suffix)
fn := filepath.Join(dir, name+suffix)
@@ -66,7 +66,7 @@ func DownloadPdfs(dir string, name string, conn Pipeliner) error {
return nil
}
-func DownloadAnalyses(dir string, name string, conn Pipeliner) error {
+func DownloadAnalyses(dir string, name string, conn Downloader) error {
for _, a := range []string{"conf", "graph.png"} {
key := filepath.Join(name, a)
fn := filepath.Join(dir, a)
@@ -78,7 +78,7 @@ func DownloadAnalyses(dir string, name string, conn Pipeliner) error {
return nil
}
-func DownloadAll(dir string, name string, conn Pipeliner) error {
+func DownloadAll(dir string, name string, conn DownloadLister) error {
objs, err := conn.ListObjects(conn.WIPStorageId(), name)
if err != nil {
return fmt.Errorf("Failed to get list of files for book %s: %v", name, err)
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 3419f74..abf3a08 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -30,28 +30,58 @@ import (
const HeartbeatSeconds = 60
-type Clouder interface {
- Init() error
+type Logger interface {
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
+type Lister interface {
+ Logger
ListObjects(bucket string, prefix string) ([]string, error)
- DeleteObjects(bucket string, keys []string) error
+ WIPStorageId() string
+}
+
+type Downloader interface {
+ Logger
Download(bucket string, key string, fn string) error
+ WIPStorageId() string
+}
+
+type Uploader interface {
+ Logger
Upload(bucket string, key string, path string) error
+ WIPStorageId() string
+}
+
+type Queuer interface {
+ Logger
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
}
+type UploadQueuer interface {
+ Uploader
+ Queuer
+}
+
+type DownloadLister interface {
+ Downloader
+ Lister
+}
+
type Pipeliner interface {
- Clouder
- PreQueueId() string
- WipeQueueId() string
- OCRPageQueueId() string
- AnalyseQueueId() string
- TestQueueId() string
- WIPStorageId() string
- GetLogger() *log.Logger
- Log(v ...interface{})
+ Init() error
+ Logger
+ Lister
+ Downloader
+ Uploader
+ Queuer
}
type MinPipeliner interface {
@@ -84,7 +114,7 @@ func GetMailSettings() (mailSettings, error) {
// dir, putting each successfully downloaded file name into the
// process channel. If an error occurs it is sent to the errc channel
// and the function returns early.
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
logger.Println("Downloading", key)
@@ -106,7 +136,7 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e
// once it has been successfully uploaded. The done channel is
// then written to to signal completion. If an error occurs it
// is sent to the errc channel and the function returns early.
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
key := bookname + "/" + name
@@ -136,7 +166,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
// added to the toQueue once it has been uploaded. The done channel
// is then written to to signal completion. If an error occurs it
// is sent to the errc channel and the function returns early.
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
+func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
key := bookname + "/" + name
@@ -231,7 +261,7 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er
}
}
-func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
+func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) {
return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
confs := make(map[string][]*bookpipeline.Conf)
bestconfs := make(map[string]*bookpipeline.Conf)
@@ -432,7 +462,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log
}
}
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+func heartbeat(conn Queuer, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
currentmsg := msg
for range t.C {
m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
@@ -462,7 +492,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
// allOCRed checks whether all pages of a book have been OCRed.
// This is determined by whether every _bin0.?.png file has a
// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
+func allOCRed(bookname string, conn Lister) bool {
objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
return false
@@ -726,7 +756,7 @@ func getLogs() (string, error) {
return stdout.String(), err
}
-func SaveLogs(conn Pipeliner, starttime int64, hostname string) error {
+func SaveLogs(conn Uploader, starttime int64, hostname string) error {
logs, err := getLogs()
if err != nil {
return fmt.Errorf("Error getting logs, error: %v", err)
diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go
index dfcb8a3..806f9be 100644
--- a/internal/pipeline/pipeline_test.go
+++ b/internal/pipeline/pipeline_test.go
@@ -28,9 +28,15 @@ func (t *StrLog) Write(p []byte) (n int, err error) {
return len(p), nil
}
+type PipelineTester interface {
+ Pipeliner
+ DeleteObjects(bucket string, keys []string) error
+ TestQueueId() string
+}
+
type connection struct {
name string
- c Pipeliner
+ c PipelineTester
}
// Test_download tests the download() function inside the pipeline
diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go
index 87e4c99..647b7a9 100644
--- a/internal/pipeline/put.go
+++ b/internal/pipeline/put.go
@@ -59,7 +59,7 @@ func CheckImages(dir string) error {
return nil
}
-func DetectQueueType(dir string, conn Pipeliner) string {
+func DetectQueueType(dir string, conn Queuer) string {
// Auto detect type of queue to send to based on file extension
pngdirs, _ := filepath.Glob(dir + "/*.png")
jpgdirs, _ := filepath.Glob(dir + "/*.jpg")
@@ -72,7 +72,7 @@ func DetectQueueType(dir string, conn Pipeliner) string {
}
}
-func UploadImages(dir string, bookname string, conn Pipeliner) error {
+func UploadImages(dir string, bookname string, conn Uploader) error {
walker := make(fileWalk)
go func() {
_ = filepath.Walk(dir, walker.Walk)