diff options
-rw-r--r-- | cmd/addtoqueue/main.go | 13 | ||||
-rw-r--r-- | cmd/bookpipeline/main.go | 16 | ||||
-rw-r--r-- | cmd/booktopipeline/main.go | 12 | ||||
-rw-r--r-- | cmd/getpipelinebook/main.go | 12 | ||||
-rw-r--r-- | local.go | 241 |
5 files changed, 283 insertions, 11 deletions
diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go index 8e4ecd2..57087ca 100644 --- a/cmd/addtoqueue/main.go +++ b/cmd/addtoqueue/main.go @@ -14,7 +14,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: addtoqueue qname msg +const usage = `Usage: addtoqueue [-c conn] qname msg addtoqueue adds a message to a queue. @@ -44,6 +44,7 @@ type QueuePipeliner interface { } func main() { + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() @@ -58,7 +59,15 @@ func main() { var n NullWriter quietlog := log.New(n, "", 0) var conn QueuePipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} + + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} + case "local": + conn = &bookpipeline.LocalConn{Logger: quietlog} + default: + log.Fatalln("Unknown connection type") + } err := conn.Init() if err != nil { diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 3f55def..f5025ac 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -25,7 +25,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. When one is found this general process is followed: @@ -723,6 +723,7 @@ func main() { noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -743,7 +744,14 @@ func main() { ocredPattern := regexp.MustCompile(`.hocr$`) var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } conn.Log("Setting up AWS session") err := conn.Init() @@ -773,9 +781,7 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - if *autoshutdown { - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) - } + shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) savelognow = time.NewTicker(LogSaveTime) for { diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 2e7064e..609c3b3 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -16,7 +16,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: booktopipeline [-t training] [-prebinarised] [-v] bookdir [bookname] +const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-v] bookdir [bookname] Uploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' SQS queue, or the 'wipeonly' queue if the @@ -57,6 +57,7 @@ func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { func main() { verbose := flag.Bool("v", false, "Verbose") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe") training := flag.String("t", "", "Training to use (training filename without the .traineddata part)") @@ -86,7 +87,14 @@ func main() { } var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } err := conn.Init() if err != nil { log.Fatalln("Failed to set up cloud connection:", err) diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index dc9387f..03e709b 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -17,7 +17,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: getpipelinebook [-a] [-graph] [-pdf] [-png] [-v] bookname +const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname Downloads the pipeline results for a book. @@ -57,6 +57,7 @@ func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { func main() { all := flag.Bool("a", false, "Get all files for book") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") graph := flag.Bool("graph", false, "Only download graphs (can be used alongside -pdf)") binarisedpdf := flag.Bool("binarisedpdf", false, "Only download binarised PDF (can be used alongside -graph)") colourpdf := flag.Bool("colourpdf", false, "Only download colour PDF (can be used alongside -graph)") @@ -83,7 +84,14 @@ func main() { } var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } verboselog.Println("Setting up AWS session") err := conn.MinimalInit() diff --git a/local.go b/local.go new file mode 100644 index 0000000..70991b2 --- /dev/null +++ b/local.go @@ -0,0 +1,241 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package bookpipeline + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "strings" +) + +const qidPre = "queuePre" +const qidWipe = "queueWipe" +const qidOCR = "queueOCR" +const qidAnalyse = "queueAnalyse" +const storageId = "storage" + +// LocalConn is a simple implementation of the pipeliner interface +// that doesn't rely on any "cloud" services, instead doing everything +// on the local machine. This is particularly useful for testing. +type LocalConn struct { + // these should be set before running Init(), or left to defaults + TempDir string + Logger *log.Logger +} + +// MinimalInit does the bare minimum initialisation +func (a *LocalConn) MinimalInit() error { + var err error + if a.TempDir == "" { + a.TempDir = filepath.Join(os.TempDir(), "bookpipeline") + } + err = os.Mkdir(a.TempDir, 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating temporary directory: %v", err) + } + + err = os.Mkdir(filepath.Join(a.TempDir, storageId), 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating storage directory: %v", err) + } + + if a.Logger == nil { + a.Logger = log.New(os.Stdout, "", 0) + } + + return nil +} + +// Init just does the same as MinimalInit +func (a *LocalConn) Init() error { + err := a.MinimalInit() + if err != nil { + return err + } + + return nil +} + +// CheckQueue checks for any messages in a queue +func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error) { + f, err := os.Open(filepath.Join(a.TempDir, url)) + if err != nil { + f, err = os.Create(filepath.Join(a.TempDir, url)) + } + if err != nil { + return Qmsg{}, err + } + if err != nil { + return Qmsg{}, err + } + defer f.Close() + r := bufio.NewReader(f) + s, err := r.ReadString('\n') + if err != nil && err != io.EOF { + return Qmsg{}, err + } + s = strings.TrimSpace(s) + + return Qmsg{Body: s, Handle: s}, nil +} + +// QueueHeartbeat is a no-op with LocalConn +func (a *LocalConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) { + return Qmsg{}, nil +} + +// GetQueueDetails gets the number of in progress and available +// messages for a queue. These are returned as strings. +func (a *LocalConn) GetQueueDetails(url string) (string, string, error) { + b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) + if err != nil { + return "", "", err + } + s := string(b) + n := strings.Count(s, "\n") + + return fmt.Sprintf("%d", n), "0", nil +} + +func (a *LocalConn) PreQueueId() string { + return qidPre +} + +func (a *LocalConn) WipeQueueId() string { + return qidWipe +} + +func (a *LocalConn) OCRPageQueueId() string { + return qidOCR +} + +func (a *LocalConn) AnalyseQueueId() string { + return qidAnalyse +} + +func (a *LocalConn) WIPStorageId() string { + return storageId +} + +func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + //n := filepath.Base(path) + n := strings.TrimPrefix(path, dirpath) + o := ObjMeta{Name: n, Date: info.ModTime()} + *list = append(*list, o) + return nil + } +} + +func (a *LocalConn) ListObjects(bucket string, prefix string) ([]string, error) { + var names []string + list, err := a.ListObjectsWithMeta(bucket, prefix) + if err != nil { + return names, err + } + for _, v := range list { + names = append(names, v.Name) + } + return names, nil +} + +func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error) { + var list []ObjMeta + err := filepath.Walk(filepath.Join(a.TempDir, bucket), prefixwalker(filepath.Join(a.TempDir, bucket), prefix, &list)) + return list, err +} + +// AddToQueue adds a message to a queue +func (a *LocalConn) AddToQueue(url string, msg string) error { + f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(msg + "\n") + return err +} + +// DelFromQueue deletes a message from a queue +func (a *LocalConn) DelFromQueue(url string, handle string) error { + b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) + if err != nil { + return err + } + s := string(b) + + i := strings.Index(s, handle) + + // store the joining of part before and part after handle + complete := s[0:i] + s[i + len(handle) + 1:len(s)] + + f, err := os.Create(filepath.Join(a.TempDir, url)) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(complete) + return err +} + +// Download just copies the file from TempDir/bucket/key to path +func (a *LocalConn) Download(bucket string, key string, path string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + fin, err := os.Open(filepath.Join(a.TempDir, bucket, key)) + if err != nil { + return err + } + defer fin.Close() + _, err = io.Copy(f, fin) + return err +} + +// Upload just copies the file from path to TempDir/bucket/key +func (a *LocalConn) Upload(bucket string, key string, path string) error { + d := filepath.Join(a.TempDir, bucket, filepath.Dir(key)) + err := os.Mkdir(d, 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating temporary directory: %v", err) + } + f, err := os.Create(filepath.Join(a.TempDir, bucket, key)) + if err != nil { + return err + } + defer f.Close() + + fin, err := os.Open(path) + if err != nil { + return err + } + defer fin.Close() + _, err = io.Copy(f, fin) + return err +} + +func (a *LocalConn) GetLogger() *log.Logger { + return a.Logger +} + +// Log records an item in the with the Logger. Arguments are handled +// as with fmt.Println. +func (a *LocalConn) Log(v ...interface{}) { + a.Logger.Print(v...) +} |