From 0547ff4c377ec2905f717969f07779208b5bded4 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 22 May 2020 18:14:12 +0100 Subject: Add experimental local connection type --- local.go | 235 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 local.go diff --git a/local.go b/local.go new file mode 100644 index 0000000..bc9fafb --- /dev/null +++ b/local.go @@ -0,0 +1,235 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package bookpipeline + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "strings" +) + +const qidPre = "queuePre" +const qidWipe = "queueWipe" +const qidOCR = "queueOCR" +const qidAnalyse = "queueAnalyse" +const storageId = "storage" + +// LocalConn is a simple implementation of the pipeliner interface +// that doesn't rely on any "cloud" services, instead doing everything +// on the local machine. This is particularly useful for testing. +type LocalConn struct { + // these should be set before running Init(), or left to defaults + TempDir string + Logger *log.Logger +} + +// MinimalInit does the bare minimum initialisation +func (a *LocalConn) MinimalInit() error { + var err error + if a.TempDir == "" { + a.TempDir = filepath.Join(os.TempDir(), "bookpipeline") + } + err = os.Mkdir(a.TempDir, 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating temporary directory: %v", err) + } + + err = os.Mkdir(filepath.Join(a.TempDir, storageId), 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating storage directory: %v", err) + } + + if a.Logger == nil { + a.Logger = log.New(os.Stdout, "", 0) + } + + return nil +} + +// Init just does the same as MinimalInit +func (a *LocalConn) Init() error { + err := a.MinimalInit() + if err != nil { + return err + } + + return nil +} + +// CheckQueue checks for any messages in a queue +func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error) { + f, err := os.Open(filepath.Join(a.TempDir, url)) + if err != nil { + f, err = os.Create(filepath.Join(a.TempDir, url)) + } + if err != nil { + return Qmsg{}, err + } + if err != nil { + return Qmsg{}, err + } + defer f.Close() + r := bufio.NewReader(f) + s, err := r.ReadString('\n') + if err != nil && err != io.EOF { + return Qmsg{}, err + } + + return Qmsg{Body: s, Handle: s}, nil +} + +// QueueHeartbeat is a no-op with LocalConn +func (a *LocalConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error) { + return Qmsg{}, nil +} + +// GetQueueDetails gets the number of in progress and available +// messages for a queue. These are returned as strings. +func (a *LocalConn) GetQueueDetails(url string) (string, string, error) { + b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) + if err != nil { + return "", "", err + } + s := string(b) + n := strings.Count(s, "\n") + + return fmt.Sprintf("%d", n), "0", nil +} + +func (a *LocalConn) PreQueueId() string { + return qidPre +} + +func (a *LocalConn) WipeQueueId() string { + return qidWipe +} + +func (a *LocalConn) OCRPageQueueId() string { + return qidOCR +} + +func (a *LocalConn) AnalyseQueueId() string { + return qidAnalyse +} + +func (a *LocalConn) WIPStorageId() string { + return storageId +} + +func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + //n := filepath.Base(path) + n := strings.TrimPrefix(path, dirpath) + o := ObjMeta{Name: n, Date: info.ModTime()} + *list = append(*list, o) + return nil + } +} + +func (a *LocalConn) ListObjects(bucket string, prefix string) ([]string, error) { + var names []string + list, err := a.ListObjectsWithMeta(bucket, prefix) + if err != nil { + return names, err + } + for _, v := range list { + names = append(names, v.Name) + } + return names, nil +} + +func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error) { + var list []ObjMeta + err := filepath.Walk(filepath.Join(a.TempDir, bucket), prefixwalker(filepath.Join(a.TempDir, bucket), prefix, &list)) + return list, err +} + +// AddToQueue adds a message to a queue +func (a *LocalConn) AddToQueue(url string, msg string) error { + f, err := os.OpenFile(filepath.Join(a.TempDir, url), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(msg + "\n") + return err +} + +// DelFromQueue deletes a message from a queue +func (a *LocalConn) DelFromQueue(url string, handle string) error { + b, err := ioutil.ReadFile(filepath.Join(a.TempDir, url)) + if err != nil { + return err + } + s := string(b) + + i := strings.Index(s, handle) + + // store the joining of part before and part after handle + complete := s[0:i] + s[i + len(handle):len(s)] + + f, err := os.Create(filepath.Join(a.TempDir, url)) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(complete) + return err +} + +// Download just copies the file from TempDir/bucket/key to path +func (a *LocalConn) Download(bucket string, key string, path string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + fin, err := os.Open(filepath.Join(a.TempDir, bucket, key)) + if err != nil { + return err + } + defer fin.Close() + _, err = io.Copy(f, fin) + return err +} + +// Upload just copies the file from path to TempDir/bucket/key +func (a *LocalConn) Upload(bucket string, key string, path string) error { + f, err := os.Create(filepath.Join(a.TempDir, bucket, key)) + if err != nil { + return err + } + defer f.Close() + + fin, err := os.Open(path) + if err != nil { + return err + } + defer fin.Close() + _, err = io.Copy(f, fin) + return err +} + +func (a *LocalConn) GetLogger() *log.Logger { + return a.Logger +} + +// Log records an item in the with the Logger. Arguments are handled +// as with fmt.Println. +func (a *LocalConn) Log(v ...interface{}) { + a.Logger.Print(v...) +} -- cgit v1.2.1-24-ge1ad From 519244c01ff6a5cd3550fc44656b6eb741b99a8a Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 22 May 2020 18:14:52 +0100 Subject: Fix bookpipeline failing if shutdown option isnt used --- cmd/bookpipeline/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 3b9befb..9618e93 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -727,9 +727,7 @@ func main() { if !*noanalyse { checkAnalyseQueue = time.After(0) } - if *autoshutdown { - shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) - } + shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) savelognow = time.NewTicker(LogSaveTime) for { -- cgit v1.2.1-24-ge1ad From c80b2b74037297f7a8be5ece015dbdbea2d170f6 Mon Sep 17 00:00:00 2001 From: Nick White Date: Fri, 22 May 2020 18:22:21 +0100 Subject: Fix CheckQueue for LocalConn --- local.go | 1 + 1 file changed, 1 insertion(+) diff --git a/local.go b/local.go index bc9fafb..5e21b19 100644 --- a/local.go +++ b/local.go @@ -81,6 +81,7 @@ func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error) { if err != nil && err != io.EOF { return Qmsg{}, err } + s = strings.TrimSpace(s) return Qmsg{Body: s, Handle: s}, nil } -- cgit v1.2.1-24-ge1ad From ed561822b58756c101654946cff0617a6038bc2c Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 May 2020 16:02:42 +0100 Subject: Fix DelFromQueue and Upload for local connections --- local.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/local.go b/local.go index 5e21b19..70991b2 100644 --- a/local.go +++ b/local.go @@ -1,4 +1,4 @@ -// Copyright 2019 Nick White. +// Copyright 2020 Nick White. // Use of this source code is governed by the GPLv3 // license that can be found in the LICENSE file. @@ -180,7 +180,7 @@ func (a *LocalConn) DelFromQueue(url string, handle string) error { i := strings.Index(s, handle) // store the joining of part before and part after handle - complete := s[0:i] + s[i + len(handle):len(s)] + complete := s[0:i] + s[i + len(handle) + 1:len(s)] f, err := os.Create(filepath.Join(a.TempDir, url)) if err != nil { @@ -210,6 +210,11 @@ func (a *LocalConn) Download(bucket string, key string, path string) error { // Upload just copies the file from path to TempDir/bucket/key func (a *LocalConn) Upload(bucket string, key string, path string) error { + d := filepath.Join(a.TempDir, bucket, filepath.Dir(key)) + err := os.Mkdir(d, 0700) + if err != nil && !os.IsExist(err) { + return fmt.Errorf("Error creating temporary directory: %v", err) + } f, err := os.Create(filepath.Join(a.TempDir, bucket, key)) if err != nil { return err -- cgit v1.2.1-24-ge1ad From ebc27eef77868fa44daed7cfb0ea129690029da8 Mon Sep 17 00:00:00 2001 From: Nick White Date: Tue, 26 May 2020 16:03:12 +0100 Subject: Add -c conntype for necessary tools to allow local connection to be used --- cmd/addtoqueue/main.go | 13 +++++++++++-- cmd/bookpipeline/main.go | 12 ++++++++++-- cmd/booktopipeline/main.go | 12 ++++++++++-- cmd/getpipelinebook/main.go | 12 ++++++++++-- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/cmd/addtoqueue/main.go b/cmd/addtoqueue/main.go index 8e4ecd2..57087ca 100644 --- a/cmd/addtoqueue/main.go +++ b/cmd/addtoqueue/main.go @@ -14,7 +14,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: addtoqueue qname msg +const usage = `Usage: addtoqueue [-c conn] qname msg addtoqueue adds a message to a queue. @@ -44,6 +44,7 @@ type QueuePipeliner interface { } func main() { + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) flag.PrintDefaults() @@ -58,7 +59,15 @@ func main() { var n NullWriter quietlog := log.New(n, "", 0) var conn QueuePipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} + + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: quietlog} + case "local": + conn = &bookpipeline.LocalConn{Logger: quietlog} + default: + log.Fatalln("Unknown connection type") + } err := conn.Init() if err != nil { diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 9618e93..344006a 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -25,7 +25,7 @@ import ( "rescribe.xyz/utils/pkg/hocr" ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] Watches the preprocess, wipeonly, ocrpage and analyse queues for messages. When one is found this general process is followed: @@ -677,6 +677,7 @@ func main() { noocrpg := flag.Bool("nop", false, "disable ocr on individual pages") noanalyse := flag.Bool("na", false, "disable analysis") autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), usage) @@ -697,7 +698,14 @@ func main() { ocredPattern := regexp.MustCompile(`.hocr$`) var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } conn.Log("Setting up AWS session") err := conn.Init() diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 2e7064e..609c3b3 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -16,7 +16,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: booktopipeline [-t training] [-prebinarised] [-v] bookdir [bookname] +const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-v] bookdir [bookname] Uploads the book in bookdir to the S3 'inprogress' bucket and adds it to the 'preprocess' SQS queue, or the 'wipeonly' queue if the @@ -57,6 +57,7 @@ func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { func main() { verbose := flag.Bool("v", false, "Verbose") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") wipeonly := flag.Bool("prebinarised", false, "Prebinarised: only preprocessing will be to wipe") training := flag.String("t", "", "Training to use (training filename without the .traineddata part)") @@ -86,7 +87,14 @@ func main() { } var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } err := conn.Init() if err != nil { log.Fatalln("Failed to set up cloud connection:", err) diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index dc9387f..03e709b 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -17,7 +17,7 @@ import ( "rescribe.xyz/bookpipeline" ) -const usage = `Usage: getpipelinebook [-a] [-graph] [-pdf] [-png] [-v] bookname +const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname Downloads the pipeline results for a book. @@ -57,6 +57,7 @@ func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { func main() { all := flag.Bool("a", false, "Get all files for book") + conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") graph := flag.Bool("graph", false, "Only download graphs (can be used alongside -pdf)") binarisedpdf := flag.Bool("binarisedpdf", false, "Only download binarised PDF (can be used alongside -graph)") colourpdf := flag.Bool("colourpdf", false, "Only download colour PDF (can be used alongside -graph)") @@ -83,7 +84,14 @@ func main() { } var conn Pipeliner - conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + switch *conntype { + case "aws": + conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + case "local": + conn = &bookpipeline.LocalConn{Logger: verboselog} + default: + log.Fatalln("Unknown connection type") + } verboselog.Println("Setting up AWS session") err := conn.MinimalInit() -- cgit v1.2.1-24-ge1ad