diff options
-rw-r--r-- | internal/pipeline/pipeline.go | 9 | ||||
-rw-r--r-- | internal/pipeline/pipeline_test.go | 121 |
2 files changed, 130 insertions, 0 deletions
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 72d7fd1..3836204 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -79,6 +79,10 @@ func GetMailSettings() (mailSettings, error) { return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil } +// download reads file names from a channel and downloads them into +// dir, putting each successfully downloaded file name into the +// process channel. If an error occurs it is sent to the errc channel +// and the function returns early. func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { for key := range dl { fn := filepath.Join(dir, filepath.Base(key)) @@ -96,6 +100,11 @@ func download(dl chan string, process chan string, conn Pipeliner, dir string, e close(process) } +// up reads file names from a channel and uploads them with +// the bookname/ prefix, removing the local copy of each file +// once it has been successfully uploaded. The done channel is +// then written to to signal completion. If an error occurs it +// is sent to the errc channel and the function returns early. func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { for path := range c { name := filepath.Base(path) diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index 55b93e7..2f7c626 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -17,6 +17,9 @@ import ( "testing" ) +// TODO: use random file names so these tests can be safely +// run in parallel + // StrLog is a simple logger that saves to a string, // so it can be printed out only when needed. type StrLog struct { @@ -33,6 +36,7 @@ type connection struct { c Pipeliner } +// Test_download tests the download() function inside the pipeline func Test_download(t *testing.T) { var slog StrLog vlog := log.New(&slog, "", 0) @@ -151,5 +155,122 @@ func Test_download(t *testing.T) { }) } } +} + +// Test_up tests the up() function inside the pipeline +func Test_up(t *testing.T) { + var slog StrLog + vlog := log.New(&slog, "", 0) + + var conns []connection + + conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}}) + + if !testing.Short() { + conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}}) + } + + cases := []struct { + ul string + contents []byte + process string + errs []error + } { + {"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}}, + {"empty", []byte{}, "empty", []error{}}, + {"justastring", []byte("I am just a basic string"), "justastring", []error{}}, + } + for _, conn := range conns { + for _, c := range cases { + t.Run(fmt.Sprintf("%s/%s", conn.name, c.ul), func(t *testing.T) { + err := conn.c.Init() + if err != nil { + t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log) + } + slog.log = "" + tempDir := filepath.Join(os.TempDir(), "pipelinetest") + err = os.MkdirAll(tempDir, 0700) + if err != nil && ! os.IsExist(err) { + t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log) + } + + // create test file + tempFile := filepath.Join(tempDir, c.ul) + if c.ul != "notpresent" { + err = ioutil.WriteFile(tempFile, c.contents, 0600) + if err != nil { + t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log) + } + } + + // upload + ulchan := make(chan string) + donechan := make(chan bool) + errchan := make(chan error) + + go up(ulchan, donechan, conn.c, "pipelinetest", errchan, vlog) + + ulchan <- filepath.Join(tempDir, c.ul) + close(ulchan) + + // check all is as expected + select { + case err = <-errchan: + if len(c.errs) == 0 { + t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log) + } + expectedErrFound := 0 + for _, v := range c.errs { + if strings.Contains(err.Error(), v.Error()) { + expectedErrFound = 1 + } + } + if expectedErrFound == 0 { + t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log) + } + case <-donechan: + } + + if c.ul == "notpresent" { + return + } + + _, err = os.Stat(tempFile) + if os.IsExist(err) { + t.Fatalf("Uploaded file not removed as it should have been after uploading %s: %v\nLog: %s", tempFile, err, slog.log) + } + + err = conn.c.Download(conn.c.WIPStorageId(), "pipelinetest/" + c.ul, tempFile) + if err != nil { + t.Fatalf("Could not download file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + dled, err := ioutil.ReadFile(tempFile) + if err != nil { + t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + if !bytes.Equal(dled, c.contents) { + t.Fatalf("Uploaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log) + } + + // cleanup + err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{"pipelinetest/" + c.ul}) + if err != nil { + t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.ul, err, slog.log) + } + + err = os.Remove(tempFile) + if err != nil { + t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) + } + + err = os.Remove(tempDir) + if err != nil { + t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) + } + }) + } + } } |