From e22c16ca84163636d674fb0935156f466169e1a3 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 13 Jul 2021 11:49:58 +0100 Subject: internal/pipeline: Reorganise interfaces so that functions only declare what they need We were using Pipeliner as a catch-all, but it's nicer if the functions can just state that e.g. they need download functionality, so decompose things so that that's how we do things now. --- internal/pipeline/get.go | 8 ++--- internal/pipeline/pipeline.go | 68 +++++++++++++++++++++++++++----------- internal/pipeline/pipeline_test.go | 8 ++++- internal/pipeline/put.go | 4 +-- 4 files changed, 62 insertions(+), 26 deletions(-) (limited to 'internal/pipeline') diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go index 0db5cf2..960c8f7 100644 --- a/internal/pipeline/get.go +++ b/internal/pipeline/get.go @@ -12,7 +12,7 @@ import ( "strings" ) -func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) error { +func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) error { key := filepath.Join(name, "best") fn := filepath.Join(dir, "best") err := conn.Download(conn.WIPStorageId(), key, fn) @@ -54,7 +54,7 @@ func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) e return nil } -func DownloadPdfs(dir string, name string, conn Pipeliner) error { +func DownloadPdfs(dir string, name string, conn Downloader) error { for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { key := filepath.Join(name, name+suffix) fn := filepath.Join(dir, name+suffix) @@ -66,7 +66,7 @@ func DownloadPdfs(dir string, name string, conn Pipeliner) error { return nil } -func DownloadAnalyses(dir string, name string, conn Pipeliner) error { +func DownloadAnalyses(dir string, name string, conn Downloader) error { for _, a := range []string{"conf", "graph.png"} { key := filepath.Join(name, a) fn := filepath.Join(dir, a) @@ -78,7 +78,7 @@ func DownloadAnalyses(dir string, name string, conn Pipeliner) error { return nil } -func DownloadAll(dir string, name string, conn Pipeliner) error { +func DownloadAll(dir string, name string, conn DownloadLister) error { objs, err := conn.ListObjects(conn.WIPStorageId(), name) if err != nil { return fmt.Errorf("Failed to get list of files for book %s: %v", name, err) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 3419f74..abf3a08 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -30,28 +30,58 @@ import ( const HeartbeatSeconds = 60 -type Clouder interface { - Init() error +type Logger interface { + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type Lister interface { + Logger ListObjects(bucket string, prefix string) ([]string, error) - DeleteObjects(bucket string, keys []string) error + WIPStorageId() string +} + +type Downloader interface { + Logger Download(bucket string, key string, fn string) error + WIPStorageId() string +} + +type Uploader interface { + Logger Upload(bucket string, key string, path string) error + WIPStorageId() string +} + +type Queuer interface { + Logger + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) } +type UploadQueuer interface { + Uploader + Queuer +} + +type DownloadLister interface { + Downloader + Lister +} + type Pipeliner interface { - Clouder - PreQueueId() string - WipeQueueId() string - OCRPageQueueId() string - AnalyseQueueId() string - TestQueueId() string - WIPStorageId() string - GetLogger() *log.Logger - Log(v ...interface{}) + Init() error + Logger + Lister + Downloader + Uploader + Queuer } type MinPipeliner interface { @@ -84,7 +114,7 @@ func GetMailSettings() (mailSettings, error) { // dir, putting each successfully downloaded file name into the // process channel. If an error occurs it is sent to the errc channel // and the function returns early. -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { +func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) logger.Println("Downloading", key) @@ -106,7 +136,7 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e // once it has been successfully uploaded. The done channel is // then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { +func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) key := bookname + "/" + name @@ -136,7 +166,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha // added to the toQueue once it has been uploaded. The done channel // is then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { +func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) key := bookname + "/" + name @@ -231,7 +261,7 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er } } -func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { +func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) { return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { confs := make(map[string][]*bookpipeline.Conf) bestconfs := make(map[string]*bookpipeline.Conf) @@ -432,7 +462,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log } } -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { +func heartbeat(conn Queuer, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { currentmsg := msg for range t.C { m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) @@ -462,7 +492,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri // allOCRed checks whether all pages of a book have been OCRed. // This is determined by whether every _bin0.?.png file has a // corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { +func allOCRed(bookname string, conn Lister) bool { objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) if err != nil { return false @@ -726,7 +756,7 @@ func getLogs() (string, error) { return stdout.String(), err } -func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { +func SaveLogs(conn Uploader, starttime int64, hostname string) error { logs, err := getLogs() if err != nil { return fmt.Errorf("Error getting logs, error: %v", err) diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index dfcb8a3..806f9be 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -28,9 +28,15 @@ func (t *StrLog) Write(p []byte) (n int, err error) { return len(p), nil } +type PipelineTester interface { + Pipeliner + DeleteObjects(bucket string, keys []string) error + TestQueueId() string +} + type connection struct { name string - c Pipeliner + c PipelineTester } // Test_download tests the download() function inside the pipeline diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go index 87e4c99..647b7a9 100644 --- a/internal/pipeline/put.go +++ b/internal/pipeline/put.go @@ -59,7 +59,7 @@ func CheckImages(dir string) error { return nil } -func DetectQueueType(dir string, conn Pipeliner) string { +func DetectQueueType(dir string, conn Queuer) string { // Auto detect type of queue to send to based on file extension pngdirs, _ := filepath.Glob(dir + "/*.png") jpgdirs, _ := filepath.Glob(dir + "/*.jpg") @@ -72,7 +72,7 @@ func DetectQueueType(dir string, conn Pipeliner) string { } } -func UploadImages(dir string, bookname string, conn Pipeliner) error { +func UploadImages(dir string, bookname string, conn Uploader) error { walker := make(fileWalk) go func() { _ = filepath.Walk(dir, walker.Walk) -- cgit v1.2.1-24-ge1ad