diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/pipeline/get.go | 8 | ||||
| -rw-r--r-- | internal/pipeline/pipeline.go | 68 | ||||
| -rw-r--r-- | internal/pipeline/pipeline_test.go | 8 | ||||
| -rw-r--r-- | internal/pipeline/put.go | 4 | 
4 files changed, 62 insertions, 26 deletions
| diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go
index 0db5cf2..960c8f7 100644
--- a/internal/pipeline/get.go
+++ b/internal/pipeline/get.go
@@ -12,7 +12,7 @@ import (
 	"strings"
 )
 
-func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) error {
+func DownloadBestPages(dir string, name string, conn Downloader, pluspngs bool) error {
 	key := filepath.Join(name, "best")
 	fn := filepath.Join(dir, "best")
 	err := conn.Download(conn.WIPStorageId(), key, fn)
@@ -54,7 +54,7 @@ func DownloadBestPages(dir string, name string, conn Pipeliner, pluspngs bool) e
 	return nil
 }
 
-func DownloadPdfs(dir string, name string, conn Pipeliner) error {
+func DownloadPdfs(dir string, name string, conn Downloader) error {
 	for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} {
 		key := filepath.Join(name, name+suffix)
 		fn := filepath.Join(dir, name+suffix)
@@ -66,7 +66,7 @@ func DownloadPdfs(dir string, name string, conn Pipeliner) error {
 	return nil
 }
 
-func DownloadAnalyses(dir string, name string, conn Pipeliner) error {
+func DownloadAnalyses(dir string, name string, conn Downloader) error {
 	for _, a := range []string{"conf", "graph.png"} {
 		key := filepath.Join(name, a)
 		fn := filepath.Join(dir, a)
@@ -78,7 +78,7 @@ func DownloadAnalyses(dir string, name string, conn Pipeliner) error {
 	return nil
 }
 
-func DownloadAll(dir string, name string, conn Pipeliner) error {
+func DownloadAll(dir string, name string, conn DownloadLister) error {
 	objs, err := conn.ListObjects(conn.WIPStorageId(), name)
 	if err != nil {
 		return fmt.Errorf("Failed to get list of files for book %s: %v", name, err)
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 3419f74..abf3a08 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -30,28 +30,58 @@ import (
 
 const HeartbeatSeconds = 60
 
-type Clouder interface {
-	Init() error
+type Logger interface {
+	GetLogger() *log.Logger
+	Log(v ...interface{})
+}
+
+type Lister interface {
+	Logger
 	ListObjects(bucket string, prefix string) ([]string, error)
-	DeleteObjects(bucket string, keys []string) error
+	WIPStorageId() string
+}
+
+type Downloader interface {
+	Logger
 	Download(bucket string, key string, fn string) error
+	WIPStorageId() string
+}
+
+type Uploader interface {
+	Logger
 	Upload(bucket string, key string, path string) error
+	WIPStorageId() string
+}
+
+type Queuer interface {
+	Logger
+	PreQueueId() string
+	WipeQueueId() string
+	OCRPageQueueId() string
+	AnalyseQueueId() string
 	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
 	AddToQueue(url string, msg string) error
 	DelFromQueue(url string, handle string) error
 	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error)
 }
 
+type UploadQueuer interface {
+	Uploader
+	Queuer
+}
+
+type DownloadLister interface {
+	Downloader
+	Lister
+}
+
 type Pipeliner interface {
-	Clouder
-	PreQueueId() string
-	WipeQueueId() string
-	OCRPageQueueId() string
-	AnalyseQueueId() string
-	TestQueueId() string
-	WIPStorageId() string
-	GetLogger() *log.Logger
-	Log(v ...interface{})
+	Init() error
+	Logger
+	Lister
+	Downloader
+	Uploader
+	Queuer
 }
 
 type MinPipeliner interface {
@@ -84,7 +114,7 @@ func GetMailSettings() (mailSettings, error) {
 // dir, putting each successfully downloaded file name into the
 // process channel. If an error occurs it is sent to the errc channel
 // and the function returns early.
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
+func download(dl chan string, process chan string, conn Downloader, dir string, errc chan error, logger *log.Logger) {
 	for key := range dl {
 		fn := filepath.Join(dir, filepath.Base(key))
 		logger.Println("Downloading", key)
@@ -106,7 +136,7 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e
 // once it has been successfully uploaded. The done channel is
 // then written to to signal completion. If an error occurs it
 // is sent to the errc channel and the function returns early.
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
+func up(c chan string, done chan bool, conn Uploader, bookname string, errc chan error, logger *log.Logger) {
 	for path := range c {
 		name := filepath.Base(path)
 		key := bookname + "/" + name
@@ -136,7 +166,7 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
 // added to the toQueue once it has been uploaded. The done channel
 // is then written to to signal completion. If an error occurs it
 // is sent to the errc channel and the function returns early.
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
+func upAndQueue(c chan string, done chan bool, toQueue string, conn UploadQueuer, bookname string, training string, errc chan error, logger *log.Logger) {
 	for path := range c {
 		name := filepath.Base(path)
 		key := bookname + "/" + name
@@ -231,7 +261,7 @@ func Ocr(training string, tesscmd string) func(chan string, chan string, chan er
 	}
 }
 
-func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
+func Analyse(conn Downloader) func(chan string, chan string, chan error, *log.Logger) {
 	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
 		confs := make(map[string][]*bookpipeline.Conf)
 		bestconfs := make(map[string]*bookpipeline.Conf)
@@ -432,7 +462,7 @@ func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Log
 	}
 }
 
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
+func heartbeat(conn Queuer, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
 	currentmsg := msg
 	for range t.C {
 		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
@@ -462,7 +492,7 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri
 // allOCRed checks whether all pages of a book have been OCRed.
 // This is determined by whether every _bin0.?.png file has a
 // corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
+func allOCRed(bookname string, conn Lister) bool {
 	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
 	if err != nil {
 		return false
@@ -726,7 +756,7 @@ func getLogs() (string, error) {
 	return stdout.String(), err
 }
 
-func SaveLogs(conn Pipeliner, starttime int64, hostname string) error {
+func SaveLogs(conn Uploader, starttime int64, hostname string) error {
 	logs, err := getLogs()
 	if err != nil {
 		return fmt.Errorf("Error getting logs, error: %v", err)
diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go
index dfcb8a3..806f9be 100644
--- a/internal/pipeline/pipeline_test.go
+++ b/internal/pipeline/pipeline_test.go
@@ -28,9 +28,15 @@ func (t *StrLog) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
+type PipelineTester interface {
+	Pipeliner
+	DeleteObjects(bucket string, keys []string) error
+	TestQueueId() string
+}
+
 type connection struct {
 	name string
-	c Pipeliner
+	c PipelineTester
 }
 
 // Test_download tests the download() function inside the pipeline
diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go
index 87e4c99..647b7a9 100644
--- a/internal/pipeline/put.go
+++ b/internal/pipeline/put.go
@@ -59,7 +59,7 @@ func CheckImages(dir string) error {
 	return nil
 }
 
-func DetectQueueType(dir string, conn Pipeliner) string {
+func DetectQueueType(dir string, conn Queuer) string {
 	// Auto detect type of queue to send to based on file extension
 	pngdirs, _ := filepath.Glob(dir + "/*.png")
 	jpgdirs, _ := filepath.Glob(dir + "/*.jpg")
@@ -72,7 +72,7 @@ func DetectQueueType(dir string, conn Pipeliner) string {
 	}
 }
 
-func UploadImages(dir string, bookname string, conn Pipeliner) error {
+func UploadImages(dir string, bookname string, conn Uploader) error {
 	walker := make(fileWalk)
 	go func() {
 		_ = filepath.Walk(dir, walker.Walk)
|
