diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/pipeline/pipeline.go | 7 | ||||
| -rw-r--r-- | internal/pipeline/pipeline_test.go | 154 | 
2 files changed, 156 insertions, 5 deletions
| diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 3836204..3419f74 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -48,6 +48,7 @@ type Pipeliner interface {  	WipeQueueId() string  	OCRPageQueueId() string  	AnalyseQueueId() string +	TestQueueId() string  	WIPStorageId() string  	GetLogger() *log.Logger  	Log(v ...interface{}) @@ -129,6 +130,12 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha  	done <- true  } +// upAndQueue reads file names from a channel and uploads them with +// the bookname/ prefix, removing the local copy of each file +// once it has been successfully uploaded. Each done file name is +// added to the toQueue once it has been uploaded. The done channel +// is then written to to signal completion. If an error occurs it +// is sent to the errc channel and the function returns early.  func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {  	for path := range c {  		name := filepath.Base(path) diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index 2f7c626..dfcb8a3 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -17,9 +17,6 @@ import (  	"testing"  ) -// TODO: use random file names so these tests can be safely -//       run in parallel -  // StrLog is a simple logger that saves to a string,  // so it can be printed out only when needed.  type StrLog struct { @@ -148,7 +145,7 @@ func Test_download(t *testing.T) {  					t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)  				} -				err = os.Remove(tempDir) +				err = os.RemoveAll(tempDir)  				if err != nil {  					t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)  				} @@ -266,7 +263,154 @@ func Test_up(t *testing.T) {  					t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log)  				} -				err = os.Remove(tempDir) +				err = os.RemoveAll(tempDir) +				if err != nil { +					t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log) +				} +			}) +		} +	} +} + +// Test_upAndQueue tests the upAndQueue() function inside the pipeline +func Test_upAndQueue(t *testing.T) { +	var slog StrLog +	vlog := log.New(&slog, "", 0) + +	var conns []connection + +	conns = append(conns, connection{name: "local", c: &bookpipeline.LocalConn{Logger: vlog}}) + +	if !testing.Short() { +		conns = append(conns, connection{name: "aws", c: &bookpipeline.AwsConn{Logger: vlog}}) +	} + +	cases := []struct { +		ul string +		contents []byte +		process string +		errs []error +	} { +		{"notpresent", []byte(""), "", []error{errors.New("no such file or directory"), errors.New("NoSuchKey: The specified key does not exist")}}, +		{"empty", []byte{}, "empty", []error{}}, +		{"justastring", []byte("I am just a basic string"), "justastring", []error{}}, +	} + +	for _, conn := range conns { +		for _, c := range cases { +			t.Run(fmt.Sprintf("%s/%s", conn.name, c.ul), func(t *testing.T) { +				err := conn.c.Init() +				if err != nil { +					t.Fatalf("Could not initialise %s connection: %v\nLog: %s", conn.name, err, slog.log) +				} +				slog.log = "" +				tempDir := filepath.Join(os.TempDir(), "pipelinetest") +				err = os.MkdirAll(tempDir, 0700) +				if err != nil && ! os.IsExist(err) { +					t.Fatalf("Could not create temporary directory %s: %v\nLog: %s", tempDir, err, slog.log) +				} + +				// create test file +				tempFile := filepath.Join(tempDir, c.ul) +				if c.ul != "notpresent" { +					err = ioutil.WriteFile(tempFile, c.contents, 0600) +					if err != nil { +						t.Fatalf("Could not create temporary file %s: %v\nLog: %s", tempFile, err, slog.log) +					} +				} + +				// upload +				ulchan := make(chan string) +				queueurl := conn.c.TestQueueId() +				donechan := make(chan bool) +				errchan := make(chan error) + +				go upAndQueue(ulchan, donechan, queueurl, conn.c, "pipelinetest", "test", errchan, vlog) + +				ulchan <- filepath.Join(tempDir, c.ul) +				close(ulchan) + +				// check all is as expected +				select { +				case err = <-errchan: +					if len(c.errs) == 0 { +						t.Fatalf("Received an error when one was not expected, error: %v\nLog: %s", err, slog.log) +					} +					expectedErrFound := 0 +					for _, v := range c.errs { +						if strings.Contains(err.Error(), v.Error()) { +							expectedErrFound = 1 +						} +					} +					if expectedErrFound == 0 { +						t.Fatalf("Received a different error than was expected, expected one of: %v, got %v\nLog: %s", c.errs, err, slog.log) +					} +				case <-donechan: +				} + +				msg, err := conn.c.CheckQueue(queueurl, 10) +				if err != nil { +					t.Fatalf("Error checking test queue: %v", err) +				} + +				if c.ul == "notpresent" { +					if msg.Handle != "" { +						_ = conn.c.DelFromQueue(queueurl, msg.Handle) +						t.Fatalf("Queue was written to even when an error was received: %s", msg.Body) +					} +					// for the "notpresent" case we can skip the following +					// checks for the file being present and cleanup, having +					// already successfully detected that the appropriate error +					// was received by errchan +					return +				} + +				_, err = os.Stat(tempFile) +				if os.IsExist(err) { +					t.Fatalf("Uploaded file not removed as it should have been after uploading %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				err = conn.c.Download(conn.c.WIPStorageId(), "pipelinetest/" + c.ul, tempFile) +				if err != nil { +					t.Fatalf("Could not download file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				dled, err := ioutil.ReadFile(tempFile) +				if err != nil { +					t.Fatalf("Could not read downloaded file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				if !bytes.Equal(dled, c.contents) { +					t.Fatalf("Uploaded file differs from expected, expected: '%s', got '%s'\nLog: %s", c.contents, dled, slog.log) +				} + +				target, err := filepath.Rel(os.TempDir(), filepath.Join(tempDir, c.ul)) +				if err != nil { +					t.Fatalf("Error removing TempDir prefix: %v", err) +				} +				queueExpected := target + " test" +				if msg.Body != queueExpected { +					_ = conn.c.DelFromQueue(queueurl, msg.Handle) +					t.Fatalf("Queue contents not as expected, expected: '%s', got '%s'\nLog: %s", queueExpected, msg.Body, slog.log) +				} + +				// cleanup +				err = conn.c.DeleteObjects(conn.c.WIPStorageId(), []string{"pipelinetest/" + c.ul}) +				if err != nil { +					t.Fatalf("Could not delete storage object used for test %s: %v\nLog: %s", c.ul, err, slog.log) +				} + +				err = conn.c.DelFromQueue(queueurl, msg.Handle) +				if err != nil { +					t.Fatalf("Could not delete test message from queue: %v\nLog: %s", err, slog.log) +				} + +				err = os.Remove(tempFile) +				if err != nil { +					t.Fatalf("Could not remove temporary download file %s: %v\nLog: %s", tempFile, err, slog.log) +				} + +				err = os.RemoveAll(tempDir)  				if err != nil {  					t.Fatalf("Could not remove temporary download directory %s: %v\nLog: %s", tempDir, err, slog.log)  				} | 
