From b8533faf24f2bf5247f28848459f9b7af77e61b3 Mon Sep 17 00:00:00 2001 From: Nick White Date: Mon, 12 Jul 2021 16:47:09 +0100 Subject: Add test for upAndQueue function This involved adding a test queue, so it can be run safely without intefering with the pipeline. --- aws.go | 16 +++- cloudsettings.go | 1 + cmd/getandpurgequeue/main.go | 3 + internal/pipeline/pipeline.go | 7 ++ internal/pipeline/pipeline_test.go | 154 +++++++++++++++++++++++++++++++++++-- local.go | 5 ++ 6 files changed, 180 insertions(+), 6 deletions(-) diff --git a/aws.go b/aws.go index 65671fa..6e0d7f8 100644 --- a/aws.go +++ b/aws.go @@ -51,6 +51,7 @@ type AwsConn struct { downloader *s3manager.Downloader uploader *s3manager.Uploader wipequrl, prequrl, ocrpgqurl, analysequrl string + testqurl string wipstorageid string } @@ -125,6 +126,15 @@ func (a *AwsConn) Init() error { } a.analysequrl = *result.QueueUrl + a.Logger.Println("Getting test queue URL") + result, err = a.sqssvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueTest), + }) + if err != nil { + return errors.New(fmt.Sprintf("Error getting test queue URL: %s", err)) + } + a.testqurl = *result.QueueUrl + return nil } @@ -336,6 +346,10 @@ func (a *AwsConn) WIPStorageId() string { return a.wipstorageid } +func (a *AwsConn) TestQueueId() string { + return a.testqurl +} + func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error) { var names []string err := a.s3svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{ @@ -595,7 +609,7 @@ func (a *AwsConn) Log(v ...interface{}) { // TODO: also set up the necessary security group and iam stuff func (a *AwsConn) MkPipeline() error { buckets := []string{storageWip} - queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage} + queues := []string{queuePreProc, queueWipeOnly, queueAnalyse, queueOcrPage, queueTest} for _, bucket := range buckets { err := a.CreateBucket(bucket) diff --git a/cloudsettings.go b/cloudsettings.go index ed23d5d..fa55742 100644 --- a/cloudsettings.go +++ b/cloudsettings.go @@ -30,6 +30,7 @@ const ( queueWipeOnly = "rescribewipeonly" queueOcrPage = "rescribeocrpage" queueAnalyse = "rescribeanalyse" + queueTest = "rescribetest1" ) // Storage bucket names. Can be anything unique in S3. diff --git a/cmd/getandpurgequeue/main.go b/cmd/getandpurgequeue/main.go index 33aef60..e5466d3 100644 --- a/cmd/getandpurgequeue/main.go +++ b/cmd/getandpurgequeue/main.go @@ -25,6 +25,7 @@ Valid queue names: - wipeonly - ocrpage - analyse +- test ` type QueuePipeliner interface { @@ -34,6 +35,7 @@ type QueuePipeliner interface { WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string + TestQueueId() string } func main() { @@ -63,6 +65,7 @@ func main() { {conn.WipeQueueId(), "wipeonly"}, {conn.OCRPageQueueId(), "ocrpage"}, {conn.AnalyseQueueId(), "analyse"}, + {conn.TestQueueId(), "test"}, } qname := flag.Arg(0) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 3836204..3419f74 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -48,6 +48,7 @@ type Pipeliner interface { WipeQueueId() string OCRPageQueueId() string AnalyseQueueId() string + TestQueueId() string WIPStorageId() string GetLogger() *log.Logger Log(v ...interface{}) @@ -129,6 +130,12 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha done <- true } +// upAndQueue reads file names from a channel and uploads them with +// the bookname/ prefix, removing the local copy of each file +// once it has been successfully uploaded. Each done file name is +// added to the toQueue once it has been uploaded. The done channel +// is then written to to signal completion. If an error occurs it +// is sent to the errc channel and the function returns early. func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index 2f7c626..dfcb8a3 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -17,9 +17,6 @@ import ( "testing" ) -// TODO: use random file names so these tests can be safely -// run in parallel - // StrLog is a simple logger that saves to a string, // so it can be printed out only when needed. type StrLog struct { @@ -148,7 +145,7 @@ func Test_download(t *testing.T) { t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) } - err = os.Remove(tempDir) + err = os.RemoveAll(tempDir) if err != nil { t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) } @@ -266,7 +263,154 @@ func Test_up(t *testing.T) { t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) } - err = os.Remove(tempDir) + err = os.RemoveAll(tempDir) + if err != nil { + t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) + } + }) + } + } +} + +// Test_upAndQueue tests the upAndQueue() function inside the pipeline +func Test_upAndQueue(t *testing.T) { + var slog StrLog + vlog := log.New(&slog, "", 0) + + var conns []connection + + conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}}) + + if !testing.Short() { + conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}}) + } + + cases := []struct { + ul string + contents []byte + process string + errs []error + } { + {"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}}, + {"empty", []byte{}, "empty", []error{}}, + {"justastring", []byte("I am just a basic string"), "justastring", []error{}}, + } + + for _, conn := range conns { + for _, c := range cases { + t.Run(fmt.Sprintf("%s/%s", conn.name, c.ul), func(t *testing.T) { + err := conn.c.Init() + if err != nil { + t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log) + } + slog.log = "" + tempDir := filepath.Join(os.TempDir(), "pipelinetest") + err = os.MkdirAll(tempDir, 0700) + if err != nil && ! os.IsExist(err) { + t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log) + } + + // create test file + tempFile := filepath.Join(tempDir, c.ul) + if c.ul != "notpresent" { + err = ioutil.WriteFile(tempFile, c.contents, 0600) + if err != nil { + t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log) + } + } + + // upload + ulchan := make(chan string) + queueurl := conn.c.TestQueueId() + donechan := make(chan bool) + errchan := make(chan error) + + go upAndQueue(ulchan, donechan, queueurl, conn.c, "pipelinetest", "test", errchan, vlog) + + ulchan <- filepath.Join(tempDir, c.ul) + close(ulchan) + + // check all is as expected + select { + case err = <-errchan: + if len(c.errs) == 0 { + t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log) + } + expectedErrFound := 0 + for _, v := range c.errs { + if strings.Contains(err.Error(), v.Error()) { + expectedErrFound = 1 + } + } + if expectedErrFound == 0 { + t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log) + } + case <-donechan: + } + + msg, err := conn.c.CheckQueue(queueurl, 10) + if err != nil { + t.Fatalf("Error checking test queue: %v", err) + } + + if c.ul == "notpresent" { + if msg.Handle != "" { + _ = conn.c.DelFromQueue(queueurl, msg.Handle) + t.Fatalf("Queue was written to even when an error was received: %s", msg.Body) + } + // for the "notpresent" case we can skip the following + // checks for the file being present and cleanup, having + // already successfully detected that the appropriate error + // was received by errchan + return + } + + _, err = os.Stat(tempFile) + if os.IsExist(err) { + t.Fatalf("Uploaded file not removed as it should have been after uploading %s: %v\nLog: %s", tempFile, err, slog.log) + } + + err = conn.c.Download(conn.c.WIPStorageId(), "pipelinetest/" + c.ul, tempFile) + if err != nil { + t.Fatalf("Could not download file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + dled, err := ioutil.ReadFile(tempFile) + if err != nil { + t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + if !bytes.Equal(dled, c.contents) { + t.Fatalf("Uploaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log) + } + + target, err := filepath.Rel(os.TempDir(), filepath.Join(tempDir, c.ul)) + if err != nil { + t.Fatalf("Error removing TempDir prefix: %v", err) + } + queueExpected := target + " test" + if msg.Body != queueExpected { + _ = conn.c.DelFromQueue(queueurl, msg.Handle) + t.Fatalf("Queue contents not as expected, expected: '%s', got '%s'\nLog: %s", queueExpected, msg.Body, slog.log) + } + + // cleanup + err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{"pipelinetest/" + c.ul}) + if err != nil { + t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.ul, err, slog.log) + } + + err = conn.c.DelFromQueue(queueurl, msg.Handle) + if err != nil { + t.Fatalf("Could not delete test message from queue: %v\nLog: %s", err, slog.log) + } + + err = os.Remove(tempFile) + if err != nil { + t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + err = os.RemoveAll(tempDir) if err != nil { t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) } diff --git a/local.go b/local.go index 85a4edc..463fb61 100644 --- a/local.go +++ b/local.go @@ -19,6 +19,7 @@ const qidPre = "queuePre" const qidWipe = "queueWipe" const qidOCR = "queueOCR" const qidAnalyse = "queueAnalyse" +const qidTest = "queueTest" const storageId = "storage" // LocalConn is a simple implementation of the pipeliner interface @@ -120,6 +121,10 @@ func (a *LocalConn) AnalyseQueueId() string { return qidAnalyse } +func (a *LocalConn) TestQueueId() string { + return qidTest +} + func (a *LocalConn) WIPStorageId() string { return storageId } -- cgit v1.2.1-24-ge1ad