diff options
| author | Nick White <git@njw.name> | 2020-05-26 16:03:17 +0100 | 
|---|---|---|
| committer | Nick White <git@njw.name> | 2020-05-26 16:03:17 +0100 | 
| commit | 870fb0f3e01a0771d3839da2461e0cec30282bc0 (patch) | |
| tree | c8f461ba1b829ce4d91d6eb90fd7d34f1e086845 | |
| parent | 3d2bdd1d30ec6d43b4921d29d5a8338d65c48a5a (diff) | |
| parent | ebc27eef77868fa44daed7cfb0ea129690029da8 (diff) | |
Merge branch 'local'
| -rw-r--r-- | cmd/addtoqueue/main.go | 13 | ||||
| -rw-r--r-- | cmd/bookpipeline/main.go | 12 | ||||
| -rw-r--r-- | cmd/booktopipeline/main.go | 12 | ||||
| -rw-r--r-- | cmd/getpipelinebook/main.go | 12 | ||||
| -rw-r--r-- | local.go | 241 | 
5 files changed, 282 insertions, 8 deletions
| diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go index 8e4ecd2..57087ca 100644 --- a/cmd/addtoqueue/main.go +++ b/cmd/addtoqueue/main.go @@ -14,7 +14,7 @@ import (  	"rescribe.xyz/bookpipeline"  ) -const usage = `Usage: addtoqueue qname msg +const usage = `Usage: addtoqueue [-c conn] qname msg  addtoqueue adds a message to a queue. @@ -44,6 +44,7 @@ type QueuePipeliner interface {  }  func main() { +	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	flag.Usage = func() {  		fmt.Fprintf(flag.CommandLine.Output(), usage)  		flag.PrintDefaults() @@ -58,7 +59,15 @@ func main() {  	var n NullWriter  	quietlog := log.New(n, "", 0)  	var conn QueuePipeliner -	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} + +	switch *conntype { +	case "aws": +		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} +	case "local": +		conn = &bookpipeline.LocalConn{Logger: quietlog} +	default: +		log.Fatalln("Unknown connection type") +	}  	err := conn.Init()  	if err != nil { diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 9618e93..344006a 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -25,7 +25,7 @@ import (  	"rescribe.xyz/utils/pkg/hocr"  ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]  Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.  When one is found this general process is followed: @@ -677,6 +677,7 @@ func main() {  	noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")  	noanalyse := flag.Bool("na", false, "disable analysis")  	autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") +	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	flag.Usage = func() {  		fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -697,7 +698,14 @@ func main() {  	ocredPattern := regexp.MustCompile(`.hocr$`)  	var conn Pipeliner -	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	switch *conntype { +	case "aws": +		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	case "local": +		conn = &bookpipeline.LocalConn{Logger: verboselog} +	default: +		log.Fatalln("Unknown connection type") +	}  	conn.Log("Setting up AWS session")  	err := conn.Init() diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 2e7064e..609c3b3 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -16,7 +16,7 @@ import (  	"rescribe.xyz/bookpipeline"  ) -const usage = `Usage: booktopipeline [-t training] [-prebinarised] [-v] bookdir [bookname] +const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-v] bookdir [bookname]  Uploads the book in bookdir to the S3 'inprogress' bucket and adds it  to the 'preprocess' SQS queue, or the 'wipeonly' queue if the @@ -57,6 +57,7 @@ func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {  func main() {  	verbose := flag.Bool("v", false, "Verbose") +	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe")  	training := flag.String("t", "", "Training to use (training filename without the .traineddata part)") @@ -86,7 +87,14 @@ func main() {  	}  	var conn Pipeliner -	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	switch *conntype { +	case "aws": +		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	case "local": +		conn = &bookpipeline.LocalConn{Logger: verboselog} +	default: +		log.Fatalln("Unknown connection type") +	}  	err := conn.Init()  	if err != nil {  		log.Fatalln("Failed to set up cloud connection:", err) diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index dc9387f..03e709b 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -17,7 +17,7 @@ import (  	"rescribe.xyz/bookpipeline"  ) -const usage = `Usage: getpipelinebook [-a] [-graph] [-pdf] [-png] [-v] bookname +const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname  Downloads the pipeline results for a book. @@ -57,6 +57,7 @@ func getpdfs(conn Pipeliner, l *log.Logger, bookname string) {  func main() {  	all := flag.Bool("a", false, "Get all files for book") +	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	graph := flag.Bool("graph", false, "Only download graphs (can be used alongside -pdf)")  	binarisedpdf := flag.Bool("binarisedpdf", false, "Only download binarised PDF (can be used alongside -graph)")  	colourpdf := flag.Bool("colourpdf", false, "Only download colour PDF (can be used alongside -graph)") @@ -83,7 +84,14 @@ func main() {  	}  	var conn Pipeliner -	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	switch *conntype { +	case "aws": +		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} +	case "local": +		conn = &bookpipeline.LocalConn{Logger: verboselog} +	default: +		log.Fatalln("Unknown connection type") +	}  	verboselog.Println("Setting up AWS session")  	err := conn.MinimalInit() diff --git a/local.go b/local.go new file mode 100644 index 0000000..70991b2 --- /dev/null +++ b/local.go @@ -0,0 +1,241 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package bookpipeline + +import ( +	"bufio" +	"fmt" +	"io" +	"io/ioutil" +	"log" +	"os" +	"path/filepath" +	"strings" +) + +const qidPre = "queuePre" +const qidWipe = "queueWipe" +const qidOCR = "queueOCR" +const qidAnalyse = "queueAnalyse" +const storageId = "storage" + +// LocalConn is a simple implementation of the pipeliner interface +// that doesn't rely on any "cloud" services, instead doing everything +// on the local machine. This is particularly useful for testing. +type LocalConn struct { +	// these should be set before running Init(), or left to defaults +	TempDir string +	Logger *log.Logger +} + +// MinimalInit does the bare minimum initialisation +func (a *LocalConn) MinimalInit() error { +	var err error +	if a.TempDir == "" { +		a.TempDir = filepath.Join(os.TempDir(), "bookpipeline") +	} +	err = os.Mkdir(a.TempDir, 0700) +	if err != nil && !os.IsExist(err) { +		return fmt.Errorf("Error creating temporary directory: %v", err) +	} + +	err = os.Mkdir(filepath.Join(a.TempDir, storageId), 0700) +	if err != nil && !os.IsExist(err) { +		return fmt.Errorf("Error creating storage directory: %v", err) +	} + +	if a.Logger == nil { +		a.Logger = log.New(os.Stdout, "", 0) +	} + +	return nil +} + +// Init just does the same as MinimalInit +func (a *LocalConn) Init() error { +	err := a.MinimalInit() +	if err != nil { +		return err +	} + +	return nil +} + +// CheckQueue checks for any messages in a queue +func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error) { +	f, err := os.Open(filepath.Join(a.TempDir, url)) +	if err != nil { +		f, err = os.Create(filepath.Join(a.TempDir, url)) +	} +	if err != nil { +		return Qmsg{}, err +	} +	if err != nil { +		return Qmsg{}, err +	} +	defer f.Close() +	r := bufio.NewReader(f) +	s, err := r.ReadString('\n') +	if err != nil && err != io.EOF { +		return Qmsg{}, err +	} +	s = strings.TrimSpace(s) + +	return Qmsg{Body: s, Handle: s}, nil +} + +// QueueHeartbeat is a no-op with LocalConn +func (a *LocalConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) { +	return Qmsg{}, nil +} + +// GetQueueDetails gets the number of in progress and available +// messages for a queue. These are returned as strings. +func (a *LocalConn) GetQueueDetails(url string) (string, string, error) { +	b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) +	if err != nil { +		return "", "", err +	} +	s := string(b) +	n := strings.Count(s, "\n") + +	return fmt.Sprintf("%d", n), "0", nil +} + +func (a *LocalConn) PreQueueId() string { +	return qidPre +} + +func (a *LocalConn) WipeQueueId() string { +	return qidWipe +} + +func (a *LocalConn) OCRPageQueueId() string { +	return qidOCR +} + +func (a *LocalConn) AnalyseQueueId() string { +	return qidAnalyse +} + +func (a *LocalConn) WIPStorageId() string { +	return storageId +} + +func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkFunc { +	return func(path string, info os.FileInfo, err error) error { +		if err != nil { +			return err +		} +		if info.IsDir() { +			return nil +		} +		//n := filepath.Base(path) +		n := strings.TrimPrefix(path, dirpath) +		o := ObjMeta{Name: n, Date: info.ModTime()} +		*list = append(*list, o) +		return nil +	} +} + +func (a *LocalConn) ListObjects(bucket string, prefix string) ([]string, error) { +	var names []string +	list, err := a.ListObjectsWithMeta(bucket, prefix) +	if err != nil { +		return names, err +	} +	for _, v := range list { +		names = append(names, v.Name) +	} +	return names, nil +} + +func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error) { +	var list []ObjMeta +	err := filepath.Walk(filepath.Join(a.TempDir, bucket), prefixwalker(filepath.Join(a.TempDir, bucket), prefix, &list)) +	return list, err +} + +// AddToQueue adds a message to a queue +func (a *LocalConn) AddToQueue(url string, msg string) error { +	f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) +	if err != nil { +		return err +	} +	defer f.Close() +	_, err = f.WriteString(msg + "\n") +	return err +} + +// DelFromQueue deletes a message from a queue +func (a *LocalConn) DelFromQueue(url string, handle string) error { +	b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) +	if err != nil { +		return err +	} +	s := string(b) + +	i := strings.Index(s, handle) + +	// store the joining of part before and part after handle +	complete := s[0:i] + s[i + len(handle) + 1:len(s)] + +	f, err := os.Create(filepath.Join(a.TempDir, url)) +	if err != nil { +		return err +	} +	defer f.Close() +	_, err = f.WriteString(complete) +	return err +} + +// Download just copies the file from TempDir/bucket/key to path +func (a *LocalConn) Download(bucket string, key string, path string) error { +	f, err := os.Create(path) +	if err != nil { +		return err +	} +	defer f.Close() + +	fin, err := os.Open(filepath.Join(a.TempDir, bucket, key)) +	if err != nil { +		return err +	} +	defer fin.Close() +	_, err = io.Copy(f, fin) +	return err +} + +// Upload just copies the file from path to TempDir/bucket/key +func (a *LocalConn) Upload(bucket string, key string, path string) error { +	d := filepath.Join(a.TempDir, bucket, filepath.Dir(key)) +	err := os.Mkdir(d, 0700) +	if err != nil && !os.IsExist(err) { +		return fmt.Errorf("Error creating temporary directory: %v", err) +	} +	f, err := os.Create(filepath.Join(a.TempDir, bucket, key)) +	if err != nil { +		return err +	} +	defer f.Close() + +	fin, err := os.Open(path) +	if err != nil { +		return err +	} +	defer fin.Close() +	_, err = io.Copy(f, fin) +	return err +} + +func (a *LocalConn) GetLogger() *log.Logger { +	return a.Logger +} + +// Log records an item in the with the Logger. Arguments are handled +// as with fmt.Println. +func (a *LocalConn) Log(v ...interface{}) { +	a.Logger.Print(v...) +} | 
