summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- aws.go 16
-rw-r--r-- cloudsettings.go 1
-rw-r--r-- cmd/getandpurgequeue/main.go 3
-rw-r--r-- internal/pipeline/pipeline.go 7
-rw-r--r-- internal/pipeline/pipeline_test.go 154
-rw-r--r-- local.go 5
6 files changed, 180 insertions, 6 deletions
diff --git a/aws.go b/aws.go
index 65671fa..6e0d7f8 100644
--- a/aws.go
+++ b/aws.go
@@ -51,6 +51,7 @@ type AwsConn struct {
downloader *s3manager.Downloader
uploader *s3manager.Uploader
wipequrl, prequrl, ocrpgqurl, analysequrl string
+ testqurl string
wipstorageid string
}
@@ -125,6 +126,15 @@ func (a *AwsConn) Init() error {
}
a.analysequrl = *result.QueueUrl
+ a.Logger.Println("Getting test queue URL")
+ result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueTest),
+ })
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error getting test queue URL: %s", err))
+ }
+ a.testqurl = *result.QueueUrl
+
return nil
}
@@ -336,6 +346,10 @@ func (a *AwsConn) WIPStorageId() string {
return a.wipstorageid
}
+func (a *AwsConn) TestQueueId() string {
+ return a.testqurl
+}
+
func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) {
var names []string
err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
@@ -595,7 +609,7 @@ func (a *AwsConn) Log(v ...interface{}) {
// TODO: also set up the necessary security group and iam stuff
func (a *AwsConn) MkPipeline() error {
buckets := []string{storageWip}
- queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage}
+ queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage, queueTest}
for _, bucket := range buckets {
err := a.CreateBucket(bucket)
diff --git a/cloudsettings.go b/cloudsettings.go
index ed23d5d..fa55742 100644
--- a/cloudsettings.go
+++ b/cloudsettings.go
@@ -30,6 +30,7 @@ const (
queueWipeOnly = "rescribewipeonly"
queueOcrPage = "rescribeocrpage"
queueAnalyse = "rescribeanalyse"
+ queueTest = "rescribetest1"
)
// Storage bucket names. Can be anything unique in S3.
diff --git a/cmd/getandpurgequeue/main.go b/cmd/getandpurgequeue/main.go
index 33aef60..e5466d3 100644
--- a/cmd/getandpurgequeue/main.go
+++ b/cmd/getandpurgequeue/main.go
@@ -25,6 +25,7 @@ Valid queue names:
- wipeonly
- ocrpage
- analyse
+- test
`
type QueuePipeliner interface {
@@ -34,6 +35,7 @@ type QueuePipeliner interface {
WipeQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
+ TestQueueId() string
}
func main() {
@@ -63,6 +65,7 @@ func main() {
{conn.WipeQueueId(), "wipeonly"},
{conn.OCRPageQueueId(), "ocrpage"},
{conn.AnalyseQueueId(), "analyse"},
+ {conn.TestQueueId(), "test"},
}
qname := flag.Arg(0)
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 3836204..3419f74 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -48,6 +48,7 @@ type Pipeliner interface {
WipeQueueId() string
OCRPageQueueId() string
AnalyseQueueId() string
+ TestQueueId() string
WIPStorageId() string
GetLogger() *log.Logger
Log(v ...interface{})
@@ -129,6 +130,12 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha
done <- true
}
+// upAndQueue reads file paths from the channel c and uploads each
+// one with the bookname/ prefix, removing the local copy of a file
+// once it has been successfully uploaded. The name of each uploaded
+// file is then added to the toQueue queue. The done channel is then
+// written to, signalling completion. If an error occurs it is sent
+// to the errc channel and the function returns early.
func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go
index 2f7c626..dfcb8a3 100644
--- a/internal/pipeline/pipeline_test.go
+++ b/internal/pipeline/pipeline_test.go
@@ -17,9 +17,6 @@ import (
"testing"
)
-// TODO: use random file names so these tests can be safely
-// run in parallel
-
// StrLog is a simple logger that saves to a string,
// so it can be printed out only when needed.
type StrLog struct {
@@ -148,7 +145,7 @@ func Test_download(t *testing.T) {
t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)
}
- err = os.Remove(tempDir)
+ err = os.RemoveAll(tempDir)
if err != nil {
t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)
}
@@ -266,7 +263,154 @@ func Test_up(t *testing.T) {
t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)
}
- err = os.Remove(tempDir)
+ err = os.RemoveAll(tempDir)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+ })
+ }
+ }
+}
+
+// Test_upAndQueue tests the upAndQueue() function inside the pipeline
+func Test_upAndQueue(t *testing.T) {
+ var slog StrLog
+ vlog := log.New(&slog, "", 0)
+
+ var conns []connection
+
+ conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}})
+
+ if !testing.Short() {
+ conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}})
+ }
+
+ cases := []struct {
+ ul string
+ contents []byte
+ process string
+ errs []error
+ } {
+ {"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}},
+ {"empty", []byte{}, "empty", []error{}},
+ {"justastring", []byte("I am just a basic string"), "justastring", []error{}},
+ }
+
+ for _, conn := range conns {
+ for _, c := range cases {
+ t.Run(fmt.Sprintf("%s/%s", conn.name, c.ul), func(t *testing.T) {
+ err := conn.c.Init()
+ if err != nil {
+ t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log)
+ }
+ slog.log = ""
+ tempDir := filepath.Join(os.TempDir(), "pipelinetest")
+ err = os.MkdirAll(tempDir, 0700)
+ if err != nil && ! os.IsExist(err) {
+ t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+
+ // create test file
+ tempFile := filepath.Join(tempDir, c.ul)
+ if c.ul != "notpresent" {
+ err = ioutil.WriteFile(tempFile, c.contents, 0600)
+ if err != nil {
+ t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+ }
+
+ // upload
+ ulchan := make(chan string)
+ queueurl := conn.c.TestQueueId()
+ donechan := make(chan bool)
+ errchan := make(chan error)
+
+ go upAndQueue(ulchan, donechan, queueurl, conn.c, "pipelinetest", "test", errchan, vlog)
+
+ ulchan <- filepath.Join(tempDir, c.ul)
+ close(ulchan)
+
+ // check all is as expected
+ select {
+ case err = <-errchan:
+ if len(c.errs) == 0 {
+ t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log)
+ }
+ expectedErrFound := 0
+ for _, v := range c.errs {
+ if strings.Contains(err.Error(), v.Error()) {
+ expectedErrFound = 1
+ }
+ }
+ if expectedErrFound == 0 {
+ t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log)
+ }
+ case <-donechan:
+ }
+
+ msg, err := conn.c.CheckQueue(queueurl, 10)
+ if err != nil {
+ t.Fatalf("Error checking test queue: %v", err)
+ }
+
+ if c.ul == "notpresent" {
+ if msg.Handle != "" {
+ _ = conn.c.DelFromQueue(queueurl, msg.Handle)
+ t.Fatalf("Queue was written to even when an error was received: %s", msg.Body)
+ }
+ // for the "notpresent" case we can skip the following
+ // checks for the file being present and cleanup, having
+ // already successfully detected that the appropriate error
+ // was received by errchan
+ return
+ }
+
+ _, err = os.Stat(tempFile)
+ if os.IsExist(err) {
+ t.Fatalf("Uploaded file not removed as it should have been after uploading %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ err = conn.c.Download(conn.c.WIPStorageId(), "pipelinetest/" + c.ul, tempFile)
+ if err != nil {
+ t.Fatalf("Could not download file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ dled, err := ioutil.ReadFile(tempFile)
+ if err != nil {
+ t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ if !bytes.Equal(dled, c.contents) {
+ t.Fatalf("Uploaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log)
+ }
+
+ target, err := filepath.Rel(os.TempDir(), filepath.Join(tempDir, c.ul))
+ if err != nil {
+ t.Fatalf("Error removing TempDir prefix: %v", err)
+ }
+ queueExpected := target + " test"
+ if msg.Body != queueExpected {
+ _ = conn.c.DelFromQueue(queueurl, msg.Handle)
+ t.Fatalf("Queue contents not as expected, expected: '%s', got '%s'\nLog: %s", queueExpected, msg.Body, slog.log)
+ }
+
+ // cleanup
+ err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{"pipelinetest/" + c.ul})
+ if err != nil {
+ t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.ul, err, slog.log)
+ }
+
+ err = conn.c.DelFromQueue(queueurl, msg.Handle)
+ if err != nil {
+ t.Fatalf("Could not delete test message from queue: %v\nLog: %s", err, slog.log)
+ }
+
+ err = os.Remove(tempFile)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ err = os.RemoveAll(tempDir)
if err != nil {
t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)
}
diff --git a/local.go b/local.go
index 85a4edc..463fb61 100644
--- a/local.go
+++ b/local.go
@@ -19,6 +19,7 @@ const qidPre = "queuePre"
const qidWipe = "queueWipe"
const qidOCR = "queueOCR"
const qidAnalyse = "queueAnalyse"
+const qidTest = "queueTest"
const storageId = "storage"
// LocalConn is a simple implementation of the pipeliner interface
@@ -120,6 +121,10 @@ func (a *LocalConn) AnalyseQueueId() string {
return qidAnalyse
}
+func (a *LocalConn) TestQueueId() string {
+ return qidTest
+}
+
func (a *LocalConn) WIPStorageId() string {
return storageId
}