diff options
Diffstat (limited to 'internal/pipeline/pipeline.go')
-rw-r--r-- | internal/pipeline/pipeline.go | 68 |
1 files changed, 49 insertions, 19 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 3419f74..abf3a08 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -30,28 +30,58 @@ import ( const HeartbeatSeconds = 60 -type Clouder interface { - Init() error +type Logger interface { + GetLogger() *log.Logger + Log(v ...interface{}) +} + +type Lister interface { + Logger ListObjects(bucket string, prefix string) ([]string, error) - DeleteObjects(bucket string, keys []string) error + WIPStorageId() string +} + +type Downloader interface { + Logger Download(bucket string, key string, fn string) error + WIPStorageId() string +} + +type Uploader interface { + Logger Upload(bucket string, key string, path string) error + WIPStorageId() string +} + +type Queuer interface { + Logger + PreQueueId() string + WipeQueueId() string + OCRPageQueueId() string + AnalyseQueueId() string CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) AddToQueue(url string, msg string) error DelFromQueue(url string, handle string) error QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) } +type UploadQueuer interface { + Uploader + Queuer +} + +type DownloadLister interface { + Downloader + Lister +} + type Pipeliner interface { - Clouder - PreQueueId() string - WipeQueueId() string - OCRPageQueueId() string - AnalyseQueueId() string - TestQueueId() string - WIPStorageId() string - GetLogger() *log.Logger - Log(v ...interface{}) + Init() error + Logger + Lister + Downloader + Uploader + Queuer } type MinPipeliner interface { @@ -84,7 +114,7 @@ func GetMailSettings() (mailSettings, error) { // dir, putting each successfully downloaded file name into the // process channel. If an error occurs it is sent to the errc channel // and the function returns early. -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { +func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) logger.Println("Downloading", key) @@ -106,7 +136,7 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e // once it has been successfully uploaded. The done channel is // then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { +func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) key := bookname + "/" + name @@ -136,7 +166,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha // added to the toQueue once it has been uploaded. The done channel // is then written to to signal completion. If an error occurs it // is sent to the errc channel and the function returns early. -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { +func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) key := bookname + "/" + name @@ -231,7 +261,7 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er } } -func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { +func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) { return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { confs := make(map[string][]*bookpipeline.Conf) bestconfs := make(map[string]*bookpipeline.Conf) @@ -432,7 +462,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log } } -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { +func heartbeat(conn Queuer, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { currentmsg := msg for range t.C { m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) @@ -462,7 +492,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri // allOCRed checks whether all pages of a book have been OCRed. // This is determined by whether every _bin0.?.png file has a // corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { +func allOCRed(bookname string, conn Lister) bool { objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) if err != nil { return false @@ -726,7 +756,7 @@ func getLogs() (string, error) { return stdout.String(), err } -func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { +func SaveLogs(conn Uploader, starttime int64, hostname string) error { logs, err := getLogs() if err != nil { return fmt.Errorf("Error getting logs, error: %v", err) |