summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/pipeline/pipeline.go1
-rw-r--r--internal/pipeline/pipeline_test.go155
2 files changed, 156 insertions, 0 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 0d027a3..5ba0cb4 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -33,6 +33,7 @@ const HeartbeatSeconds = 60
type Clouder interface {
Init() error
ListObjects(bucket string, prefix string) ([]string, error)
+ DeleteObjects(bucket string, keys []string) error
Download(bucket string, key string, fn string) error
Upload(bucket string, key string, path string) error
CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error)
diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go
new file mode 100644
index 0000000..68039e1
--- /dev/null
+++ b/internal/pipeline/pipeline_test.go
@@ -0,0 +1,155 @@
+// Copyright 2021 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+package pipeline
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+ "path/filepath"
+ "rescribe.xyz/bookpipeline"
+ "strings"
+ "testing"
+)
+
+// StrLog is a simple logger that saves to a string,
+// so it can be printed out only when needed.
+type StrLog struct {
+ log string
+}
+
+func (t *StrLog) Write(p []byte) (n int, err error) {
+ t.log += string(p)
+ return len(p), nil
+}
+
+type connection struct {
+ name string
+ c Pipeliner
+}
+
+func Test_download(t *testing.T) {
+ var slog StrLog
+ vlog := log.New(&slog, "", 0)
+
+ var conns []connection
+
+ conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}})
+
+ if !testing.Short() {
+ conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}})
+ }
+
+ cases := []struct {
+ dl string
+ contents []byte
+ process string
+ errs []error
+ } {
+ {"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}},
+ {"empty", []byte{}, "empty", []error{}},
+ {"justastring", []byte("I am just a basic string"), "justastring", []error{}},
+ }
+
+ for _, conn := range conns {
+ for _, c := range cases {
+ t.Run(fmt.Sprintf("%s/%s", conn.name, c.dl), func(t *testing.T) {
+ err := conn.c.Init()
+ if err != nil {
+ t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log)
+ }
+ slog.log = ""
+ tempDir := filepath.Join(os.TempDir(), "pipelinetest")
+ err = os.MkdirAll(tempDir, 0700)
+ if err != nil && ! os.IsExist(err) {
+ t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+
+ // create and upload test file
+ tempFile := filepath.Join(tempDir, "t")
+ err = ioutil.WriteFile(tempFile, c.contents, 0600)
+ if err != nil {
+ t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+ if c.dl != "notpresent" {
+ err = conn.c.Upload(conn.c.WIPStorageId(), c.dl, tempFile)
+ if err != nil {
+ t.Fatalf("Could not upload file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+ }
+ err = os.Remove(tempFile)
+ if err != nil {
+ t.Fatalf("Could not remove temporary upload file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ // download
+ dlchan := make(chan string)
+ processchan := make(chan string)
+ errchan := make(chan error)
+
+ go download(dlchan, processchan, conn.c, tempDir, errchan, vlog)
+
+ dlchan <- c.dl
+ close(dlchan)
+
+ // check all is as expected
+ select {
+ case err = <-errchan:
+ if len(c.errs) == 0 {
+ t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log)
+ }
+ expectedErrFound := 0
+ for _, v := range c.errs {
+ if strings.Contains(err.Error(), v.Error()) {
+ expectedErrFound = 1
+ }
+ }
+ if expectedErrFound == 0 {
+ t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log)
+ }
+ case process := <-processchan:
+ expected := tempDir + "/" + c.process
+ if expected != process {
+ t.Fatalf("Received a different addition to the process channel than was expected, expected: %v, got %v\nLog: %s", expected, process, slog.log)
+ }
+ }
+
+ if c.dl == "notpresent" {
+ return
+ }
+
+ tempFile = filepath.Join(tempDir, c.dl)
+ dled, err := ioutil.ReadFile(tempFile)
+ if err != nil {
+ t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ if !bytes.Equal(dled, c.contents) {
+ t.Fatalf("Downloaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log)
+ }
+
+ // cleanup
+ err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{c.dl})
+ if err != nil {
+ t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.dl, err, slog.log)
+ }
+
+ err = os.Remove(tempFile)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ err = os.Remove(tempDir)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+ })
+ }
+ }
+
+}