-rw-r--r--  cmd/addtoqueue/main.go        13
-rw-r--r--  cmd/bookpipeline/main.go      16
-rw-r--r--  cmd/booktopipeline/main.go    12
-rw-r--r--  cmd/getpipelinebook/main.go   12
-rw-r--r--  local.go                     241
5 files changed, 283 insertions, 11 deletions
diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go
index 8e4ecd2..57087ca 100644
--- a/cmd/addtoqueue/main.go
+++ b/cmd/addtoqueue/main.go
@@ -14,7 +14,7 @@ import (
"rescribe.xyz/bookpipeline"
)
-const usage = `Usage: addtoqueue qname msg
+const usage = `Usage: addtoqueue [-c conn] qname msg
addtoqueue adds a message to a queue.
@@ -44,6 +44,7 @@ type QueuePipeliner interface {
}
func main() {
+ conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), usage)
flag.PrintDefaults()
@@ -58,7 +59,15 @@ func main() {
var n NullWriter
quietlog := log.New(n, "", 0)
var conn QueuePipeliner
- conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog}
+
+ switch *conntype {
+ case "aws":
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog}
+ case "local":
+ conn = &bookpipeline.LocalConn{Logger: quietlog}
+ default:
+ log.Fatalln("Unknown connection type")
+ }
err := conn.Init()
if err != nil {
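
The same backend-selection switch now appears in all four commands. The sketch below is not part of this commit; it only illustrates how the repetition could be folded into one place. The getConn helper is hypothetical, and the Pipeliner interface shown is reduced to the two methods visible in this diff (the interfaces declared in each command list more):

package main

import (
	"fmt"
	"log"
	"os"

	"rescribe.xyz/bookpipeline"
)

// Pipeliner is cut down here to the methods this diff actually calls.
type Pipeliner interface {
	Init() error
	Log(v ...interface{})
}

// getConn is a hypothetical helper performing the same selection as the
// switch added to each command in this commit.
func getConn(conntype string, logger *log.Logger) (Pipeliner, error) {
	switch conntype {
	case "aws":
		return &bookpipeline.AwsConn{Region: "eu-west-2", Logger: logger}, nil
	case "local":
		return &bookpipeline.LocalConn{Logger: logger}, nil
	default:
		return nil, fmt.Errorf("unknown connection type %q", conntype)
	}
}

func main() {
	conn, err := getConn("local", log.New(os.Stdout, "", 0))
	if err != nil {
		log.Fatalln(err)
	}
	if err = conn.Init(); err != nil {
		log.Fatalln("Failed to set up connection:", err)
	}
	conn.Log("connection ready")
}
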
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 3f55def..f5025ac 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -25,7 +25,7 @@ import (
"rescribe.xyz/utils/pkg/hocr"
)
-const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
+const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false]
Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.
When one is found this general process is followed:
@@ -723,6 +723,7 @@ func main() {
noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")
noanalyse := flag.Bool("na", false, "disable analysis")
autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes")
+ conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), usage)
@@ -743,7 +744,14 @@ func main() {
ocredPattern := regexp.MustCompile(`.hocr$`)
var conn Pipeliner
- conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ switch *conntype {
+ case "aws":
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ case "local":
+ conn = &bookpipeline.LocalConn{Logger: verboselog}
+ default:
+ log.Fatalln("Unknown connection type")
+ }
conn.Log("Setting up AWS session")
err := conn.Init()
@@ -773,9 +781,7 @@ func main() {
if !*noanalyse {
checkAnalyseQueue = time.After(0)
}
- if *autoshutdown {
- shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown)
- }
+ shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown)
savelognow = time.NewTicker(LogSaveTime)
for {
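
The final hunk above removes the autoshutdown guard, so the shutdown timer is now created whether or not -shutdown was passed. The loop that consumes the timer is not shown in this diff, so the following is only a guessed-at sketch of the pattern: creating the timer unconditionally keeps the select on its channel safe, with the -shutdown decision deferred to the moment the timer fires. All names and durations here are invented for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	autoshutdown := true // in bookpipeline this would come from the -shutdown flag
	shutdownIfQuiet := time.NewTimer(2 * time.Second)
	work := time.After(1 * time.Second) // stands in for a queue check

	for {
		select {
		case <-work:
			fmt.Println("found work; resetting the quiet timer")
			shutdownIfQuiet.Reset(2 * time.Second)
		case <-shutdownIfQuiet.C:
			if !autoshutdown {
				fmt.Println("idle, but -shutdown not set; carrying on")
				shutdownIfQuiet.Reset(2 * time.Second)
				continue
			}
			fmt.Println("no work for a while; shutting down")
			return
		}
	}
}
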
diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go
index 2e7064e..609c3b3 100644
--- a/cmd/booktopipeline/main.go
+++ b/cmd/booktopipeline/main.go
@@ -16,7 +16,7 @@ import (
"rescribe.xyz/bookpipeline"
)
-const usage = `Usage: booktopipeline [-t training] [-prebinarised] [-v] bookdir [bookname]
+const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-v] bookdir [bookname]
Uploads the book in bookdir to the S3 'inprogress' bucket and adds it
to the 'preprocess' SQS queue, or the 'wipeonly' queue if the
@@ -57,6 +57,7 @@ func (f fileWalk) Walk(path string, info os.FileInfo, err error) error {
func main() {
verbose := flag.Bool("v", false, "Verbose")
+ conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe")
training := flag.String("t", "", "Training to use (training filename without the .traineddata part)")
@@ -86,7 +87,14 @@ func main() {
}
var conn Pipeliner
- conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ switch *conntype {
+ case "aws":
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ case "local":
+ conn = &bookpipeline.LocalConn{Logger: verboselog}
+ default:
+ log.Fatalln("Unknown connection type")
+ }
err := conn.Init()
if err != nil {
log.Fatalln("Failed to set up cloud connection:", err)
diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go
index dc9387f..03e709b 100644
--- a/cmd/getpipelinebook/main.go
+++ b/cmd/getpipelinebook/main.go
@@ -17,7 +17,7 @@ import (
"rescribe.xyz/bookpipeline"
)
-const usage = `Usage: getpipelinebook [-a] [-graph] [-pdf] [-png] [-v] bookname
+const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname
Downloads the pipeline results for a book.
@@ -57,6 +57,7 @@ func getpdfs(conn Pipeliner, l *log.Logger, bookname string) {
func main() {
all := flag.Bool("a", false, "Get all files for book")
+ conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")
graph := flag.Bool("graph", false, "Only download graphs (can be used alongside -pdf)")
binarisedpdf := flag.Bool("binarisedpdf", false, "Only download binarised PDF (can be used alongside -graph)")
colourpdf := flag.Bool("colourpdf", false, "Only download colour PDF (can be used alongside -graph)")
@@ -83,7 +84,14 @@ func main() {
}
var conn Pipeliner
- conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ switch *conntype {
+ case "aws":
+ conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog}
+ case "local":
+ conn = &bookpipeline.LocalConn{Logger: verboselog}
+ default:
+ log.Fatalln("Unknown connection type")
+ }
verboselog.Println("Setting up AWS session")
err := conn.MinimalInit()
diff --git a/local.go b/local.go
new file mode 100644
index 0000000..70991b2
--- /dev/null
+++ b/local.go
@@ -0,0 +1,241 @@
+// Copyright 2020 Nick White.
+// Use of this source code is governed by the GPLv3
+// license that can be found in the LICENSE file.
+
+package bookpipeline
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+const qidPre = "queuePre"
+const qidWipe = "queueWipe"
+const qidOCR = "queueOCR"
+const qidAnalyse = "queueAnalyse"
+const storageId = "storage"
+
+// LocalConn is a simple implementation of the pipeliner interface
+// that doesn't rely on any "cloud" services, instead doing everything
+// on the local machine. This is particularly useful for testing.
+type LocalConn struct {
+ // these should be set before running Init(), or left to defaults
+ TempDir string
+ Logger *log.Logger
+}
+
+// MinimalInit does the bare minimum initialisation
+func (a *LocalConn) MinimalInit() error {
+ var err error
+ if a.TempDir == "" {
+ a.TempDir = filepath.Join(os.TempDir(), "bookpipeline")
+ }
+ err = os.Mkdir(a.TempDir, 0700)
+ if err != nil && !os.IsExist(err) {
+ return fmt.Errorf("Error creating temporary directory: %v", err)
+ }
+
+ err = os.Mkdir(filepath.Join(a.TempDir, storageId), 0700)
+ if err != nil && !os.IsExist(err) {
+ return fmt.Errorf("Error creating storage directory: %v", err)
+ }
+
+ if a.Logger == nil {
+ a.Logger = log.New(os.Stdout, "", 0)
+ }
+
+ return nil
+}
+
+// Init just does the same as MinimalInit
+func (a *LocalConn) Init() error {
+ err := a.MinimalInit()
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// CheckQueue returns the first message in a queue (an empty Qmsg if there are none)
+func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error) {
+ f, err := os.Open(filepath.Join(a.TempDir, url))
+ if err != nil {
+ f, err = os.Create(filepath.Join(a.TempDir, url))
+ }
+ if err != nil {
+ return Qmsg{}, err
+ }
+ defer f.Close()
+ r := bufio.NewReader(f)
+ s, err := r.ReadString('\n')
+ if err != nil && err != io.EOF {
+ return Qmsg{}, err
+ }
+ s = strings.TrimSpace(s)
+
+ return Qmsg{Body: s, Handle: s}, nil
+}
+
+// QueueHeartbeat is a no-op with LocalConn
+func (a *LocalConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) {
+ return Qmsg{}, nil
+}
+
+// GetQueueDetails gets the number of in progress and available
+// messages for a queue. These are returned as strings.
+func (a *LocalConn) GetQueueDetails(url string) (string, string, error) {
+ b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url))
+ if err != nil {
+ return "", "", err
+ }
+ s := string(b)
+ n := strings.Count(s, "\n")
+
+ return fmt.Sprintf("%d", n), "0", nil
+}
+
+func (a *LocalConn) PreQueueId() string {
+ return qidPre
+}
+
+func (a *LocalConn) WipeQueueId() string {
+ return qidWipe
+}
+
+func (a *LocalConn) OCRPageQueueId() string {
+ return qidOCR
+}
+
+func (a *LocalConn) AnalyseQueueId() string {
+ return qidAnalyse
+}
+
+func (a *LocalConn) WIPStorageId() string {
+ return storageId
+}
+
+func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkFunc {
+ return func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ // use the path relative to the base directory as the object name
+ n := strings.TrimPrefix(path, dirpath)
+ o := ObjMeta{Name: n, Date: info.ModTime()}
+ *list = append(*list, o)
+ return nil
+ }
+}
+
+func (a *LocalConn) ListObjects(bucket string, prefix string) ([]string, error) {
+ var names []string
+ list, err := a.ListObjectsWithMeta(bucket, prefix)
+ if err != nil {
+ return names, err
+ }
+ for _, v := range list {
+ names = append(names, v.Name)
+ }
+ return names, nil
+}
+
+func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error) {
+ var list []ObjMeta
+ err := filepath.Walk(filepath.Join(a.TempDir, bucket), prefixwalker(filepath.Join(a.TempDir, bucket), prefix, &list))
+ return list, err
+}
+
+// AddToQueue adds a message to a queue
+func (a *LocalConn) AddToQueue(url string, msg string) error {
+ f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ _, err = f.WriteString(msg + "\n")
+ return err
+}
+
+// DelFromQueue deletes a message from a queue
+func (a *LocalConn) DelFromQueue(url string, handle string) error {
+ b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url))
+ if err != nil {
+ return err
+ }
+ s := string(b)
+
+ i := strings.Index(s, handle)
+
+ // rejoin the parts before and after the handle (and its trailing newline)
+ complete := s[:i] + s[i+len(handle)+1:]
+
+ f, err := os.Create(filepath.Join(a.TempDir, url))
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ _, err = f.WriteString(complete)
+ return err
+}
+
+// Download just copies the file from TempDir/bucket/key to path
+func (a *LocalConn) Download(bucket string, key string, path string) error {
+ f, err := os.Create(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ fin, err := os.Open(filepath.Join(a.TempDir, bucket, key))
+ if err != nil {
+ return err
+ }
+ defer fin.Close()
+ _, err = io.Copy(f, fin)
+ return err
+}
+
+// Upload just copies the file from path to TempDir/bucket/key
+func (a *LocalConn) Upload(bucket string, key string, path string) error {
+ d := filepath.Join(a.TempDir, bucket, filepath.Dir(key))
+ err := os.Mkdir(d, 0700)
+ if err != nil && !os.IsExist(err) {
+ return fmt.Errorf("Error creating temporary directory: %v", err)
+ }
+ f, err := os.Create(filepath.Join(a.TempDir, bucket, key))
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ fin, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer fin.Close()
+ _, err = io.Copy(f, fin)
+ return err
+}
+
+func (a *LocalConn) GetLogger() *log.Logger {
+ return a.Logger
+}
+
+// Log records an item with the Logger. Arguments are handled
+// as with fmt.Println.
+func (a *LocalConn) Log(v ...interface{}) {
+ a.Logger.Println(v...)
+}
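
Taken together, local.go models each queue as a plain text file under TempDir with one message per line (the message body doubles as its receipt handle), and each bucket as a directory under TempDir. Below is a minimal sketch of driving LocalConn directly, using only calls that appear in this file; the book name "examplebook" and the example.txt file are made up for illustration:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"rescribe.xyz/bookpipeline"
)

func main() {
	conn := &bookpipeline.LocalConn{Logger: log.New(os.Stdout, "", 0)}
	if err := conn.Init(); err != nil {
		log.Fatalln("Failed to set up local connection:", err)
	}

	// Upload a file to "storage" and queue the book for preprocessing,
	// roughly as booktopipeline does through the Pipeliner interface.
	src := filepath.Join(os.TempDir(), "example.txt")
	if err := ioutil.WriteFile(src, []byte("hello\n"), 0644); err != nil {
		log.Fatalln(err)
	}
	if err := conn.Upload(conn.WIPStorageId(), "examplebook/example.txt", src); err != nil {
		log.Fatalln(err)
	}
	if err := conn.AddToQueue(conn.PreQueueId(), "examplebook"); err != nil {
		log.Fatalln(err)
	}

	// A worker picks the message up, lists the stored objects, and
	// deletes the message once it is done with it.
	msg, err := conn.CheckQueue(conn.PreQueueId(), 0)
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println("got message:", msg.Body)

	names, err := conn.ListObjects(conn.WIPStorageId(), "")
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println("stored objects:", names)

	if err := conn.DelFromQueue(conn.PreQueueId(), msg.Handle); err != nil {
		log.Fatalln(err)
	}
}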