summaryrefslogtreecommitdiff
path: root/internal/pipeline/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/pipeline/pipeline.go')
-rw-r--r--internal/pipeline/pipeline.go68
1 files changed, 49 insertions, 19 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 3419f74..abf3a08 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -30,28 +30,58 @@ import (
const HeartbeatSeconds = 60
-type Clouder interface {
- Init() error
+type Logger interface {
+ GetLogger() *log.Logger
+ Log(v ...interface{})
+}
+
+type Lister interface {
+ Logger
ListObjects(bucket string, prefix string) ([]string, error)
- DeleteObjects(bucket string, keys []string) error
+ WIPStorageId() string
+}
+
+type Downloader interface {
+ Logger
Download(bucket string, key string, fn string) error
+ WIPStorageId() string
+}
+
+type Uploader interface {
+ Logger
Upload(bucket string, key string, path string) error
+ WIPStorageId() string
+}
+
+type Queuer interface {
+ Logger
+ PreQueueId() string
+ WipeQueueId() string
+ OCRPageQueueId() string
+ AnalyseQueueId() string
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
AddToQueue(url string, msg string) error
DelFromQueue(url string, handle string) error
QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
}
+type UploadQueuer interface {
+ Uploader
+ Queuer
+}
+
+type DownloadLister interface {
+ Downloader
+ Lister
+}
+
type Pipeliner interface {
- Clouder
- PreQueueId() string
- WipeQueueId() string
- OCRPageQueueId() string
- AnalyseQueueId() string
- TestQueueId() string
- WIPStorageId() string
- GetLogger() *log.Logger
- Log(v ...interface{})
+ Init() error
+ Logger
+ Lister
+ Downloader
+ Uploader
+ Queuer
}
type MinPipeliner interface {
@@ -84,7 +114,7 @@ func GetMailSettings() (mailSettings, error) {
// dir, putting each successfully downloaded file name into the
// process channel. If an error occurs it is sent to the errc channel
// and the function returns early.
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
logger.Println("Downloading", key)
@@ -106,7 +136,7 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e
// once it has been successfully uploaded. The done channel is
// then written to to signal completion. If an error occurs it
// is sent to the errc channel and the function returns early.
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
key := bookname + "/" + name
@@ -136,7 +166,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
// added to the toQueue once it has been uploaded. The done channel
// is then written to to signal completion. If an error occurs it
// is sent to the errc channel and the function returns early.
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
+func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
key := bookname + "/" + name
@@ -231,7 +261,7 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er
}
}
-func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
+func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) {
return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
confs := make(map[string][]*bookpipeline.Conf)
bestconfs := make(map[string]*bookpipeline.Conf)
@@ -432,7 +462,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log
}
}
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+func heartbeat(conn Queuer, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
currentmsg := msg
for range t.C {
m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
@@ -462,7 +492,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
// allOCRed checks whether all pages of a book have been OCRed.
// This is determined by whether every _bin0.?.png file has a
// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
+func allOCRed(bookname string, conn Lister) bool {
objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
if err != nil {
return false
@@ -726,7 +756,7 @@ func getLogs() (string, error) {
return stdout.String(), err
}
-func SaveLogs(conn Pipeliner, starttime int64, hostname string) error {
+func SaveLogs(conn Uploader, starttime int64, hostname string) error {
logs, err := getLogs()
if err != nil {
return fmt.Errorf("Error getting logs, error: %v", err)