From 522e1481f5544362027b006d5fe34609f3d366bc Mon Sep 17 00:00:00 2001
From: Nick White <git@njw.name>
Date: Tue, 20 Aug 2019 16:26:02 +0100
Subject: Substantially improve problematic object listing part of API

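Switch the listing functions from channel-based, concurrent operation
to plain calls that return a slice of names and an error; concurrency
is better handled by the main program anyway. Errors from ListObjects
are now passed back to the caller rather than just being logged, and
the code is much simpler.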
---
 pipelinepreprocess/aws.go  | 46 ++++++++++++++++++++++------------------------
 pipelinepreprocess/main.go | 28 ++++++++++++++++------------
 2 files changed, 38 insertions(+), 36 deletions(-)

(limited to 'pipelinepreprocess')

diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go
index 75bf81c..1ac06de 100644
--- a/pipelinepreprocess/aws.go
+++ b/pipelinepreprocess/aws.go
@@ -142,56 +142,54 @@ func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {
 	return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
 }
 
-func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) {
+func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
+	var names []string
 	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
 		Bucket: aws.String(bucket),
 		Prefix: aws.String(prefix),
 	}, func(page *s3.ListObjectsV2Output, last bool) bool {
 		for _, r := range page.Contents {
-			names <- *r.Key
+			names = append(names, *r.Key)
 		}
 		return true
 	})
-	close(names)
-	if err != nil {
-		// TODO: handle error properly
-		log.Println("Error getting objects")
-	}
+	return names, err
 }
 
-func (a *awsConn) ListToPreprocess(bookname string, names chan string) error {
-	objs := make(chan string)
+func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) {
+	var names []string
 	preprocessed := regexp.MustCompile(PreprocPattern)
-	go a.ListObjects("rescribeinprogress", bookname, objs)
+	objs, err := a.ListObjects("rescribeinprogress", bookname)
+	if err != nil {
+		return names, err
+	}
 	// Filter out any object that looks like it's already been preprocessed
-	for n := range objs {
+	for _, n := range objs {
 		if preprocessed.MatchString(n) {
 			a.logger.Println("Skipping item that looks like it has already been processed", n)
 			continue
 		}
-		names <- n
+		names = append(names, n)
 	}
-	close(names)
-	// TODO: handle errors from ListObjects
-	return nil
+	return names, nil
 }
 
-func (a *awsConn) ListToOCR(bookname string, names chan string) error {
-	objs := make(chan string)
+func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
+	var names []string
 	preprocessed := regexp.MustCompile(PreprocPattern)
-	go a.ListObjects("rescribeinprogress", bookname, objs)
-	a.logger.Println("Completed running listobjects")
+	objs, err := a.ListObjects("rescribeinprogress", bookname)
+	if err != nil {
+		return names, err
+	}
 	// Filter out any object that looks like it hasn't already been preprocessed
-	for n := range objs {
+	for _, n := range objs {
 		if ! preprocessed.MatchString(n) {
 			a.logger.Println("Skipping item that looks like it is not preprocessed", n)
 			continue
 		}
-		names <- n
+		names = append(names, n)
 	}
-	close(names)
-	// TODO: handle errors from ListObjects
-	return nil
+	return names, nil
 }
 
 func (a *awsConn) AddToQueue(url string, msg string) error {
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index 61dec96..b513f92 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -32,8 +32,7 @@ const PauseBetweenChecks = 60 * time.Second
 
 type Clouder interface {
 	Init() error
-	//ListObjects(bucket string, prefix string, names chan string) error
-	ListObjects(bucket string, prefix string, names chan string)
+	ListObjects(bucket string, prefix string) ([]string, error)
 	Download(bucket string, key string, fn string) error
 	Upload(bucket string, key string, path string) error
 	CheckQueue(url string) (Qmsg, error)
@@ -44,8 +43,8 @@ type Clouder interface {
 
 type Pipeliner interface {
 	Clouder
-	ListToPreprocess(bookname string, names chan string) error
-	ListToOCR(bookname string, names chan string) error
+	ListToPreprocess(bookname string) ([]string, error)
+	ListToOCR(bookname string) ([]string, error)
 	DownloadFromInProgress(key string, fn string) error
 	UploadToInProgress(key string, path string) error
 	CheckPreQueue() (Qmsg, error)
@@ -149,12 +148,15 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
 	go up(upc, done, conn, bookname)
 
 	conn.Logger().Println("Getting list of objects to download")
-	err = conn.ListToPreprocess(bookname, dl)
+	todl, err := conn.ListToPreprocess(bookname)
 	if err != nil {
 		log.Println("Failed to get list of files for book", bookname, err)
 		t.Stop()
 		return
 	}
+	for _, d := range todl {
+		dl <- d
+	}
 
 	// wait for the done channel to be posted to
 	<-done
@@ -206,13 +208,15 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
 	go up(upc, done, conn, bookname)
 
 	conn.Logger().Println("Getting list of objects to download")
-	go conn.ListToOCR(bookname, dl)
-	//err = conn.ListToOCR(bookname, dl)
-	//if err != nil {
-	//	log.Println("Failed to get list of files for book", bookname, err)
-	//	t.Stop()
-	//	return
-	//}
+	todl, err := conn.ListToOCR(bookname)
+	if err != nil {
+		log.Println("Failed to get list of files for book", bookname, err)
+		t.Stop()
+		return
+	}
+	for _, d := range todl {
+		dl <- d
+	}
 
 	// wait for the done channel to be posted to
 	<-done
-- 
cgit v1.2.1-24-ge1ad