summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick White <git@njw.name>2019-08-20 16:26:02 +0100
committerNick White <git@njw.name>2019-08-20 16:26:02 +0100
commit522e1481f5544362027b006d5fe34609f3d366bc (patch)
tree761c8edd1d0440e3f88db9f2c0b6c3d205726fef
parent9f588a71e9a2d7ad179890d0fc19372fae047b04 (diff)
Substantially improve problematic object listing part of API
Switch to regular non-concurrent stuff, concurrency is better handled by the main program anyway. Now we handle errors properly, and things are way simpler.
-rw-r--r--pipelinepreprocess/aws.go46
-rw-r--r--pipelinepreprocess/main.go28
2 files changed, 38 insertions, 36 deletions
diff --git a/pipelinepreprocess/aws.go b/pipelinepreprocess/aws.go
index 75bf81c..1ac06de 100644
--- a/pipelinepreprocess/aws.go
+++ b/pipelinepreprocess/aws.go
@@ -142,56 +142,54 @@ func (a *awsConn) OCRQueueHeartbeat(t *time.Ticker, msgHandle string) error {
return a.QueueHeartbeat(t, msgHandle, a.ocrqurl)
}
-func (a *awsConn) ListObjects(bucket string, prefix string, names chan string) {
+func (a *awsConn) ListObjects(bucket string, prefix string) ([]string, error) {
+ var names []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
}, func(page *s3.ListObjectsV2Output, last bool) bool {
for _, r := range page.Contents {
- names <- *r.Key
+ names = append(names, *r.Key)
}
return true
})
- close(names)
- if err != nil {
- // TODO: handle error properly
- log.Println("Error getting objects")
- }
+ return names, err
}
-func (a *awsConn) ListToPreprocess(bookname string, names chan string) error {
- objs := make(chan string)
+func (a *awsConn) ListToPreprocess(bookname string) ([]string, error) {
+ var names []string
preprocessed := regexp.MustCompile(PreprocPattern)
- go a.ListObjects("rescribeinprogress", bookname, objs)
+ objs, err := a.ListObjects("rescribeinprogress", bookname)
+ if err != nil {
+ return names, err
+ }
// Filter out any object that looks like it's already been preprocessed
- for n := range objs {
+ for _, n := range objs {
if preprocessed.MatchString(n) {
a.logger.Println("Skipping item that looks like it has already been processed", n)
continue
}
- names <- n
+ names = append(names, n)
}
- close(names)
- // TODO: handle errors from ListObjects
- return nil
+ return names, nil
}
-func (a *awsConn) ListToOCR(bookname string, names chan string) error {
- objs := make(chan string)
+func (a *awsConn) ListToOCR(bookname string) ([]string, error) {
+ var names []string
preprocessed := regexp.MustCompile(PreprocPattern)
- go a.ListObjects("rescribeinprogress", bookname, objs)
- a.logger.Println("Completed running listobjects")
+ objs, err := a.ListObjects("rescribeinprogress", bookname)
+ if err != nil {
+ return names, err
+ }
// Filter out any object that looks like it hasn't already been preprocessed
- for n := range objs {
+ for _, n := range objs {
if ! preprocessed.MatchString(n) {
a.logger.Println("Skipping item that looks like it is not preprocessed", n)
continue
}
- names <- n
+ names = append(names, n)
}
- close(names)
- // TODO: handle errors from ListObjects
- return nil
+ return names, nil
}
func (a *awsConn) AddToQueue(url string, msg string) error {
diff --git a/pipelinepreprocess/main.go b/pipelinepreprocess/main.go
index 61dec96..b513f92 100644
--- a/pipelinepreprocess/main.go
+++ b/pipelinepreprocess/main.go
@@ -32,8 +32,7 @@ const PauseBetweenChecks = 60 * time.Second
type Clouder interface {
Init() error
- //ListObjects(bucket string, prefix string, names chan string) error
- ListObjects(bucket string, prefix string, names chan string)
+ ListObjects(bucket string, prefix string) ([]string, error)
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
CheckQueue(url string) (Qmsg, error)
@@ -44,8 +43,8 @@ type Clouder interface {
type Pipeliner interface {
Clouder
- ListToPreprocess(bookname string, names chan string) error
- ListToOCR(bookname string, names chan string) error
+ ListToPreprocess(bookname string) ([]string, error)
+ ListToOCR(bookname string) ([]string, error)
DownloadFromInProgress(key string, fn string) error
UploadToInProgress(key string, path string) error
CheckPreQueue() (Qmsg, error)
@@ -149,12 +148,15 @@ func preProcBook(msg Qmsg, conn Pipeliner) {
go up(upc, done, conn, bookname)
conn.Logger().Println("Getting list of objects to download")
- err = conn.ListToPreprocess(bookname, dl)
+ todl, err := conn.ListToPreprocess(bookname)
if err != nil {
log.Println("Failed to get list of files for book", bookname, err)
t.Stop()
return
}
+ for _, d := range todl {
+ dl <- d
+ }
// wait for the done channel to be posted to
<-done
@@ -206,13 +208,15 @@ func ocrBook(msg Qmsg, conn Pipeliner) {
go up(upc, done, conn, bookname)
conn.Logger().Println("Getting list of objects to download")
- go conn.ListToOCR(bookname, dl)
- //err = conn.ListToOCR(bookname, dl)
- //if err != nil {
- // log.Println("Failed to get list of files for book", bookname, err)
- // t.Stop()
- // return
- //}
+ todl, err := conn.ListToOCR(bookname)
+ if err != nil {
+ log.Println("Failed to get list of files for book", bookname, err)
+ t.Stop()
+ return
+ }
+ for _, d := range todl {
+ dl <- d
+ }
// wait for the done channel to be posted to
<-done