summaryrefslogtreecommitdiff
path: root/internal/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'internal/pipeline')
-rw-r--r--internal/pipeline/pipeline.go9
-rw-r--r--internal/pipeline/pipeline_test.go121
2 files changed, 130 insertions, 0 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 72d7fd1..3836204 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -79,6 +79,10 @@ func GetMailSettings() (mailSettings, error) {
return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
}
+// download reads file names from a channel and downloads them into
+// dir, putting each successfully downloaded file name into the
+// process channel. If an error occurs it is sent to the errc channel
+// and the function returns early.
func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
for key := range dl {
fn := filepath.Join(dir, filepath.Base(key))
@@ -96,6 +100,11 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e
close(process)
}
+// up reads file names from a channel and uploads them with
+// the bookname/ prefix, removing the local copy of each file
+// once it has been successfully uploaded. The done channel is
+// then written to to signal completion. If an error occurs it
+// is sent to the errc channel and the function returns early.
func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
for path := range c {
name := filepath.Base(path)
diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go
index 55b93e7..2f7c626 100644
--- a/internal/pipeline/pipeline_test.go
+++ b/internal/pipeline/pipeline_test.go
@@ -17,6 +17,9 @@ import (
"testing"
)
+// TODO: use random file names so these tests can be safely
+// run in parallel
+
// StrLog is a simple logger that saves to a string,
// so it can be printed out only when needed.
type StrLog struct {
@@ -33,6 +36,7 @@ type connection struct {
c Pipeliner
}
+// Test_download tests the download() function inside the pipeline
func Test_download(t *testing.T) {
var slog StrLog
vlog := log.New(&slog, "", 0)
@@ -151,5 +155,122 @@ func Test_download(t *testing.T) {
})
}
}
+}
+
+// Test_up tests the up() function inside the pipeline
+func Test_up(t *testing.T) {
+ var slog StrLog
+ vlog := log.New(&slog, "", 0)
+
+ var conns []connection
+
+ conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}})
+
+ if !testing.Short() {
+ conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}})
+ }
+
+ cases := []struct {
+ ul string
+ contents []byte
+ process string
+ errs []error
+ } {
+ {"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}},
+ {"empty", []byte{}, "empty", []error{}},
+ {"justastring", []byte("I am just a basic string"), "justastring", []error{}},
+ }
+ for _, conn := range conns {
+ for _, c := range cases {
+ t.Run(fmt.Sprintf("%s/%s", conn.name, c.ul), func(t *testing.T) {
+ err := conn.c.Init()
+ if err != nil {
+ t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log)
+ }
+ slog.log = ""
+ tempDir := filepath.Join(os.TempDir(), "pipelinetest")
+ err = os.MkdirAll(tempDir, 0700)
+ if err != nil && ! os.IsExist(err) {
+ t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+
+ // create test file
+ tempFile := filepath.Join(tempDir, c.ul)
+ if c.ul != "notpresent" {
+ err = ioutil.WriteFile(tempFile, c.contents, 0600)
+ if err != nil {
+ t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+ }
+
+ // upload
+ ulchan := make(chan string)
+ donechan := make(chan bool)
+ errchan := make(chan error)
+
+ go up(ulchan, donechan, conn.c, "pipelinetest", errchan, vlog)
+
+ ulchan <- filepath.Join(tempDir, c.ul)
+ close(ulchan)
+
+ // check all is as expected
+ select {
+ case err = <-errchan:
+ if len(c.errs) == 0 {
+ t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log)
+ }
+ expectedErrFound := 0
+ for _, v := range c.errs {
+ if strings.Contains(err.Error(), v.Error()) {
+ expectedErrFound = 1
+ }
+ }
+ if expectedErrFound == 0 {
+ t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log)
+ }
+ case <-donechan:
+ }
+
+ if c.ul == "notpresent" {
+ return
+ }
+
+ _, err = os.Stat(tempFile)
+ if os.IsExist(err) {
+ t.Fatalf("Uploaded file not removed as it should have been after uploading %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ err = conn.c.Download(conn.c.WIPStorageId(), "pipelinetest/" + c.ul, tempFile)
+ if err != nil {
+ t.Fatalf("Could not download file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ dled, err := ioutil.ReadFile(tempFile)
+ if err != nil {
+ t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ if !bytes.Equal(dled, c.contents) {
+ t.Fatalf("Uploaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log)
+ }
+
+ // cleanup
+ err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{"pipelinetest/" + c.ul})
+ if err != nil {
+ t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.ul, err, slog.log)
+ }
+
+ err = os.Remove(tempFile)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)
+ }
+
+ err = os.Remove(tempDir)
+ if err != nil {
+ t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)
+ }
+ })
+ }
+ }
}