diff options
| -rw-r--r-- | aws.go | 3 | ||||
| -rw-r--r-- | cmd/lspipeline/main.go | 95 | 
2 files changed, 79 insertions, 19 deletions
| @@ -355,11 +355,12 @@ func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta,  	err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{  		Bucket: aws.String(bucket),  		Prefix: aws.String(prefix), +		MaxKeys: aws.Int64(1),  	}, func(page *s3.ListObjectsV2Output, last bool) bool {  		for _, r := range page.Contents {  			objs = append(objs, ObjMeta{Name: *r.Key, Date: *r.LastModified})  		} -		return true +		return false // only process the first page as that's all we need  	})  	return objs, err  } diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..bf19b52 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,8 @@ import (  	"os/exec"  	"sort"  	"strings" +	"sync" +	"time"  	"rescribe.xyz/bookpipeline"  ) @@ -100,6 +102,46 @@ func (o ObjMetas) Less(i, j int) bool {  	return o[i].Date.Before(o[j].Date)  } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. +func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { +	// First try to get the graph.png file from the book, which marks +	// it as done +	objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") +	if err == nil && len(objs) > 0 { +		return objs[0].Date, true, nil +	} + +	// Otherwise get any file from the book to get a date to sort by +	objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) +	if err != nil { +		return time.Time{}, false, err +	} +	if len(objs) == 0 { +		return time.Time{}, false, fmt.Errorf("No files found for book %s", key) +	} +	return objs[0].Date, false, nil +} + +// getBookDetailsWg gets the details for a book putting it into either the +// done or inprogress channels as appropriate, and using a sync.WaitGroup Done +// signal so it can be tracked. On error it sends to the errc channel. +func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { +	defer wg.Done() +	date, isdone, err := getBookDetails(conn, key) +	if err != nil { +		errc <- err +		return +	} +	meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} +	if isdone { +		done <- meta +	} else { +		inprogress <- meta +	} +} +  // getBookStatus returns a list of in progress and done books.  // It determines this by finding all prefixes, and splitting them  // into two lists, those which have a 'graph.png' file (the done @@ -108,35 +150,52 @@ func (o ObjMetas) Less(i, j int) bool {  // of a random file with the prefix if no graph.png was found.  func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {  	prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) -	var inprogressmeta, donemeta ObjMetas  	if err != nil {  		log.Println("Error getting object prefixes:", err)  		return  	} -	// Search for graph.png to determine done books (and save the date of it to sort with) + +	// 100,000 size buffer is to ensure we never block, as we're using waitgroup +	// rather than channel blocking to determine when to continue. Probably there +	// is a better way to do this, though, like reading the done and inprogress +	// channels in a goroutine and doing wg.Done() when each is read there instead. +	donec := make(chan bookpipeline.ObjMeta, 100000) +	inprogressc := make(chan bookpipeline.ObjMeta, 100000) +	errc := make(chan error, 100000) + +	var wg sync.WaitGroup  	for _, p := range prefixes { -		objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") -		if err != nil || len(objs) == 0 { -			inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) -		} else { -			donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) -		} +		wg.Add(1) +		go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg)  	} -	// Get a random file from the inprogress list to get a date to sort by -	for _, i := range inprogressmeta { -		objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) -		if err != nil || len(objs) == 0 { -			continue -		} -		i.Date = objs[0].Date +	wg.Wait() +	close(donec) +	close(inprogressc) + +	select { +		case err = <-errc: +			return inprogress, done, err +		default: +			break +	} + +	var inprogressmeta, donemeta ObjMetas + +	for i := range donec { +		donemeta = append(donemeta, i) +	} +	for i := range inprogressc { +		inprogressmeta = append(inprogressmeta, i)  	} +  	sort.Sort(donemeta) +	sort.Sort(inprogressmeta) +  	for _, i := range donemeta { -		done = append(done, strings.TrimSuffix(i.Name, "/")) +		done = append(done, i.Name)  	} -	sort.Sort(inprogressmeta)  	for _, i := range inprogressmeta { -		inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) +		inprogress = append(inprogress, i.Name)  	}  	return | 
