path: root/cmd/lspipeline/main.go
diff options
authorNick White <>2021-01-26 13:50:02 +0000
committerNick White <>2021-01-26 13:50:02 +0000
commit86cc5d6c921ac05e0d08f66b205b51e1f5adb938 (patch)
treea32226f1fb3969336f6701b389ee0ce186df6adc /cmd/lspipeline/main.go
parent670d5c1b74f2fa4683bfe7e2d9b1baee14db9104 (diff)
Speed up lspipeline by making s3 requests concurrently and only processing single results from ListObjects requests
Diffstat (limited to 'cmd/lspipeline/main.go')
1 files changed, 77 insertions, 18 deletions
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index b649778..bf19b52 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -12,6 +12,8 @@ import (
+ "sync"
+ "time"
@@ -100,6 +102,46 @@ func (o ObjMetas) Less(i, j int) bool {
return o[i].Date.Before(o[j].Date)
+// getBookDetails determines whether a book is done and what date
+// it was completed, or if it has not finished, the date of any
+// book file.
+func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) {
+ // First try to get the graph.png file from the book, which marks
+ // it as done
+ objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png")
+ if err == nil && len(objs) > 0 {
+ return objs[0].Date, true, nil
+ }
+ // Otherwise get any file from the book to get a date to sort by
+ objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key)
+ if err != nil {
+ return time.Time{}, false, err
+ }
+ if len(objs) == 0 {
+ return time.Time{}, false, fmt.Errorf("No files found for book %s", key)
+ }
+ return objs[0].Date, false, nil
+// getBookDetailsWg gets the details for a book putting it into either the
+// done or inprogress channels as appropriate, and using a sync.WaitGroup Done
+// signal so it can be tracked. On error it sends to the errc channel.
+func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) {
+ defer wg.Done()
+ date, isdone, err := getBookDetails(conn, key)
+ if err != nil {
+ errc <- err
+ return
+ }
+ meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date}
+ if isdone {
+ done <- meta
+ } else {
+ inprogress <- meta
+ }
// getBookStatus returns a list of in progress and done books.
// It determines this by finding all prefixes, and splitting them
// into two lists, those which have a 'graph.png' file (the done
@@ -108,35 +150,52 @@ func (o ObjMetas) Less(i, j int) bool {
// of a random file with the prefix if no graph.png was found.
func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {
prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId())
- var inprogressmeta, donemeta ObjMetas
if err != nil {
log.Println("Error getting object prefixes:", err)
- // Search for graph.png to determine done books (and save the date of it to sort with)
+ // 100,000 size buffer is to ensure we never block, as we're using waitgroup
+ // rather than channel blocking to determine when to continue. Probably there
+ // is a better way to do this, though, like reading the done and inprogress
+ // channels in a goroutine and doing wg.Done() when each is read there instead.
+ donec := make(chan bookpipeline.ObjMeta, 100000)
+ inprogressc := make(chan bookpipeline.ObjMeta, 100000)
+ errc := make(chan error, 100000)
+ var wg sync.WaitGroup
for _, p := range prefixes {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png")
- if err != nil || len(objs) == 0 {
- inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p})
- } else {
- donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date})
- }
+ wg.Add(1)
+ go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg)
- // Get a random file from the inprogress list to get a date to sort by
- for _, i := range inprogressmeta {
- objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name)
- if err != nil || len(objs) == 0 {
- continue
- }
- i.Date = objs[0].Date
+ wg.Wait()
+ close(donec)
+ close(inprogressc)
+ select {
+ case err = <-errc:
+ return inprogress, done, err
+ default:
+ break
+ }
+ var inprogressmeta, donemeta ObjMetas
+ for i := range donec {
+ donemeta = append(donemeta, i)
+ }
+ for i := range inprogressc {
+ inprogressmeta = append(inprogressmeta, i)
+ sort.Sort(inprogressmeta)
for _, i := range donemeta {
- done = append(done, strings.TrimSuffix(i.Name, "/"))
+ done = append(done, i.Name)
- sort.Sort(inprogressmeta)
for _, i := range inprogressmeta {
- inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/"))
+ inprogress = append(inprogress, i.Name)