diff options
| -rw-r--r-- | internal/pipeline/pipeline.go | 1 | ||||
| -rw-r--r-- | internal/pipeline/pipeline_test.go | 155 | 
2 files changed, 156 insertions, 0 deletions
| diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 0d027a3..5ba0cb4 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -33,6 +33,7 @@ const HeartbeatSeconds = 60  type Clouder interface {  	Init() error  	ListObjects(bucket string, prefix string) ([]string, error) +	DeleteObjects(bucket string, keys []string) error  	Download(bucket string, key string, fn string) error  	Upload(bucket string, key string, path string) error  	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go new file mode 100644 index 0000000..68039e1 --- /dev/null +++ b/internal/pipeline/pipeline_test.go @@ -0,0 +1,155 @@ +// Copyright 2021 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( +	"bytes" +	"errors" +	"fmt" +	"io/ioutil" +	"log" +	"os" +	"path/filepath" +	"rescribe.xyz/bookpipeline" +	"strings" +	"testing" +) + +// StrLog is a simple logger that saves to a string, +// so it can be printed out only when needed. +type StrLog struct { +	log string +} + +func (t *StrLog) Write(p []byte) (n int, err error) { +	t.log += string(p) +	return len(p), nil +} + +type connection struct { +	name string +	c Pipeliner +} + +func Test_download(t *testing.T) { +	var slog StrLog +	vlog := log.New(&slog, "", 0) + +	var conns []connection + +	conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}}) + +	if !testing.Short() { +		conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}}) +	} + +	cases := []struct { +		dl string +		contents []byte +		process string +		errs []error +	} { +		{"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}}, +		{"empty", []byte{}, "empty", []error{}}, +		{"justastring", []byte("I am just a basic string"), "justastring", []error{}}, +	} + +	for _, conn := range conns { +		for _, c := range cases { +			t.Run(fmt.Sprintf("%s/%s", conn.name, c.dl), func(t *testing.T) { +				err := conn.c.Init() +				if err != nil { +					t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log) +				} +				slog.log = "" +				tempDir := filepath.Join(os.TempDir(), "pipelinetest") +				err = os.MkdirAll(tempDir, 0700) +				if err != nil && ! os.IsExist(err) { +					t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log) +				} + +				// create and upload test file +				tempFile := filepath.Join(tempDir, "t") +				err = ioutil.WriteFile(tempFile, c.contents, 0600) +				if err != nil { +					t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log) +				} +				if c.dl != "notpresent" { +					err = conn.c.Upload(conn.c.WIPStorageId(), c.dl, tempFile) +					if err != nil { +						t.Fatalf("Could not upload file %s: %v\nLog: %s", tempFile, err, slog.log) +					} +				} +				err = os.Remove(tempFile) +				if err != nil { +					t.Fatalf("Could not remove temporary upload file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				// download +				dlchan := make(chan string) +				processchan := make(chan string) +				errchan := make(chan error) + +				go download(dlchan, processchan, conn.c, tempDir, errchan, vlog) + +				dlchan <- c.dl +				close(dlchan) + +				// check all is as expected +				select { +				case err = <-errchan: +					if len(c.errs) == 0 { +						t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log) +					} +					expectedErrFound := 0 +					for _, v := range c.errs { +						if strings.Contains(err.Error(), v.Error()) { +							expectedErrFound = 1 +						} +					} +					if expectedErrFound == 0 { +						t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log) +					} +				case process := <-processchan: +					expected := tempDir + "/" + c.process  +					if expected != process { +						t.Fatalf("Received a different addition to the process channel than was expected, expected: %v, got %v\nLog: %s", expected, process, slog.log) +					} +				} + +				if c.dl == "notpresent" { +					return +				} + +				tempFile = filepath.Join(tempDir, c.dl) +				dled, err := ioutil.ReadFile(tempFile) +				if err != nil { +					t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				if !bytes.Equal(dled, c.contents) { +					t.Fatalf("Downloaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log) +				} + +				// cleanup +				err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{c.dl}) +				if err != nil { +					t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.dl, err, slog.log) +				} + +				err = os.Remove(tempFile) +				if err != nil { +					t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				err = os.Remove(tempDir) +				if err != nil { +					t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) +				} +			}) +		} +	} + +} | 
