diff options
| author | Nick White <git@njw.name> | 2021-01-26 14:17:19 +0000 | 
|---|---|---|
| committer | Nick White <git@njw.name> | 2021-01-26 14:17:19 +0000 | 
| commit | 54150b54cd06e3deba44e73b151070b74a4d8e76 (patch) | |
| tree | 894aecdcfe887a71dd40e35dfd4a877eb5a6beca /cmd/lspipeline | |
| parent | 86cc5d6c921ac05e0d08f66b205b51e1f5adb938 (diff) | |
Improve lspipeline concurrency by removing WaitGroup stuff
Diffstat (limited to 'cmd/lspipeline')
| -rw-r--r-- | cmd/lspipeline/main.go | 52 | 
1 files changed, 21 insertions, 31 deletions
| diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index bf19b52..8980c59 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,7 +12,6 @@ import (  	"os/exec"  	"sort"  	"strings" -	"sync"  	"time"  	"rescribe.xyz/bookpipeline" @@ -124,11 +123,10 @@ func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, er  	return objs[0].Date, false, nil  } -// getBookDetailsWg gets the details for a book putting it into either the -// done or inprogress channels as appropriate, and using a sync.WaitGroup Done -// signal so it can be tracked. On error it sends to the errc channel. -func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { -	defer wg.Done() +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) {  	date, isdone, err := getBookDetails(conn, key)  	if err != nil {  		errc <- err @@ -148,6 +146,8 @@ func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMe  // list), and those which do not (the inprogress list). They are  // sorted according to the date of the graph.png file, or the date  // of a random file with the prefix if no graph.png was found. +// It spins up many goroutines to do query the book status and +// dates, as it is far faster to do concurrently.  func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {  	prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId())  	if err != nil { @@ -155,37 +155,27 @@ func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err er  		return  	} -	// 100,000 size buffer is to ensure we never block, as we're using waitgroup -	// rather than channel blocking to determine when to continue. Probably there -	// is a better way to do this, though, like reading the done and inprogress -	// channels in a goroutine and doing wg.Done() when each is read there instead. -	donec := make(chan bookpipeline.ObjMeta, 100000) -	inprogressc := make(chan bookpipeline.ObjMeta, 100000) -	errc := make(chan error, 100000) +	donec := make(chan bookpipeline.ObjMeta, 100) +	inprogressc := make(chan bookpipeline.ObjMeta, 100) +	errc := make(chan error) -	var wg sync.WaitGroup  	for _, p := range prefixes { -		wg.Add(1) -		go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) -	} -	wg.Wait() -	close(donec) -	close(inprogressc) - -	select { -		case err = <-errc: -			return inprogress, done, err -		default: -			break +		go getBookDetailsChan(conn, p, donec, inprogressc, errc)  	}  	var inprogressmeta, donemeta ObjMetas -	for i := range donec { -		donemeta = append(donemeta, i) -	} -	for i := range inprogressc { -		inprogressmeta = append(inprogressmeta, i) +	// there will be exactly as many sends to donec or inprogressc +	// as there are prefixes +	for range prefixes { +		select { +		case i := <-donec: +			donemeta = append(donemeta, i) +		case i := <-inprogressc: +			inprogressmeta = append(inprogressmeta, i) +		case err = <-errc: +			return inprogress, done, err +		}  	}  	sort.Sort(donemeta) | 
