diff options
| -rw-r--r-- | pipelinepreprocess/aws.go | 46 | ||||
| -rw-r--r-- | pipelinepreprocess/main.go | 28 | 
2 files changed, 38 insertions, 36 deletions
| diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go index 75bf81c..1ac06de 100644 --- a/pipelinepreprocess/aws.go +++ b/pipelinepreprocess/aws.go @@ -142,56 +142,54 @@ func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {  	return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)  } -func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) { +func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { +	var names []string  	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{  		Bucket: aws.String(bucket),  		Prefix: aws.String(prefix),  	}, func(page *s3.ListObjectsV2Output, last bool) bool {  		for _, r := range page.Contents { -			names <- *r.Key +			names = append(names, *r.Key)  		}  		return true  	}) -	close(names) -	if err != nil { -		// TODO: handle error properly -		log.Println("Error getting objects") -	} +	return names, err  } -func (a *awsConn) ListToPreprocess(bookname string, names chan string) error { -	objs := make(chan string) +func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) { +	var names []string  	preprocessed := regexp.MustCompile(PreprocPattern) -	go a.ListObjects("rescribeinprogress", bookname, objs) +	objs, err := a.ListObjects("rescribeinprogress", bookname) +	if err != nil { +		return names, err +	}  	// Filter out any object that looks like it's already been preprocessed -	for n := range objs { +	for _, n := range objs {  		if preprocessed.MatchString(n) {  			a.logger.Println("Skipping item that looks like it has already been processed", n)  			continue  		} -		names <- n +		names = append(names, n)  	} -	close(names) -	// TODO: handle errors from ListObjects -	return nil +	return names, nil  } -func (a *awsConn) ListToOCR(bookname string, names chan string) error { -	objs := make(chan string) +func (a *awsConn) ListToOCR(bookname string) ([]string, error) { +	var names []string  	preprocessed := regexp.MustCompile(PreprocPattern) -	go a.ListObjects("rescribeinprogress", bookname, objs) -	a.logger.Println("Completed running listobjects") +	objs, err := a.ListObjects("rescribeinprogress", bookname) +	if err != nil { +		return names, err +	}  	// Filter out any object that looks like it hasn't already been preprocessed -	for n := range objs { +	for _, n := range objs {  		if ! preprocessed.MatchString(n) {  			a.logger.Println("Skipping item that looks like it is not preprocessed", n)  			continue  		} -		names <- n +		names = append(names, n)  	} -	close(names) -	// TODO: handle errors from ListObjects -	return nil +	return names, nil  }  func (a *awsConn) AddToQueue(url string, msg string) error { diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index 61dec96..b513f92 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -32,8 +32,7 @@ const PauseBetweenChecks = 60 * time.Second  type Clouder interface {  	Init() error -	//ListObjects(bucket string, prefix string, names chan string) error -	ListObjects(bucket string, prefix string, names chan string) +	ListObjects(bucket string, prefix string) ([]string, error)  	Download(bucket string, key string, fn string) error  	Upload(bucket string, key string, path string) error  	CheckQueue(url string) (Qmsg, error) @@ -44,8 +43,8 @@ type Clouder interface {  type Pipeliner interface {  	Clouder -	ListToPreprocess(bookname string, names chan string) error -	ListToOCR(bookname string, names chan string) error +	ListToPreprocess(bookname string) ([]string, error) +	ListToOCR(bookname string) ([]string, error)  	DownloadFromInProgress(key string, fn string) error  	UploadToInProgress(key string, path string) error  	CheckPreQueue() (Qmsg, error) @@ -149,12 +148,15 @@ func preProcBook(msg Qmsg, conn Pipeliner) {  	go up(upc, done, conn, bookname)  	conn.Logger().Println("Getting list of objects to download") -	err = conn.ListToPreprocess(bookname, dl) +	todl, err := conn.ListToPreprocess(bookname)  	if err != nil {  		log.Println("Failed to get list of files for book", bookname, err)  		t.Stop()  		return  	} +	for _, d := range todl { +		dl <- d +	}  	// wait for the done channel to be posted to  	<-done @@ -206,13 +208,15 @@ func ocrBook(msg Qmsg, conn Pipeliner) {  	go up(upc, done, conn, bookname)  	conn.Logger().Println("Getting list of objects to download") -	go conn.ListToOCR(bookname, dl) -	//err = conn.ListToOCR(bookname, dl) -	//if err != nil { -	//	log.Println("Failed to get list of files for book", bookname, err) -	//	t.Stop() -	//	return -	//} +	todl, err := conn.ListToOCR(bookname) +	if err != nil { +		log.Println("Failed to get list of files for book", bookname, err) +		t.Stop() +		return +	} +	for _, d := range todl { +		dl <- d +	}  	// wait for the done channel to be posted to  	<-done | 
