summaryrefslogtreecommitdiff
path: root/cmd/lspipeline
diff options
context:
space:
mode:
authorNick White <git@njw.name>2021-01-26 14:17:19 +0000
committerNick White <git@njw.name>2021-01-26 14:17:19 +0000
commit54150b54cd06e3deba44e73b151070b74a4d8e76 (patch)
tree894aecdcfe887a71dd40e35dfd4a877eb5a6beca /cmd/lspipeline
parent86cc5d6c921ac05e0d08f66b205b51e1f5adb938 (diff)
Improve lspipeline concurrency by removing WaitGroup stuff
Diffstat (limited to 'cmd/lspipeline')
-rw-r--r--cmd/lspipeline/main.go52
1 files changed, 21 insertions, 31 deletions
diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go
index bf19b52..8980c59 100644
--- a/cmd/lspipeline/main.go
+++ b/cmd/lspipeline/main.go
@@ -12,7 +12,6 @@ import (
"os/exec"
"sort"
"strings"
- "sync"
"time"
"rescribe.xyz/bookpipeline"
@@ -124,11 +123,10 @@ func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, er
return objs[0].Date, false, nil
}
-// getBookDetailsWg gets the details for a book putting it into either the
-// done or inprogress channels as appropriate, and using a sync.WaitGroup Done
-// signal so it can be tracked. On error it sends to the errc channel.
-func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) {
- defer wg.Done()
+// getBookDetailsChan gets the details for a book putting it into either the
+// done or inprogress channels as appropriate, or sending an error to errc
+// on failure.
+func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) {
date, isdone, err := getBookDetails(conn, key)
if err != nil {
errc <- err
@@ -148,6 +146,8 @@ func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMe
// list), and those which do not (the inprogress list). They are
// sorted according to the date of the graph.png file, or the date
// of a random file with the prefix if no graph.png was found.
+// It spins up many goroutines to query the book status and
+// dates, as it is far faster to do this concurrently.
func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {
prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId())
if err != nil {
@@ -155,37 +155,27 @@ func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err er
return
}
- // 100,000 size buffer is to ensure we never block, as we're using waitgroup
- // rather than channel blocking to determine when to continue. Probably there
- // is a better way to do this, though, like reading the done and inprogress
- // channels in a goroutine and doing wg.Done() when each is read there instead.
- donec := make(chan bookpipeline.ObjMeta, 100000)
- inprogressc := make(chan bookpipeline.ObjMeta, 100000)
- errc := make(chan error, 100000)
+ donec := make(chan bookpipeline.ObjMeta, 100)
+ inprogressc := make(chan bookpipeline.ObjMeta, 100)
+ errc := make(chan error)
- var wg sync.WaitGroup
for _, p := range prefixes {
- wg.Add(1)
- go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg)
- }
- wg.Wait()
- close(donec)
- close(inprogressc)
-
- select {
- case err = <-errc:
- return inprogress, done, err
- default:
- break
+ go getBookDetailsChan(conn, p, donec, inprogressc, errc)
}
var inprogressmeta, donemeta ObjMetas
- for i := range donec {
- donemeta = append(donemeta, i)
- }
- for i := range inprogressc {
- inprogressmeta = append(inprogressmeta, i)
+ // each goroutine sends exactly once, to donec or inprogressc (or
+ // to errc on failure), so receive at most once per prefix
+ for range prefixes {
+ select {
+ case i := <-donec:
+ donemeta = append(donemeta, i)
+ case i := <-inprogressc:
+ inprogressmeta = append(inprogressmeta, i)
+ case err = <-errc:
+ return inprogress, done, err
+ }
}
sort.Sort(donemeta)