diff options
author | Nick White <git@njw.name> | 2019-08-20 16:26:02 +0100 |
---|---|---|
committer | Nick White <git@njw.name> | 2019-08-20 16:26:02 +0100 |
commit | 522e1481f5544362027b006d5fe34609f3d366bc (patch) | |
tree | 761c8edd1d0440e3f88db9f2c0b6c3d205726fef | |
parent | 9f588a71e9a2d7ad179890d0fc19372fae047b04 (diff) |
Substantially improve problematic object listing part of API
Switch to regular non-concurrent stuff, concurrency is better handled
by the main program anyway. Now we handle errors properly, and things
are way simpler.
-rw-r--r-- | pipelinepreprocess/aws.go | 46 | ||||
-rw-r--r-- | pipelinepreprocess/main.go | 28 |
2 files changed, 38 insertions, 36 deletions
diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go index 75bf81c..1ac06de 100644 --- a/pipelinepreprocess/aws.go +++ b/pipelinepreprocess/aws.go @@ -142,56 +142,54 @@ func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error { return a.QueueHeartbeat(t, msgHandle, a.ocrqurl) } -func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) { +func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) { + var names []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), }, func(page *s3.ListObjectsV2Output, last bool) bool { for _, r := range page.Contents { - names <- *r.Key + names = append(names, *r.Key) } return true }) - close(names) - if err != nil { - // TODO: handle error properly - log.Println("Error getting objects") - } + return names, err } -func (a *awsConn) ListToPreprocess(bookname string, names chan string) error { - objs := make(chan string) +func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) { + var names []string preprocessed := regexp.MustCompile(PreprocPattern) - go a.ListObjects("rescribeinprogress", bookname, objs) + objs, err := a.ListObjects("rescribeinprogress", bookname) + if err != nil { + return names, err + } // Filter out any object that looks like it's already been preprocessed - for n := range objs { + for _, n := range objs { if preprocessed.MatchString(n) { a.logger.Println("Skipping item that looks like it has already been processed", n) continue } - names <- n + names = append(names, n) } - close(names) - // TODO: handle errors from ListObjects - return nil + return names, nil } -func (a *awsConn) ListToOCR(bookname string, names chan string) error { - objs := make(chan string) +func (a *awsConn) ListToOCR(bookname string) ([]string, error) { + var names []string preprocessed := regexp.MustCompile(PreprocPattern) - go a.ListObjects("rescribeinprogress", bookname, objs) - a.logger.Println("Completed running listobjects") + objs, err := a.ListObjects("rescribeinprogress", bookname) + if err != nil { + return names, err + } // Filter out any object that looks like it hasn't already been preprocessed - for n := range objs { + for _, n := range objs { if ! preprocessed.MatchString(n) { a.logger.Println("Skipping item that looks like it is not preprocessed", n) continue } - names <- n + names = append(names, n) } - close(names) - // TODO: handle errors from ListObjects - return nil + return names, nil } func (a *awsConn) AddToQueue(url string, msg string) error { diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go index 61dec96..b513f92 100644 --- a/pipelinepreprocess/main.go +++ b/pipelinepreprocess/main.go @@ -32,8 +32,7 @@ const PauseBetweenChecks = 60 * time.Second type Clouder interface { Init() error - //ListObjects(bucket string, prefix string, names chan string) error - ListObjects(bucket string, prefix string, names chan string) + ListObjects(bucket string, prefix string) ([]string, error) Download(bucket string, key string, fn string) error Upload(bucket string, key string, path string) error CheckQueue(url string) (Qmsg, error) @@ -44,8 +43,8 @@ type Clouder interface { type Pipeliner interface { Clouder - ListToPreprocess(bookname string, names chan string) error - ListToOCR(bookname string, names chan string) error + ListToPreprocess(bookname string) ([]string, error) + ListToOCR(bookname string) ([]string, error) DownloadFromInProgress(key string, fn string) error UploadToInProgress(key string, path string) error CheckPreQueue() (Qmsg, error) @@ -149,12 +148,15 @@ func preProcBook(msg Qmsg, conn Pipeliner) { go up(upc, done, conn, bookname) conn.Logger().Println("Getting list of objects to download") - err = conn.ListToPreprocess(bookname, dl) + todl, err := conn.ListToPreprocess(bookname) if err != nil { log.Println("Failed to get list of files for book", bookname, err) t.Stop() return } + for _, d := range todl { + dl <- d + } // wait for the done channel to be posted to <-done @@ -206,13 +208,15 @@ func ocrBook(msg Qmsg, conn Pipeliner) { go up(upc, done, conn, bookname) conn.Logger().Println("Getting list of objects to download") - go conn.ListToOCR(bookname, dl) - //err = conn.ListToOCR(bookname, dl) - //if err != nil { - // log.Println("Failed to get list of files for book", bookname, err) - // t.Stop() - // return - //} + todl, err := conn.ListToOCR(bookname) + if err != nil { + log.Println("Failed to get list of files for book", bookname, err) + t.Stop() + return + } + for _, d := range todl { + dl <- d + } // wait for the done channel to be posted to <-done |