From 86cc5d6c921ac05e0d08f66b205b51e1f5adb938 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 13:50:02 +0000 Subject: Speed up lspipeline by making s3 requests concurrently and only processing single results from ListObjects requests --- cmd/lspipeline/main.go | 95 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 77 insertions(+), 18 deletions(-) (limited to 'cmd/lspipeline/main.go') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..bf19b52 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,8 @@ import ( "os/exec" "sort" "strings" + "sync" + "time" "rescribe.xyz/bookpipeline" ) @@ -100,6 +102,46 @@ func (o ObjMetas) Less(i, j int) bool { return o[i].Date.Before(o[j].Date) } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. +func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { + // First try to get the graph.png file from the book, which marks + // it as done + objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil && len(objs) > 0 { + return objs[0].Date, true, nil + } + + // Otherwise get any file from the book to get a date to sort by + objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + if err != nil { + return time.Time{}, false, err + } + if len(objs) == 0 { + return time.Time{}, false, fmt.Errorf("No files found for book %s", key) + } + return objs[0].Date, false, nil +} + +// getBookDetailsWg gets the details for a book putting it into either the +// done or inprogress channels as appropriate, and using a sync.WaitGroup Done +// signal so it can be tracked. On error it sends to the errc channel. 
+func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { + defer wg.Done() + date, isdone, err := getBookDetails(conn, key) + if err != nil { + errc <- err + return + } + meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} + if isdone { + done <- meta + } else { + inprogress <- meta + } +} + // getBookStatus returns a list of in progress and done books. // It determines this by finding all prefixes, and splitting them // into two lists, those which have a 'graph.png' file (the done @@ -108,35 +150,52 @@ func (o ObjMetas) Less(i, j int) bool { // of a random file with the prefix if no graph.png was found. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) - var inprogressmeta, donemeta ObjMetas if err != nil { log.Println("Error getting object prefixes:", err) return } - // Search for graph.png to determine done books (and save the date of it to sort with) + + // 100,000 size buffer is to ensure we never block, as we're using waitgroup + // rather than channel blocking to determine when to continue. Probably there + // is a better way to do this, though, like reading the done and inprogress + // channels in a goroutine and doing wg.Done() when each is read there instead. 
+ donec := make(chan bookpipeline.ObjMeta, 100000) + inprogressc := make(chan bookpipeline.ObjMeta, 100000) + errc := make(chan error, 100000) + + var wg sync.WaitGroup for _, p := range prefixes { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") - if err != nil || len(objs) == 0 { - inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) - } else { - donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) - } + wg.Add(1) + go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) } - // Get a random file from the inprogress list to get a date to sort by - for _, i := range inprogressmeta { - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) - if err != nil || len(objs) == 0 { - continue - } - i.Date = objs[0].Date + wg.Wait() + close(donec) + close(inprogressc) + + select { + case err = <-errc: + return inprogress, done, err + default: + break + } + + var inprogressmeta, donemeta ObjMetas + + for i := range donec { + donemeta = append(donemeta, i) + } + for i := range inprogressc { + inprogressmeta = append(inprogressmeta, i) } + sort.Sort(donemeta) + sort.Sort(inprogressmeta) + for _, i := range donemeta { - done = append(done, strings.TrimSuffix(i.Name, "/")) + done = append(done, i.Name) } - sort.Sort(inprogressmeta) for _, i := range inprogressmeta { - inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) + inprogress = append(inprogress, i.Name) } return -- cgit v1.2.1-24-ge1ad From 54150b54cd06e3deba44e73b151070b74a4d8e76 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:17:19 +0000 Subject: Improve lspipeline concurrency by removing WaitGroup stuff --- cmd/lspipeline/main.go | 52 ++++++++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 31 deletions(-) (limited to 'cmd/lspipeline/main.go') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index bf19b52..8980c59 100644 --- a/cmd/lspipeline/main.go +++ 
b/cmd/lspipeline/main.go @@ -12,7 +12,6 @@ import ( "os/exec" "sort" "strings" - "sync" "time" "rescribe.xyz/bookpipeline" @@ -124,11 +123,10 @@ func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, er return objs[0].Date, false, nil } -// getBookDetailsWg gets the details for a book putting it into either the -// done or inprogress channels as appropriate, and using a sync.WaitGroup Done -// signal so it can be tracked. On error it sends to the errc channel. -func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error, wg *sync.WaitGroup) { - defer wg.Done() +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { date, isdone, err := getBookDetails(conn, key) if err != nil { errc <- err @@ -148,6 +146,8 @@ func getBookDetailsWg(conn LsPipeliner, key string, done chan bookpipeline.ObjMe // list), and those which do not (the inprogress list). They are // sorted according to the date of the graph.png file, or the date // of a random file with the prefix if no graph.png was found. +// It spins up many goroutines to query the book status and +// dates, as it is far faster to do concurrently. func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) { prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) if err != nil { @@ -155,37 +155,27 @@ func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err er return } - // 100,000 size buffer is to ensure we never block, as we're using waitgroup - // rather than channel blocking to determine when to continue. 
Probably there - // is a better way to do this, though, like reading the done and inprogress - // channels in a goroutine and doing wg.Done() when each is read there instead. - donec := make(chan bookpipeline.ObjMeta, 100000) - inprogressc := make(chan bookpipeline.ObjMeta, 100000) - errc := make(chan error, 100000) + donec := make(chan bookpipeline.ObjMeta, 100) + inprogressc := make(chan bookpipeline.ObjMeta, 100) + errc := make(chan error) - var wg sync.WaitGroup for _, p := range prefixes { - wg.Add(1) - go getBookDetailsWg(conn, p, donec, inprogressc, errc, &wg) - } - wg.Wait() - close(donec) - close(inprogressc) - - select { - case err = <-errc: - return inprogress, done, err - default: - break + go getBookDetailsChan(conn, p, donec, inprogressc, errc) } var inprogressmeta, donemeta ObjMetas - for i := range donec { - donemeta = append(donemeta, i) - } - for i := range inprogressc { - inprogressmeta = append(inprogressmeta, i) + // there will be exactly as many sends to donec or inprogressc + // as there are prefixes + for range prefixes { + select { + case i := <-donec: + donemeta = append(donemeta, i) + case i := <-inprogressc: + inprogressmeta = append(inprogressmeta, i) + case err = <-errc: + return inprogress, done, err + } } sort.Sort(donemeta) -- cgit v1.2.1-24-ge1ad From 5c3cee66a90ce6ef87e125b3bf011a6903d38083 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 Jan 2021 14:56:10 +0000 Subject: Make ListObjectsWithMeta generic again and create a specialised ListObjectWithMeta for single file listing, so we can still be as fast, but do not have a misleading api --- cmd/lspipeline/main.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'cmd/lspipeline/main.go') diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 8980c59..131ff12 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -36,7 +36,7 @@ type LsPipeliner interface { AnalyseQueueId() string GetQueueDetails(url string) (string, 
string, error) GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) - ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error) + ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error) ListObjectPrefixes(bucket string) ([]string, error) WIPStorageId() string } @@ -107,20 +107,17 @@ func (o ObjMetas) Less(i, j int) bool { func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { // First try to get the graph.png file from the book, which marks // it as done - objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), key+"graph.png") - if err == nil && len(objs) > 0 { - return objs[0].Date, true, nil + obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") + if err == nil { + return obj.Date, true, nil } // Otherwise get any file from the book to get a date to sort by - objs, err = conn.ListObjectsWithMeta(conn.WIPStorageId(), key) + obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) if err != nil { return time.Time{}, false, err } - if len(objs) == 0 { - return time.Time{}, false, fmt.Errorf("No files found for book %s", key) - } - return objs[0].Date, false, nil + return obj.Date, false, nil } // getBookDetailsChan gets the details for a book putting it into either the -- cgit v1.2.1-24-ge1ad