diff options
| -rw-r--r-- | README | 19 | ||||
| -rw-r--r-- | aws.go | 58 | ||||
| -rw-r--r-- | cmd/bookpipeline/main.go | 765 | ||||
| -rw-r--r-- | cmd/booktopipeline/main.go | 92 | ||||
| -rw-r--r-- | cmd/getbests/main.go | 4 | ||||
| -rw-r--r-- | cmd/getpipelinebook/main.go | 98 | ||||
| -rw-r--r-- | cmd/logwholequeue/main.go | 85 | ||||
| -rw-r--r-- | cmd/postprocess-bythresh/main.go | 71 | ||||
| -rw-r--r-- | cmd/rescribe/main.go | 389 | ||||
| -rw-r--r-- | cmd/trimqueue/main.go | 84 | ||||
| -rw-r--r-- | doc.go | 14 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | go.sum | 6 | ||||
| -rw-r--r-- | internal/pipeline/get.go | 89 | ||||
| -rw-r--r-- | internal/pipeline/pipeline.go | 740 | ||||
| -rw-r--r-- | internal/pipeline/put.go | 85 | ||||
| -rw-r--r-- | local.go | 9 | ||||
| -rw-r--r-- | makefile | 12 | ||||
| -rw-r--r-- | pdf.go | 2 | 
19 files changed, 1710 insertions, 914 deletions
| @@ -9,6 +9,11 @@ by running `go get rescribe.xyz/bookpipeline/...` and documentation  can be read with the `go doc` command or online at  <https://pkg.go.dev/rescribe.xyz/bookpipeline>. +If you just want to install and use the commands, you can get the +package with `git clone https://git.rescribe.xyz/bookpipeline`, and +then install them with `go install ./...` from within the +`bookpipeline` directory. +  ## Commands  The commands in the cmd/ directory are at the heart of this @@ -41,6 +46,20 @@ setting:    - pdfbook   : creates a searchable PDF from a directory of hOCR                  and image files +## Rescribe tool for local operation + +While bookpipeline was built with cloud based operation in mind, there is also +a local mode that can be used to run OCR jobs from a single computer, with all +the benefits of preprocessing, choosing the best threshold for each image, +graph creation, PDF creation, and so on that the pipeline provides. + +Several of the commands accept a `-c local` flag for local operation, but now +there is also a new command, named `rescribe`, that is designed to make things +much simpler for people just wanting to do some OCR on their local computer. + +More information about this, including links to prebuilt executables, can be +found on our blog at <https://blog.rescribe.xyz/posts/desktop-tool/>. +  ## Contributions  Any and all comments, bug reports, patches or pull requests would @@ -9,6 +9,7 @@ import (  	"fmt"  	"log"  	"os" +	"strings"  	"time"  	"github.com/aws/aws-sdk-go/aws" @@ -178,6 +179,63 @@ func (a *AwsConn) LogAndPurgeQueue(url string) error {  	return nil  } +// LogQueue prints the body of all messages in a queue to the log +func (a *AwsConn) LogQueue(url string) error { +	for { +		msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +			MaxNumberOfMessages: aws.Int64(10), +			VisibilityTimeout:   aws.Int64(300), +			QueueUrl:            &url, +		}) +		if err != nil { +			return err +		} + +		if len(msgResult.Messages) > 0 { +			for _, m := range msgResult.Messages { +				a.Logger.Println(*m.Body) +			} +		} else { +			break +		} +	} +	return nil +} + +// RemovePrefixesFromQueue removes any messages in a queue whose +// body starts with the specified prefix. +func (a *AwsConn) RemovePrefixesFromQueue(url string, prefix string) error { +	for { +		msgResult, err := a.sqssvc.ReceiveMessage(&sqs.ReceiveMessageInput{ +			MaxNumberOfMessages: aws.Int64(10), +			VisibilityTimeout:   aws.Int64(300), +			QueueUrl:            &url, +		}) +		if err != nil { +			return err +		} + +		if len(msgResult.Messages) > 0 { +			for _, m := range msgResult.Messages { +				if !strings.HasPrefix(*m.Body, prefix) { +					continue +				} +				a.Logger.Printf("Removing %s from queue\n", *m.Body) +				_, err = a.sqssvc.DeleteMessage(&sqs.DeleteMessageInput{ +					QueueUrl:      &url, +					ReceiptHandle: m.ReceiptHandle, +				}) +				if err != nil { +					return err +				} +			} +		} else { +			break +		} +	} +	return nil +} +  // QueueHeartbeat updates the visibility timeout of a message. This  // ensures that the message remains "in flight", meaning that it  // cannot be seen by other processes, but if this process fails the diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 36295a6..909b431 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,23 +11,18 @@ import (  	"bytes"  	"flag"  	"fmt" -	"io/ioutil"  	"log" -	"net/smtp"  	"os"  	"os/exec" -	"path/filepath"  	"regexp" -	"sort" -	"strings"  	"time"  	"rescribe.xyz/bookpipeline" -	"rescribe.xyz/preproc" -	"rescribe.xyz/utils/pkg/hocr" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]  Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.  When one is found this general process is followed: @@ -47,10 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with  the contents: {smtpserver} {port} {username} {password} {from} {to}  ` +const QueueTimeoutSecs = 2 * 60  const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute  const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60  // null writer to enable non-verbose logging to be discarded  type NullWriter bool @@ -81,686 +75,16 @@ type Pipeliner interface {  	Log(v ...interface{})  } -type pageimg struct { -	hocr, img string -} - -type mailSettings struct { -	server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { -	p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") -	b, err := ioutil.ReadFile(p) -	if err != nil { -		return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) -	} -	f := strings.Fields(string(b)) -	if len(f) != 6 { -		return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) -	} -	return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { -	for key := range dl { -		fn := filepath.Join(dir, filepath.Base(key)) -		logger.Println("Downloading", key) -		err := conn.Download(conn.WIPStorageId(), key, fn) -		if err != nil { -			for range dl { -			} // consume the rest of the receiving channel so it isn't blocked -			close(process) -			errc <- err -			return -		} -		process <- fn -	} -	close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { -	for path := range c { -		name := filepath.Base(path) -		key := bookname + "/" + name -		logger.Println("Uploading", key) -		err := conn.Upload(conn.WIPStorageId(), key, path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		err = os.Remove(path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -	} - -	done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { -	for path := range c { -		name := filepath.Base(path) -		key := bookname + "/" + name -		logger.Println("Uploading", key) -		err := conn.Upload(conn.WIPStorageId(), key, path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		err = os.Remove(path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		logger.Println("Adding", key, training, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, key+" "+training) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -	} - -	done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { -	for path := range pre { -		logger.Println("Preprocessing", path) -		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) -		if err != nil { -			for range pre { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		_ = os.Remove(path) -		for _, p := range done { -			up <- p -		} -	} -	close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { -	for path := range towipe { -		logger.Println("Wiping", path) -		s := strings.Split(path, ".") -		base := strings.Join(s[:len(s)-1], "") -		outpath := base + "_bin0.0.png" -		err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) -		if err != nil { -			for range towipe { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		up <- outpath -	} -	close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { -	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { -		for path := range toocr { -			logger.Println("OCRing", path) -			name := strings.Replace(path, ".png", "", 1) -			cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") -			var stdout, stderr bytes.Buffer -			cmd.Stdout = &stdout -			cmd.Stderr = &stderr -			err := cmd.Run() -			if err != nil { -				for range toocr { -				} // consume the rest of the receiving channel so it isn't blocked -				errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) -				return -			} -			up <- name + ".hocr" -		} -		close(up) -	} -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { -	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { -		confs := make(map[string][]*bookpipeline.Conf) -		bestconfs := make(map[string]*bookpipeline.Conf) -		savedir := "" - -		for path := range toanalyse { -			if savedir == "" { -				savedir = filepath.Dir(path) -			} -			logger.Println("Calculating confidence for", path) -			avg, err := hocr.GetAvgConf(path) -			if err != nil && err.Error() == "No words found" { -				continue -			} -			if err != nil { -				for range toanalyse { -				} // consume the rest of the receiving channel so it isn't blocked -				errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) -				return -			} -			base := filepath.Base(path) -			codestart := strings.Index(base, "_bin") -			name := base[0:codestart] -			var c bookpipeline.Conf -			c.Path = path -			c.Code = base[codestart:] -			c.Conf = avg -			confs[name] = append(confs[name], &c) -		} - -		fn := filepath.Join(savedir, "conf") -		logger.Println("Saving confidences in file", fn) -		f, err := os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() - -		logger.Println("Finding best confidence for each page, and saving all confidences") -		for base, conf := range confs { -			var best float64 -			for _, c := range conf { -				if c.Conf > best { -					best = c.Conf -					bestconfs[base] = c -				} -				_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) -				if err != nil { -					errc <- fmt.Errorf("Error writing confidences file: %s", err) -					return -				} -			} -		} -		up <- fn - -		logger.Println("Creating best file listing the best file for each page") -		fn = filepath.Join(savedir, "best") -		f, err = os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() -		for _, conf := range bestconfs { -			_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) -		} -		up <- fn - -		var pgs []string -		for _, conf := range bestconfs { -			pgs = append(pgs, conf.Path) -		} -		sort.Strings(pgs) - -		logger.Println("Downloading binarised and original images to create PDFs") -		bookname, err := filepath.Rel(os.TempDir(), savedir) -		if err != nil { -			errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) -			return -		} -		colourpdf := new(bookpipeline.Fpdf) -		err = colourpdf.Setup() -		if err != nil { -			errc <- fmt.Errorf("Failed to set up PDF: %s", err) -			return -		} -		binarisedpdf := new(bookpipeline.Fpdf) -		err = binarisedpdf.Setup() -		if err != nil { -			errc <- fmt.Errorf("Failed to set up PDF: %s", err) -			return -		} -		binhascontent, colourhascontent := false, false - -		var colourimgs, binimgs []pageimg - -		for _, pg := range pgs { -			base := filepath.Base(pg) -			nosuffix := strings.TrimSuffix(base, ".hocr") -			p := strings.SplitN(base, "_bin", 2) - -			var fn string -			if len(p) > 1 { -				fn = p[0] + ".jpg" -			} else { -				fn = nosuffix + ".jpg" -			} - -			binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) -			colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) -		} - -		for _, pg := range binimgs { -			logger.Println("Downloading binarised page to add to PDF", pg.img) -			err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) -			if err != nil { -				logger.Println("Download failed; skipping page", pg.img) -			} else { -				err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) -				if err != nil { -					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) -					return -				} -				binhascontent = true -				err = os.Remove(filepath.Join(savedir, pg.img)) -				if err != nil { -					errc <- err -					return -				} -			} -		} - -		if binhascontent { -			fn = filepath.Join(savedir, bookname+".binarised.pdf") -			err = binarisedpdf.Save(fn) -			if err != nil { -				errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) -				return -			} -			up <- fn -			key := bookname + "/" + bookname + ".binarised.pdf" -			conn.Log("Uploading", key) -			err := conn.Upload(conn.WIPStorageId(), key, fn) -			if err != nil { -			} -		} - -		for _, pg := range colourimgs { -			logger.Println("Downloading colour page to add to PDF", pg.img) -			colourfn := pg.img -			err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) -			if err != nil { -				colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) -				logger.Println("Download failed; trying", colourfn) -				err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) -				if err != nil { -					logger.Println("Download failed; skipping page", pg.img) -				} -			} -			if err == nil { -				err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) -				if err != nil { -					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) -					return -				} -				colourhascontent = true -				err = os.Remove(filepath.Join(savedir, colourfn)) -				if err != nil { -					errc <- err -					return -				} -			} -		} -		if colourhascontent { -			fn = filepath.Join(savedir, bookname+".colour.pdf") -			err = colourpdf.Save(fn) -			if err != nil { -				errc <- fmt.Errorf("Failed to save colour pdf: %s", err) -				return -			} -			up <- fn -		} - -		logger.Println("Creating graph") -		fn = filepath.Join(savedir, "graph.png") -		f, err = os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() -		err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) -		if err != nil && err.Error() != "Not enough valid confidences" { -			errc <- fmt.Errorf("Error rendering graph: %s", err) -			return -		} -		up <- fn - -		close(up) -	} -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { -	currentmsg := msg -	for range t.C { -		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) -		if err != nil { -			// This is for better debugging of the heartbeat issue -			conn.Log("Error with heartbeat", err) -			os.Exit(1) -			// TODO: would be better to ensure this error stops any running -			//       processes, as they will ultimately fail in the case of -			//       it. could do this by setting a global variable that -			//       processes check each time they loop. -			errc <- err -			t.Stop() -			return -		} -		if m.Id != "" { -			conn.Log("Replaced message handle as visibilitytimeout limit was reached") -			currentmsg = m -			// TODO: maybe handle communicating new msg more gracefully than this -			for range msgc { -			} // throw away any old msgc -			msgc <- m -		} -	} -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { -	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -	if err != nil { -		return false -	} - -	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - -	atleastone := false -	for _, png := range objs { -		if preprocessedPattern.MatchString(png) { -			atleastone = true -			found := false -			b := strings.TrimSuffix(filepath.Base(png), ".png") -			hocrname := bookname + "/" + b + ".hocr" -			for _, hocr := range objs { -				if hocr == hocrname { -					found = true -					break -				} -			} -			if found == false { -				return false -			} -		} -	} -	if atleastone == false { -		return false -	} -	return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { -	dl := make(chan string) -	msgc := make(chan bookpipeline.Qmsg) -	processc := make(chan string) -	upc := make(chan string) -	done := make(chan bool) -	errc := make(chan error) - -	msgparts := strings.Split(msg.Body, " ") -	bookname := filepath.Dir(msgparts[0]) -	if len(msgparts) > 1 && msgparts[1] != "" { -		process = ocr(msgparts[1]) -	} - -	d := filepath.Join(os.TempDir(), bookname) -	err := os.MkdirAll(d, 0755) -	if err != nil { -		return fmt.Errorf("Failed to create directory %s: %s", d, err) -	} - -	t := time.NewTicker(HeartbeatSeconds * time.Second) -	go heartbeat(conn, t, msg, fromQueue, msgc, errc) - -	// these functions will do their jobs when their channels have data -	go download(dl, processc, conn, d, errc, conn.GetLogger()) -	go process(processc, upc, errc, conn.GetLogger()) -	go up(upc, done, conn, bookname, errc, conn.GetLogger()) - -	dl <- msgparts[0] -	close(dl) - -	// wait for either the done or errc channel to be sent to -	select { -	case err = <-errc: -		t.Stop() -		_ = os.RemoveAll(d) -		return err -	case <-done: -	} - -	if allOCRed(bookname, conn) && toQueue != "" { -		conn.Log("Sending", bookname, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, bookname) -		if err != nil { -			t.Stop() -			_ = os.RemoveAll(d) -			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) -		} -	} - -	t.Stop() - -	// check whether we're using a newer msg handle -	select { -	case m, ok := <-msgc: -		if ok { -			msg = m -			conn.Log("Using new message handle to delete message from queue") -		} -	default: -		conn.Log("Using original message handle to delete message from queue") -	} - -	conn.Log("Deleting original message from queue", fromQueue) -	err = conn.DelFromQueue(fromQueue, msg.Handle) -	if err != nil { -		_ = os.RemoveAll(d) -		return fmt.Errorf("Error deleting message from queue: %s", err) -	} - -	err = os.RemoveAll(d) -	if err != nil { -		return fmt.Errorf("Failed to remove directory %s: %s", d, err) -	} - -	return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { -	dl := make(chan string) -	msgc := make(chan bookpipeline.Qmsg) -	processc := make(chan string) -	upc := make(chan string) -	done := make(chan bool) -	errc := make(chan error) - -	msgparts := strings.Split(msg.Body, " ") -	bookname := msgparts[0] - -	var training string -	if len(msgparts) > 1 { -		training = msgparts[1] -	} - -	d := filepath.Join(os.TempDir(), bookname) -	err := os.MkdirAll(d, 0755) -	if err != nil { -		return fmt.Errorf("Failed to create directory %s: %s", d, err) -	} - -	t := time.NewTicker(HeartbeatSeconds * time.Second) -	go heartbeat(conn, t, msg, fromQueue, msgc, errc) - -	// these functions will do their jobs when their channels have data -	go download(dl, processc, conn, d, errc, conn.GetLogger()) -	go process(processc, upc, errc, conn.GetLogger()) -	if toQueue == conn.OCRPageQueueId() { -		go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) -	} else { -		go up(upc, done, conn, bookname, errc, conn.GetLogger()) -	} - -	conn.Log("Getting list of objects to download") -	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -	if err != nil { -		t.Stop() -		_ = os.RemoveAll(d) -		return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) -	} -	var todl []string -	for _, n := range objs { -		if !match.MatchString(n) { -			conn.Log("Skipping item that doesn't match target", n) -			continue -		} -		todl = append(todl, n) -	} -	for _, a := range todl { -		dl <- a -	} -	close(dl) - -	// wait for either the done or errc channel to be sent to -	select { -	case err = <-errc: -		t.Stop() -		_ = os.RemoveAll(d) -		// if the error is in preprocessing / wipeonly, chances are that it will never -		// complete, and will fill the ocrpage queue with parts which succeeded -		// on each run, so in that case it's better to delete the message from -		// the queue and notify us. -		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { -			conn.Log("Deleting message from queue due to a bad error", fromQueue) -			err2 := conn.DelFromQueue(fromQueue, msg.Handle) -			if err2 != nil { -				conn.Log("Error deleting message from queue", err2) -			} -			ms, err2 := getMailSettings() -			if err2 != nil { -				conn.Log("Failed to mail settings ", err2) -			} -			if err2 == nil && ms.server != "" { -				logs, err2 := getlogs() -				if err2 != nil { -					conn.Log("Failed to get logs ", err2) -					logs = "" -				} -				msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + -					"Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + -					" Fail message: %s\r\nFull log:\r\n%s\r\n", -					ms.to, ms.from, bookname, err, logs) -				host := fmt.Sprintf("%s:%s", ms.server, ms.port) -				auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) -				err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) -				if err2 != nil { -					conn.Log("Error sending email ", err2) -				} -			} -		} -		return err -	case <-done: -	} - -	if toQueue != "" && toQueue != conn.OCRPageQueueId() { -		conn.Log("Sending", bookname, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, bookname) -		if err != nil { -			t.Stop() -			_ = os.RemoveAll(d) -			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) -		} -	} - -	t.Stop() - -	// check whether we're using a newer msg handle -	select { -	case m, ok := <-msgc: -		if ok { -			msg = m -			conn.Log("Using new message handle to delete message from queue") -		} -	default: -		conn.Log("Using original message handle to delete message from queue") -	} - -	conn.Log("Deleting original message from queue", fromQueue) -	err = conn.DelFromQueue(fromQueue, msg.Handle) -	if err != nil { -		_ = os.RemoveAll(d) -		return fmt.Errorf("Error deleting message from queue: %s", err) -	} - -	err = os.RemoveAll(d) -	if err != nil { -		return fmt.Errorf("Failed to remove directory %s: %s", d, err) -	} - -	return nil -} -  func stopTimer(t *time.Timer) {  	if !t.Stop() {  		<-t.C  	}  } -// TODO: rather than relying on journald, would be nicer to save the logs -//       ourselves maybe, so that we weren't relying on a particular systemd -//       setup. this can be done by having the conn.Log also append line -//       to a file (though that would mean everything would have to go through -//       conn.Log, which we're not consistently doing yet). the correct thing -//       to do then would be to implement a new interface that covers the part -//       of log.Logger we use (e.g. Print and Printf), and then have an exported -//       conn struct that implements those, so that we could pass a log.Logger -//       or the new conn struct everywhere (we wouldn't be passing a log.Logger, -//       it's just good to be able to keep the compatibility) -func getlogs() (string, error) { -	cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") -	var stdout, stderr bytes.Buffer -	cmd.Stdout = &stdout -	cmd.Stderr = &stderr -	err := cmd.Run() -	return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { -	logs, err := getlogs() -	if err != nil { -		return fmt.Errorf("Error getting logs, error: %v", err) +func resetTimer(t *time.Timer, d time.Duration) { +	if d > 0 { +		t.Reset(d)  	} -	key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) -	path := filepath.Join(os.TempDir(), key) -	f, err := os.Create(path) -	if err != nil { -		return fmt.Errorf("Error creating log file", err) -	} -	defer f.Close() -	_, err = f.WriteString(logs) -	if err != nil { -		return fmt.Errorf("Error saving log file", err) -	} -	_ = f.Close() -	err = conn.Upload(conn.WIPStorageId(), key, path) -	if err != nil { -		return fmt.Errorf("Error uploading log", err) -	} -	conn.Log("Log saved to", key) -	return nil  }  func main() { @@ -770,7 +94,8 @@ func main() {  	nowipe := flag.Bool("nw", false, "disable wipeonly")  	noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")  	noanalyse := flag.Bool("na", false, "disable analysis") -	autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") +	autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") +	autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	flag.Usage = func() { @@ -801,17 +126,20 @@ func main() {  		log.Fatalln("Unknown connection type")  	} -	_, err := getMailSettings() -	if err != nil { -		conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) +	var err error +	if *conntype != "local" { +		_, err = pipeline.GetMailSettings() +		if err != nil { +			conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) +		}  	} -	conn.Log("Setting up AWS session") +	conn.Log("Setting up session")  	err = conn.Init()  	if err != nil { -		log.Fatalln("Error setting up cloud connection:", err) +		log.Fatalln("Error setting up connection:", err)  	} -	conn.Log("Finished setting up AWS session") +	conn.Log("Finished setting up session")  	starttime := time.Now().Unix()  	hostname, err := os.Hostname() @@ -820,7 +148,7 @@ func main() {  	var checkWipeQueue <-chan time.Time  	var checkOCRPageQueue <-chan time.Time  	var checkAnalyseQueue <-chan time.Time -	var shutdownIfQuiet *time.Timer +	var stopIfQuiet *time.Timer  	var savelognow *time.Ticker  	if !*nopreproc {  		checkPreQueue = time.After(0) @@ -834,13 +162,21 @@ func main() {  	if !*noanalyse {  		checkAnalyseQueue = time.After(0)  	} -	shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) +	var quietTime = time.Duration(*autostop) * time.Second +	stopIfQuiet = time.NewTimer(quietTime) +	if quietTime == 0 { +		stopIfQuiet.Stop() +	} +  	savelognow = time.NewTicker(LogSaveTime) +	if *conntype == "local" { +		savelognow.Stop() +	}  	for {  		select {  		case <-checkPreQueue: -			msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)  			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking preprocess queue", err) @@ -851,14 +187,14 @@ func main() {  				continue  			}  			conn.Log("Message received on preprocess queue, processing", msg.Body) -			stopTimer(shutdownIfQuiet) -			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			stopTimer(stopIfQuiet) +			err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during preprocess", err)  			}  		case <-checkWipeQueue: -			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)  			checkWipeQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking wipeonly queue", err) @@ -868,15 +204,15 @@ func main() {  				conn.Log("No message received on wipeonly queue, sleeping")  				continue  			} -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on wipeonly queue, processing", msg.Body) -			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during wipe", err)  			}  		case <-checkOCRPageQueue: -			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)  			checkOCRPageQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking OCR Page queue", err) @@ -888,15 +224,15 @@ func main() {  			// Have OCRPageQueue checked immediately after completion, as chances are high that  			// there will be more pages that should be done without delay  			checkOCRPageQueue = time.After(0) -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on OCR Page queue, processing", msg.Body) -			err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during OCR Page process", err)  			}  		case <-checkAnalyseQueue: -			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)  			checkAnalyseQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking analyse queue", err) @@ -906,25 +242,30 @@ func main() {  				conn.Log("No message received on analyse queue, sleeping")  				continue  			} -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on analyse queue, processing", msg.Body) -			err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during analysis", err)  			}  		case <-savelognow.C:  			conn.Log("Saving logs") -			err = savelogs(conn, starttime, hostname) +			err = pipeline.SaveLogs(conn, starttime, hostname)  			if err != nil {  				conn.Log("Error saving logs", err)  			} -		case <-shutdownIfQuiet.C: -			if !*autoshutdown { +		case <-stopIfQuiet.C: +			if quietTime == 0 {  				continue  			} +			if !*autoshutdown { +				conn.Log("Stopping pipeline") +				_ = pipeline.SaveLogs(conn, starttime, hostname) +				return +			}  			conn.Log("Shutting down") -			_ = savelogs(conn, starttime, hostname) +			_ = pipeline.SaveLogs(conn, starttime, hostname)  			cmd := exec.Command("sudo", "systemctl", "poweroff")  			var stdout, stderr bytes.Buffer  			cmd.Stdout = &stdout diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 60d1f81..b4f4d99 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -9,14 +9,13 @@ package main  import (  	"flag"  	"fmt" -	"image" -	_ "image/png" -	_ "image/jpeg"  	"log"  	"os"  	"path/filepath"  	"rescribe.xyz/bookpipeline" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  )  const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] @@ -32,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or  If bookname is omitted the last part of the bookdir is used.  ` -type Pipeliner interface { -	Init() error -	PreQueueId() string -	WipeQueueId() string -	WIPStorageId() string -	AddToQueue(url string, msg string) error -	Upload(bucket string, key string, path string) error -} -  // null writer to enable non-verbose logging to be discarded  type NullWriter bool @@ -50,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  var verboselog *log.Logger -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { -	if err != nil { -		return err -	} -	if !info.IsDir() { -		f <- path -	} -	return nil -} -  func main() {  	verbose := flag.Bool("v", false, "Verbose")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -94,7 +72,7 @@ func main() {  		verboselog = log.New(n, "", log.LstdFlags)  	} -	var conn Pipeliner +	var conn pipeline.Pipeliner  	switch *conntype {  	case "aws":  		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -108,18 +86,7 @@ func main() {  		log.Fatalln("Failed to set up cloud connection:", err)  	} -	qid := conn.PreQueueId() - -	// Auto detect type of queue to send to based on file extension -	pngdirs, _ := filepath.Glob(bookdir + "/*.png") -	jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg") -	pngcount := len(pngdirs) -	jpgcount := len(jpgdirs) -	if pngcount > jpgcount { -		qid = conn.WipeQueueId() -	} else { -		qid = conn.PreQueueId() -	} +	qid := pipeline.DetectQueueType(bookdir, conn)  	// Flags set override the queue selection  	if *wipeonly { @@ -130,43 +97,24 @@ func main() {  	}  	verboselog.Println("Checking that all images are valid in", bookdir) -	checker := make(fileWalk) -	go func() { -		err = filepath.Walk(bookdir, checker.Walk) -		if err != nil { -			log.Fatalln("Filesystem walk failed:", err) -		} -		close(checker) -	}() - -	for path := range checker { -		f, err := os.Open(path) -		if err != nil { -			log.Fatalln("Opening image %s failed, bailing: %v", path, err) -		} -		_, _, err = image.Decode(f) -		if err != nil { -			log.Fatalf("Decoding image %s failed, bailing: %v", path, err) -		} +	err = pipeline.CheckImages(bookdir) +	if err != nil { +		log.Fatalln(err)  	} -	verboselog.Println("Walking", bookdir) -	walker := make(fileWalk) -	go func() { -		err = filepath.Walk(bookdir, walker.Walk) -		if err != nil { -			log.Fatalln("Filesystem walk failed:", err) -		} -		close(walker) -	}() - -	for path := range walker { -		verboselog.Println("Uploading", path) -		name := filepath.Base(path) -		err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) -		if err != nil { -			log.Fatalln("Failed to upload", path, err) -		} +	verboselog.Println("Checking that a book hasn't already been uploaded with that name") +	list, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		log.Fatalln(err) +	} +	if len(list) > 0 { +		log.Fatalf("Error: There is already a book in S3 named %s", bookname) +	} + +	verboselog.Println("Uploading all images are valid in", bookdir) +	err = pipeline.UploadImages(bookdir, bookname, conn) +	if err != nil { +		log.Fatalln(err)  	}  	if *training != "" { diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go index 9eca0d8..c1ee50d 100644 --- a/cmd/getbests/main.go +++ b/cmd/getbests/main.go @@ -62,8 +62,8 @@ func main() {  	log.Println("Downloading all best files found")  	for _, i := range objs {  		parts := strings.Split(i, "/") -		if parts[len(parts) - 1] == "best" { -			err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best") +		if parts[len(parts)-1] == "best" { +			err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best")  			if err != nil {  				log.Fatalln("Failed to download file", i, err)  			} diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 03e709b..5116414 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -6,15 +6,15 @@  package main  import ( -	"bufio"  	"flag"  	"fmt"  	"log"  	"os"  	"path/filepath" -	"strings"  	"rescribe.xyz/bookpipeline" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  )  const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname @@ -33,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  	return len(p), nil  } -type Pipeliner interface { -	MinimalInit() error -	ListObjects(bucket string, prefix string) ([]string, error) -	Download(bucket string, key string, fn string) error -	Upload(bucket string, key string, path string) error -	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) -	AddToQueue(url string, msg string) error -	DelFromQueue(url string, handle string) error -	WIPStorageId() string -} - -func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { -	for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { -		fn := filepath.Join(bookname, bookname+suffix) -		l.Println("Downloading PDF", fn) -		err := conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Printf("Failed to download %s: %s\n", fn, err) -		} -	} -} -  func main() {  	all := flag.Bool("a", false, "Get all files for book")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -83,7 +61,7 @@ func main() {  		verboselog = log.New(n, "", log.LstdFlags)  	} -	var conn Pipeliner +	var conn pipeline.MinPipeliner  	switch *conntype {  	case "aws":  		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -109,18 +87,10 @@ func main() {  	if *all {  		verboselog.Println("Downloading all files for", bookname) -		objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +		err = pipeline.DownloadAll(bookname, conn)  		if err != nil { -			log.Fatalln("Failed to get list of files for book", bookname, err) -		} -		for _, i := range objs { -			verboselog.Println("Downloading", i) -			err = conn.Download(conn.WIPStorageId(), i, i) -			if err != nil { -				log.Fatalln("Failed to download file", i, err) -			} +			log.Fatalln(err)  		} -		return  	}  	if *binarisedpdf { @@ -151,61 +121,29 @@ func main() {  	}  	if *pdf { -		getpdfs(conn, verboselog, bookname) +		verboselog.Println("Downloading PDFs") +		pipeline.DownloadPdfs(bookname, conn)  	}  	if *binarisedpdf || *colourpdf || *graph || *pdf {  		return  	} -	verboselog.Println("Downloading best file") -	fn := filepath.Join(bookname, "best") -	err = conn.Download(conn.WIPStorageId(), fn, fn) +	verboselog.Println("Downloading best pages") +	err = pipeline.DownloadBestPages(bookname, conn, *png)  	if err != nil { -		log.Fatalln("Failed to download 'best' file", err) -	} -	f, err := os.Open(fn) -	if err != nil { -		log.Fatalln("Failed to open best file", err) -	} -	defer f.Close() - -	if *png { -		verboselog.Println("Downloading png files") -		s := bufio.NewScanner(f) -		for s.Scan() { -			txtfn := filepath.Join(bookname, s.Text()) -			fn = strings.Replace(txtfn, ".hocr", ".png", 1) -			verboselog.Println("Downloading file", fn) -			err = conn.Download(conn.WIPStorageId(), fn, fn) -			if err != nil { -				log.Fatalln("Failed to download file", fn, err) -			} -		} -		return +		log.Fatalln(err)  	} -	verboselog.Println("Downloading HOCR files") -	s := bufio.NewScanner(f) -	for s.Scan() { -		fn = filepath.Join(bookname, s.Text()) -		verboselog.Println("Downloading file", fn) -		err = conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Fatalln("Failed to download file", fn, err) -		} +	verboselog.Println("Downloading PDFs") +	pipeline.DownloadPdfs(bookname, conn) +	if err != nil { +		log.Fatalln(err)  	} -	verboselog.Println("Downloading PDF files") -	getpdfs(conn, verboselog, bookname) - -	verboselog.Println("Downloading analysis files") -	for _, a := range []string{"conf", "graph.png"} { -		fn = filepath.Join(bookname, a) -		verboselog.Println("Downloading file", fn) -		err = conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Fatalln("Failed to download file", fn, err) -		} +	verboselog.Println("Downloading analyses") +	err = pipeline.DownloadAnalyses(bookname, conn) +	if err != nil { +		log.Fatalln(err)  	}  } diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	LogQueue(url string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 1 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.LogQueue(qid) +	if err != nil { +		log.Fatalln("Error getting queue", qname, ":", err) +	} +} diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go index 37b77e7..5bdb839 100644 --- a/cmd/postprocess-bythresh/main.go +++ b/cmd/postprocess-bythresh/main.go @@ -19,7 +19,6 @@ import (  //TO DO: make writetofile return an error and handle that accordingly  // potential TO DO: add text versions where footer is cropped on odd/even pages only -  // the trimblanks function trims the blank lines from a text input  func trimblanks(hocrfile string) string { @@ -50,7 +49,7 @@ func dehyphenateString(in string) string {  		words := strings.Split(line, " ")  		last := words[len(words)-1]  		// the - 2 here is to account for a trailing newline and counting from zero -		if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 { +		if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 {  			nextwords := strings.Split(lines[i+1], " ")  			if len(nextwords) > 0 {  				line = line[0:len(line)-1] + nextwords[0] @@ -66,17 +65,15 @@ func dehyphenateString(in string) string {  	return strings.Join(newlines, " ")  } -  // the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long)  func fullcrop(noblanks string) string { -  	alllines := strings.Split(noblanks, "\n") -	 +  	if len(alllines) <= 2 { -	return "" -	}	else { -	return strings.Join(alllines[1:len(alllines)-2], "\n") +		return "" +	} else { +		return strings.Join(alllines[1:len(alllines)-2], "\n")  	}  } @@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  	var killheadtxt string  	var footkilltxt string -  	hocrfilepath := filepath.Join(bookdirectory, hocrfilename)  	confpath := filepath.Join(bookdirectory, "conf") @@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  		if err != nil {  			log.Fatal(err)  		} -		 -		 +  		trimbest := trimblanks(hocrfiletext) -		 +  		alltxt = dehyphenateString(trimbest) -			 +  		croptxt = dehyphenateString(fullcrop(trimbest)) -	 +  		killheadtxt = dehyphenateString(headcrop(trimbest)) -		 +  		footkilltxt = dehyphenateString(footcrop(trimbest)) -		  	}  	return alltxt, croptxt, killheadtxt, footkilltxt @@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  // the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them.  func writetofile(bookdirectory, textfilebase, txt string) error {  	alltxtfile := filepath.Join(bookdirectory, textfilebase) -	 +  	file, err := os.Create(alltxtfile)  	if err != nil {  		return fmt.Errorf("Error opening file %s: %v", alltxtfile, err) @@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error {  	if _, err := file.WriteString(txt); err != nil {  		log.Println(err)  	} -return err +	return err  } @@ -215,7 +209,7 @@ func main() {  	bookdirectory := flag.Arg(0)  	confthreshstring := strconv.Itoa(*confthresh) -	 +  	fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh)  	bestpath := filepath.Join(bookdirectory, "best") @@ -239,32 +233,31 @@ func main() {  			crop = crop + " " + croptxt  			killhead = killhead + " " + killheadtxt  			killfoot = killfoot + " " + footkilltxt -		 +  		}  	} -	 -	 -	bookname:= filepath.Base(bookdirectory) -		b := bookname + "_" + confthreshstring -		err1 := writetofile(bookdirectory, b + "_all.txt", all) -		if err1 != nil { +	bookname := filepath.Base(bookdirectory) +	b := bookname + "_" + confthreshstring + +	err1 := writetofile(bookdirectory, b+"_all.txt", all) +	if err1 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1) -		} -		 -		err2 := writetofile(bookdirectory, b + "_crop.txt", crop) -		if err2 != nil { +	} + +	err2 := writetofile(bookdirectory, b+"_crop.txt", crop) +	if err2 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2) -		} -		 -		err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead) -		if err3 != nil { +	} + +	err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead) +	if err3 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3) -		} -		 -		err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot) -		if err4 != nil { +	} + +	err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot) +	if err4 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4) -		} +	}  } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go new file mode 100644 index 0000000..f4489d8 --- /dev/null +++ b/cmd/rescribe/main.go @@ -0,0 +1,389 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rescribe is a modification of bookpipeline designed for local-only +// operation, which rolls uploading, processing, and downloading of +// a single book by the pipeline into one command. +package main + +import ( +	"flag" +	"fmt" +	"io/ioutil" +	"log" +	"os" +	"os/exec" +	"path/filepath" +	"regexp" +	"runtime" +	"strings" +	"time" + +	"rescribe.xyz/bookpipeline" +	"rescribe.xyz/utils/pkg/hocr" + +	"rescribe.xyz/bookpipeline/internal/pipeline" +) + +const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] + +Process and OCR a book using the Rescribe pipeline on a local machine. +` + +const QueueTimeoutSecs = 2 * 60 +const PauseBetweenChecks = 1 * time.Second +const LogSaveTime = 1 * time.Minute +var thresholds = []float64{0.1, 0.2, 0.3} + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { +	return len(p), nil +} + +type Clouder interface { +	Init() error +	ListObjects(bucket string, prefix string) ([]string, error) +	Download(bucket string, key string, fn string) error +	Upload(bucket string, key string, path string) error +	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) +	AddToQueue(url string, msg string) error +	DelFromQueue(url string, handle string) error +	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { +	Clouder +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +	WIPStorageId() string +	GetLogger() *log.Logger +	Log(v ...interface{}) +} + +func stopTimer(t *time.Timer) { +	if !t.Stop() { +		<-t.C +	} +} + +func resetTimer(t *time.Timer, d time.Duration) { +	if d > 0 { +		t.Reset(d) +	} +} + +func main() { +	deftesscmd := "tesseract" +	if runtime.GOOS == "windows" { +		deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe" +	} + +	verbose := flag.Bool("v", false, "verbose") +	training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") +	tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() < 1 || flag.NArg() > 3 { +		flag.Usage() +		return +	} + +	bookdir := flag.Arg(0) +	var bookname string +	if flag.NArg() > 1 { +		bookname = flag.Arg(1) +	} else { +		bookname = filepath.Base(bookdir) +	} + +	var verboselog *log.Logger +	if *verbose { +		verboselog = log.New(os.Stdout, "", 0) +	} else { +		var n NullWriter +		verboselog = log.New(n, "", 0) +	} + +	f, err := os.Open(*training) +	if err != nil { +		fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training) +		fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n") +		os.Exit(1) +	} +	f.Close() + +	abstraining, err := filepath.Abs(*training) +	if err != nil { +		log.Fatalf("Error getting absolute path of training %s: %v", err) +	} +	tessPrefix, trainingName := filepath.Split(abstraining) +	trainingName = strings.TrimSuffix(trainingName, ".traineddata") +	err = os.Setenv("TESSDATA_PREFIX", tessPrefix) +	if err != nil { +		log.Fatalln("Error setting TESSDATA_PREFIX:", err) +	} + +	_, err = exec.Command(*tesscmd, "--help").Output() +	if err != nil { +		fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") +		fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") +		fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") +		fmt.Fprintf(os.Stderr, "  rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n") +		os.Exit(1) +	} + +	tempdir, err := ioutil.TempDir("", "bookpipeline") +	if err != nil { +		log.Fatalln("Error setting up temporary directory:", err) +	} + +	var conn Pipeliner +	conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir} + +	conn.Log("Setting up session") +	err = conn.Init() +	if err != nil { +		log.Fatalln("Error setting up connection:", err) +	} +	conn.Log("Finished setting up session") + +	fmt.Printf("Copying book to pipeline\n") + +	err = uploadbook(bookdir, bookname, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	fmt.Printf("Processing book\n") +	err = processbook(trainingName, *tesscmd, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	fmt.Printf("Saving finished book to %s\n", bookname) +	err = downloadbook(bookname, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	err = os.RemoveAll(tempdir) +	if err != nil { +		log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) +	} + +	hocrs, err := filepath.Glob(fmt.Sprintf("%s/*hocr", bookname)) +	if err != nil { +		log.Fatalf("Error looking for .hocr files: %v", err) +	} + +	for _, v := range hocrs { +		err = addTxtVersion(v) +		if err != nil { +			log.Fatalf("Error creating txt version of %s: %v", v, err) +		} + +		err = os.MkdirAll(filepath.Join(bookname, "hocr"), 0755) +		if err != nil { +			log.Fatalf("Error creating hocr directory: %v", err) +		} + +		err = os.Rename(v, filepath.Join(bookname, "hocr", filepath.Base(v))) +		if err != nil { +			log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) +		} +	} + +	// For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf +	_ = os.Remove(filepath.Join(bookname, bookname + ".binarised.pdf")) +	_ = os.Rename(filepath.Join(bookname, bookname + ".colour.pdf"), filepath.Join(bookname, bookname + ".pdf")) +} + +func addTxtVersion(hocrfn string) error { +	dir := filepath.Dir(hocrfn) +	err := os.MkdirAll(filepath.Join(dir, "text"), 0755) +	if err != nil { +		log.Fatalf("Error creating text directory: %v", err) +	} + +	t, err := hocr.GetText(hocrfn) +	if err != nil { +		return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) +	} + +	basefn := filepath.Base(hocrfn) +	for _, v := range thresholds { +		basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v)) +	} +	fn := filepath.Join(dir, "text", basefn + ".txt") + +	err = ioutil.WriteFile(fn, []byte(t), 0644) +	if err != nil { +		return fmt.Errorf("Error creating text file %s: %v", fn, err) +	} + +	return nil +} + +func uploadbook(dir string, name string, conn Pipeliner) error { +	err := pipeline.CheckImages(dir) +	if err != nil { +		return fmt.Errorf("Error with images in %s: %v", dir, err) +	} +	err = pipeline.UploadImages(dir, name, conn) +	if err != nil { +		return fmt.Errorf("Error saving images to process from %s: %v", dir, err) +	} + +	qid := pipeline.DetectQueueType(dir, conn) + +	err = conn.AddToQueue(qid, name) +	if err != nil { +		return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) +	} + +	return nil +} + +func downloadbook(name string, conn Pipeliner) error { +	err := os.MkdirAll(name, 0755) +	if err != nil { +		log.Fatalln("Failed to create directory", name, err) +	} + +	err = pipeline.DownloadBestPages(name, conn, false) +	if err != nil { +		return fmt.Errorf("Error downloading best pages: %v", err) +	} + +	err = pipeline.DownloadPdfs(name, conn) +	if err != nil { +		return fmt.Errorf("Error downloading PDFs: %v", err) +	} + +	err = pipeline.DownloadAnalyses(name, conn) +	if err != nil { +		return fmt.Errorf("Error downloading analyses: %v", err) +	} + +	return nil +} + +func processbook(training string, tesscmd string, conn Pipeliner) error { +	origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) +	wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) +	ocredPattern := regexp.MustCompile(`.hocr$`) + +	var checkPreQueue <-chan time.Time +	var checkWipeQueue <-chan time.Time +	var checkOCRPageQueue <-chan time.Time +	var checkAnalyseQueue <-chan time.Time +	var stopIfQuiet *time.Timer +	checkPreQueue = time.After(0) +	checkWipeQueue = time.After(0) +	checkOCRPageQueue = time.After(0) +	checkAnalyseQueue = time.After(0) +	var quietTime = 1 * time.Second +	stopIfQuiet = time.NewTimer(quietTime) +	if quietTime == 0 { +		stopIfQuiet.Stop() +	} + +	for { +		select { +		case <-checkPreQueue: +			msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) +			checkPreQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking preprocess queue: %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on preprocess queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on preprocess queue, processing", msg.Body) +			fmt.Printf("  Preprocessing book (binarising and wiping)\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) +			fmt.Printf("  OCRing pages ") // this is expected to be added to with dots by OCRPage output +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during preprocess: %v", err) +			} +		case <-checkWipeQueue: +			msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) +			checkWipeQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking wipeonly queue, %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on wipeonly queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on wipeonly queue, processing", msg.Body) +			fmt.Printf("  Preprocessing book (wiping only)\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) +			fmt.Printf("  OCRing pages ") // this is expected to be added to with dots by OCRPage output +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during wipe: %v", err) +			} +		case <-checkOCRPageQueue: +			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) +			checkOCRPageQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking OCR Page queue: %v", err) +			} +			if msg.Handle == "" { +				continue +			} +			// Have OCRPageQueue checked immediately after completion, as chances are high that +			// there will be more pages that should be done without delay +			checkOCRPageQueue = time.After(0) +			stopTimer(stopIfQuiet) +			conn.Log("Message received on OCR Page queue, processing", msg.Body) +			fmt.Printf(".") +			err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("\nError during OCR Page process: %v", err) +			} +		case <-checkAnalyseQueue: +			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) +			checkAnalyseQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking analyse queue: %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on analyse queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on analyse queue, processing", msg.Body) +			fmt.Printf("\n  Analysing OCR and compiling PDFs\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during analysis: %v", err) +			} +		case <-stopIfQuiet.C: +			conn.Log("Processing finished") +			return nil +		} +	} + +	return fmt.Errorf("Ended unexpectedly") // should never be reached +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	RemovePrefixesFromQueue(url string, prefix string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 2 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) +	if err != nil { +		log.Fatalln("Error removing prefixes from queue", qname, ":", err) +	} +} @@ -168,5 +168,19 @@ At present the bookpipeline has some silly limitations of file names for book  pages to be recognised. This is something which will be fixed in due course.    Pages that are to be fully processed: *[0-9]{4}.jpg$    Pages that are to be wiped only: *[0-9]{6}(.bin)?.png$ + +Local operation + +While bookpipeline was built with cloud based operation in mind, there is also +a local mode that can be used to run OCR jobs from a single computer, with all +the benefits of preprocessing, choosing the best threshold for each image, +graph creation, PDF creation, and so on that the pipeline provides. + +Several of the commands accept a `-c local` flag for local operation, but now +there is also a new command, named rescribe, that is designed to make things +much simpler for people just wanting to do some OCR on their local computer. + +Note that the local mode is not as well tested as the core cloud modes; please +report any bugs you find with it.  */  package bookpipeline @@ -6,8 +6,8 @@ require (  	github.com/aws/aws-sdk-go v1.30.5  	github.com/blend/go-sdk v2.0.0+incompatible // indirect  	github.com/davecgh/go-spew v1.1.1 // indirect -	github.com/jung-kurt/gofpdf v1.16.2  	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect +	github.com/phpdave11/gofpdf v1.4.2  	github.com/wcharczuk/go-chart v2.0.2-0.20191206192251-962b9abdec2b+incompatible  	golang.org/x/image v0.0.0-20200618115811-c13761719519  	golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect @@ -17,8 +17,6 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw  github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=  github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=  github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= -github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5PtRc= -github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=  github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=  github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=  github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -26,7 +24,9 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=  github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=  github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=  github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/phpdave11/gofpdi v1.0.7/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= +github.com/phpdave11/gofpdf v1.4.2 h1:KPKiIbfwbvC/wOncwhrpRdXVj2CZTCFlw4wnoyjtHfQ= +github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= +github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=  github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=  github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=  github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/internal/pipeline/get.go b/internal/pipeline/get.go new file mode 100644 index 0000000..6949062 --- /dev/null +++ b/internal/pipeline/get.go @@ -0,0 +1,89 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( +	"bufio" +	"fmt" +	"os" +	"path/filepath" +	"strings" +) + +func DownloadBestPages(name string, conn Pipeliner, pluspngs bool) error { +	fn := filepath.Join(name, "best") +	err := conn.Download(conn.WIPStorageId(), fn, fn) +	if err != nil { +		return fmt.Errorf("Failed to download 'best' file: %v", err) +	} +	f, err := os.Open(fn) +	if err != nil { +		return fmt.Errorf("Failed to open best file: %v", err) +	} +	defer f.Close() + +	s := bufio.NewScanner(f) +	for s.Scan() { +		fn = filepath.Join(name, s.Text()) +		conn.Log("Downloading file", fn) +		err = conn.Download(conn.WIPStorageId(), fn, fn) +		if err != nil { +			return fmt.Errorf("Failed to download file %s: %v", fn, err) +		} +	} + +	if !pluspngs { +		return nil +	} + +	s = bufio.NewScanner(f) +	for s.Scan() { +		txtfn := filepath.Join(name, s.Text()) +		fn = strings.Replace(txtfn, ".hocr", ".png", 1) +		conn.Log("Downloading file", fn) +		err = conn.Download(conn.WIPStorageId(), fn, fn) +		if err != nil { +			return fmt.Errorf("Failed to download file", fn, err) +		} +	} +	return nil +} + +func DownloadPdfs(name string, conn Pipeliner) error { +	for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { +		fn := filepath.Join(name, name+suffix) +		err := conn.Download(conn.WIPStorageId(), fn, fn) +		if err != nil { +			return fmt.Errorf("Failed to download PDF %s: %v", fn, err) +		} +	} +	return nil +} + +func DownloadAnalyses(name string, conn Pipeliner) error { +	for _, a := range []string{"conf", "graph.png"} { +		fn := filepath.Join(name, a) +		err := conn.Download(conn.WIPStorageId(), fn, fn) +		if err != nil { +			return fmt.Errorf("Failed to download analysis file %s: %v", fn, err) +		} +	} +	return nil +} + +func DownloadAll(name string, conn Pipeliner) error { +	objs, err := conn.ListObjects(conn.WIPStorageId(), name) +	if err != nil { +		return fmt.Errorf("Failed to get list of files for book", name, err) +	} +	for _, i := range objs { +		conn.Log("Downloading", i) +		err = conn.Download(conn.WIPStorageId(), i, i) +		if err != nil { +			return fmt.Errorf("Failed to download file", i, err) +		} +	} +	return nil +} diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go new file mode 100644 index 0000000..20400ad --- /dev/null +++ b/internal/pipeline/pipeline.go @@ -0,0 +1,740 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// pipeline is a package used by the bookpipeline command, which +// handles the core functionality, using channels heavily to +// coordinate jobs. Note that it is considered an "internal" package, +// not intended for external use, and no guarantee is made of the +// stability of any interfaces provided. +package pipeline + +import ( +	"bytes" +	"fmt" +	"io/ioutil" +	"log" +	"net/smtp" +	"os" +	"os/exec" +	"path/filepath" +	"regexp" +	"sort" +	"strings" +	"time" + +	"rescribe.xyz/bookpipeline" +	"rescribe.xyz/preproc" +	"rescribe.xyz/utils/pkg/hocr" +) + +const HeartbeatSeconds = 60 + +type Clouder interface { +	Init() error +	ListObjects(bucket string, prefix string) ([]string, error) +	Download(bucket string, key string, fn string) error +	Upload(bucket string, key string, path string) error +	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) +	AddToQueue(url string, msg string) error +	DelFromQueue(url string, handle string) error +	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { +	Clouder +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +	WIPStorageId() string +	GetLogger() *log.Logger +	Log(v ...interface{}) +} + +type MinPipeliner interface { +	Pipeliner +	MinimalInit() error +} + +type pageimg struct { +	hocr, img string +} + +type mailSettings struct { +	server, port, user, pass, from, to string +} + +func GetMailSettings() (mailSettings, error) { +	p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") +	b, err := ioutil.ReadFile(p) +	if err != nil { +		return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) +	} +	f := strings.Fields(string(b)) +	if len(f) != 6 { +		return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) +	} +	return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil +} + +func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { +	for key := range dl { +		fn := filepath.Join(dir, filepath.Base(key)) +		logger.Println("Downloading", key) +		err := conn.Download(conn.WIPStorageId(), key, fn) +		if err != nil { +			for range dl { +			} // consume the rest of the receiving channel so it isn't blocked +			close(process) +			errc <- err +			return +		} +		process <- fn +	} +	close(process) +} + +func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { +	for path := range c { +		name := filepath.Base(path) +		key := bookname + "/" + name +		logger.Println("Uploading", key) +		err := conn.Upload(conn.WIPStorageId(), key, path) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +		err = os.Remove(path) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +	} + +	done <- true +} + +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { +	for path := range c { +		name := filepath.Base(path) +		key := bookname + "/" + name +		logger.Println("Uploading", key) +		err := conn.Upload(conn.WIPStorageId(), key, path) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +		err = os.Remove(path) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +		logger.Println("Adding", key, training, "to queue", toQueue) +		err = conn.AddToQueue(toQueue, key+" "+training) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +	} + +	done <- true +} + +func Preprocess(thresholds []float64) func(chan string, chan string, chan error, *log.Logger) { +	return func(pre chan string, up chan string, errc chan error, logger *log.Logger) { +		for path := range pre { +			logger.Println("Preprocessing", path) +			done, err := preproc.PreProcMulti(path, thresholds, "binary", 0, true, 5, 30, 120, 30) +			if err != nil { +				for range pre { +				} // consume the rest of the receiving channel so it isn't blocked +				errc <- err +				return +			} +			_ = os.Remove(path) +			for _, p := range done { +				up <- p +			} +		} +		close(up) +	} +} + +func Wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { +	for path := range towipe { +		logger.Println("Wiping", path) +		s := strings.Split(path, ".") +		base := strings.Join(s[:len(s)-1], "") +		outpath := base + "_bin0.0.png" +		err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) +		if err != nil { +			for range towipe { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +		up <- outpath +	} +	close(up) +} + +func Ocr(training string, tesscmd string) func(chan string, chan string, chan error, *log.Logger) { +	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { +		if tesscmd == "" { +			tesscmd = "tesseract" +		} +		for path := range toocr { +			logger.Println("OCRing", path) +			name := strings.Replace(path, ".png", "", 1) +			cmd := exec.Command(tesscmd, "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0") +			var stdout, stderr bytes.Buffer +			cmd.Stdout = &stdout +			cmd.Stderr = &stderr +			err := cmd.Run() +			if err != nil { +				for range toocr { +				} // consume the rest of the receiving channel so it isn't blocked +				errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) +				return +			} +			up <- name + ".hocr" +		} +		close(up) +	} +} + +func Analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { +	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { +		confs := make(map[string][]*bookpipeline.Conf) +		bestconfs := make(map[string]*bookpipeline.Conf) +		savedir := "" + +		for path := range toanalyse { +			if savedir == "" { +				savedir = filepath.Dir(path) +			} +			logger.Println("Calculating confidence for", path) +			avg, err := hocr.GetAvgConf(path) +			if err != nil && err.Error() == "No words found" { +				continue +			} +			if err != nil { +				for range toanalyse { +				} // consume the rest of the receiving channel so it isn't blocked +				errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) +				return +			} +			base := filepath.Base(path) +			codestart := strings.Index(base, "_bin") +			name := base[0:codestart] +			var c bookpipeline.Conf +			c.Path = path +			c.Code = base[codestart:] +			c.Conf = avg +			confs[name] = append(confs[name], &c) +		} + +		fn := filepath.Join(savedir, "conf") +		logger.Println("Saving confidences in file", fn) +		f, err := os.Create(fn) +		if err != nil { +			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) +			return +		} +		defer f.Close() + +		logger.Println("Finding best confidence for each page, and saving all confidences") +		for base, conf := range confs { +			var best float64 +			for _, c := range conf { +				if c.Conf > best { +					best = c.Conf +					bestconfs[base] = c +				} +				_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) +				if err != nil { +					errc <- fmt.Errorf("Error writing confidences file: %s", err) +					return +				} +			} +		} +		f.Close() +		up <- fn + +		logger.Println("Creating best file listing the best file for each page") +		fn = filepath.Join(savedir, "best") +		f, err = os.Create(fn) +		if err != nil { +			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) +			return +		} +		defer f.Close() +		for _, conf := range bestconfs { +			_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) +		} +		f.Close() +		up <- fn + +		var pgs []string +		for _, conf := range bestconfs { +			pgs = append(pgs, conf.Path) +		} +		sort.Strings(pgs) + +		logger.Println("Downloading binarised and original images to create PDFs") +		bookname, err := filepath.Rel(os.TempDir(), savedir) +		if err != nil { +			errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) +			return +		} +		colourpdf := new(bookpipeline.Fpdf) +		err = colourpdf.Setup() +		if err != nil { +			errc <- fmt.Errorf("Failed to set up PDF: %s", err) +			return +		} +		binarisedpdf := new(bookpipeline.Fpdf) +		err = binarisedpdf.Setup() +		if err != nil { +			errc <- fmt.Errorf("Failed to set up PDF: %s", err) +			return +		} +		binhascontent, colourhascontent := false, false + +		var colourimgs, binimgs []pageimg + +		for _, pg := range pgs { +			base := filepath.Base(pg) +			nosuffix := strings.TrimSuffix(base, ".hocr") +			p := strings.SplitN(base, "_bin", 2) + +			var fn string +			if len(p) > 1 { +				fn = p[0] + ".jpg" +			} else { +				fn = nosuffix + ".jpg" +			} + +			binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) +			colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) +		} + +		for _, pg := range binimgs { +			logger.Println("Downloading binarised page to add to PDF", pg.img) +			err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) +			if err != nil { +				logger.Println("Download failed; skipping page", pg.img) +			} else { +				err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) +				if err != nil { +					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) +					return +				} +				binhascontent = true +				err = os.Remove(filepath.Join(savedir, pg.img)) +				if err != nil { +					errc <- err +					return +				} +			} +		} + +		if binhascontent { +			fn = filepath.Join(savedir, bookname+".binarised.pdf") +			err = binarisedpdf.Save(fn) +			if err != nil { +				errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) +				return +			} +			up <- fn +			key := bookname + "/" + bookname + ".binarised.pdf" +			conn.Log("Uploading", key) +			err := conn.Upload(conn.WIPStorageId(), key, fn) +			if err != nil { +			} +		} + +		for _, pg := range colourimgs { +			logger.Println("Downloading colour page to add to PDF", pg.img) +			colourfn := pg.img +			err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) +			if err != nil { +				colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) +				logger.Println("Download failed; trying", colourfn) +				err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) +				if err != nil { +					logger.Println("Download failed; skipping page", pg.img) +				} +			} +			if err == nil { +				err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) +				if err != nil { +					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) +					return +				} +				colourhascontent = true +				err = os.Remove(filepath.Join(savedir, colourfn)) +				if err != nil { +					errc <- err +					return +				} +			} +		} +		if colourhascontent { +			fn = filepath.Join(savedir, bookname+".colour.pdf") +			err = colourpdf.Save(fn) +			if err != nil { +				errc <- fmt.Errorf("Failed to save colour pdf: %s", err) +				return +			} +			up <- fn +		} + +		logger.Println("Creating graph") +		fn = filepath.Join(savedir, "graph.png") +		f, err = os.Create(fn) +		if err != nil { +			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) +			return +		} +		defer f.Close() +		err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) +		if err != nil && err.Error() != "Not enough valid confidences" { +			errc <- fmt.Errorf("Error rendering graph: %s", err) +			return +		} +		up <- fn + +		close(up) +	} +} + +func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { +	currentmsg := msg +	for range t.C { +		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) +		if err != nil { +			// This is for better debugging of the heartbeat issue +			conn.Log("Error with heartbeat", err) +			os.Exit(1) +			// TODO: would be better to ensure this error stops any running +			//       processes, as they will ultimately fail in the case of +			//       it. could do this by setting a global variable that +			//       processes check each time they loop. +			errc <- err +			t.Stop() +			return +		} +		if m.Id != "" { +			conn.Log("Replaced message handle as visibilitytimeout limit was reached") +			currentmsg = m +			// TODO: maybe handle communicating new msg more gracefully than this +			for range msgc { +			} // throw away any old msgc +			msgc <- m +		} +	} +} + +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { +	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		return false +	} + +	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + +	atleastone := false +	for _, png := range objs { +		if preprocessedPattern.MatchString(png) { +			atleastone = true +			found := false +			hocrname := strings.TrimSuffix(png, ".png") + ".hocr" +			for _, hocr := range objs { +				if hocr == hocrname { +					found = true +					break +				} +			} +			if found == false { +				return false +			} +		} +	} +	if atleastone == false { +		return false +	} +	return true +} + +// OcrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func OcrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { +	dl := make(chan string) +	msgc := make(chan bookpipeline.Qmsg) +	processc := make(chan string) +	upc := make(chan string) +	done := make(chan bool) +	errc := make(chan error) + +	msgparts := strings.Split(msg.Body, " ") +	bookname := filepath.Dir(msgparts[0]) +	if len(msgparts) > 1 && msgparts[1] != "" { +		process = Ocr(msgparts[1], "") +	} + +	d := filepath.Join(os.TempDir(), bookname) +	err := os.MkdirAll(d, 0755) +	if err != nil { +		return fmt.Errorf("Failed to create directory %s: %s", d, err) +	} + +	t := time.NewTicker(HeartbeatSeconds * time.Second) +	go heartbeat(conn, t, msg, fromQueue, msgc, errc) + +	// these functions will do their jobs when their channels have data +	go download(dl, processc, conn, d, errc, conn.GetLogger()) +	go process(processc, upc, errc, conn.GetLogger()) +	go up(upc, done, conn, bookname, errc, conn.GetLogger()) + +	dl <- msgparts[0] +	close(dl) + +	// wait for either the done or errc channel to be sent to +	select { +	case err = <-errc: +		t.Stop() +		_ = os.RemoveAll(d) +		return err +	case <-done: +	} + +	if allOCRed(bookname, conn) && toQueue != "" { +		conn.Log("Sending", bookname, "to queue", toQueue) +		err = conn.AddToQueue(toQueue, bookname) +		if err != nil { +			t.Stop() +			_ = os.RemoveAll(d) +			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) +		} +	} + +	t.Stop() + +	// check whether we're using a newer msg handle +	select { +	case m, ok := <-msgc: +		if ok { +			msg = m +			conn.Log("Using new message handle to delete message from queue") +		} +	default: +		conn.Log("Using original message handle to delete message from queue") +	} + +	conn.Log("Deleting original message from queue", fromQueue) +	err = conn.DelFromQueue(fromQueue, msg.Handle) +	if err != nil { +		_ = os.RemoveAll(d) +		return fmt.Errorf("Error deleting message from queue: %s", err) +	} + +	err = os.RemoveAll(d) +	if err != nil { +		return fmt.Errorf("Failed to remove directory %s: %s", d, err) +	} + +	return nil +} + +func ProcessBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { +	dl := make(chan string) +	msgc := make(chan bookpipeline.Qmsg) +	processc := make(chan string) +	upc := make(chan string) +	done := make(chan bool) +	errc := make(chan error) + +	msgparts := strings.Split(msg.Body, " ") +	bookname := msgparts[0] + +	var training string +	if len(msgparts) > 1 { +		training = msgparts[1] +	} + +	d := filepath.Join(os.TempDir(), bookname) +	err := os.MkdirAll(d, 0755) +	if err != nil { +		return fmt.Errorf("Failed to create directory %s: %s", d, err) +	} + +	t := time.NewTicker(HeartbeatSeconds * time.Second) +	go heartbeat(conn, t, msg, fromQueue, msgc, errc) + +	// these functions will do their jobs when their channels have data +	go download(dl, processc, conn, d, errc, conn.GetLogger()) +	go process(processc, upc, errc, conn.GetLogger()) +	if toQueue == conn.OCRPageQueueId() { +		go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) +	} else { +		go up(upc, done, conn, bookname, errc, conn.GetLogger()) +	} + +	conn.Log("Getting list of objects to download") +	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		t.Stop() +		_ = os.RemoveAll(d) +		return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) +	} +	var todl []string +	for _, n := range objs { +		if !match.MatchString(n) { +			conn.Log("Skipping item that doesn't match target", n) +			continue +		} +		todl = append(todl, n) +	} +	for _, a := range todl { +		dl <- a +	} +	close(dl) + +	// wait for either the done or errc channel to be sent to +	select { +	case err = <-errc: +		t.Stop() +		_ = os.RemoveAll(d) +		// if the error is in preprocessing / wipeonly, chances are that it will never +		// complete, and will fill the ocrpage queue with parts which succeeded +		// on each run, so in that case it's better to delete the message from +		// the queue and notify us. +		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { +			conn.Log("Deleting message from queue due to a bad error", fromQueue) +			err2 := conn.DelFromQueue(fromQueue, msg.Handle) +			if err2 != nil { +				conn.Log("Error deleting message from queue", err2) +			} +			ms, err2 := GetMailSettings() +			if err2 != nil { +				conn.Log("Failed to mail settings ", err2) +			} +			if err2 == nil && ms.server != "" { +				logs, err2 := getLogs() +				if err2 != nil { +					conn.Log("Failed to get logs ", err2) +					logs = "" +				} +				msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n"+ +					"Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n"+ +					" Fail message: %s\r\nFull log:\r\n%s\r\n", +					ms.to, ms.from, bookname, err, logs) +				host := fmt.Sprintf("%s:%s", ms.server, ms.port) +				auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) +				err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) +				if err2 != nil { +					conn.Log("Error sending email ", err2) +				} +			} +		} +		return err +	case <-done: +	} + +	if toQueue != "" && toQueue != conn.OCRPageQueueId() { +		conn.Log("Sending", bookname, "to queue", toQueue) +		err = conn.AddToQueue(toQueue, bookname) +		if err != nil { +			t.Stop() +			_ = os.RemoveAll(d) +			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) +		} +	} + +	t.Stop() + +	// check whether we're using a newer msg handle +	select { +	case m, ok := <-msgc: +		if ok { +			msg = m +			conn.Log("Using new message handle to delete message from queue") +		} +	default: +		conn.Log("Using original message handle to delete message from queue") +	} + +	conn.Log("Deleting original message from queue", fromQueue) +	err = conn.DelFromQueue(fromQueue, msg.Handle) +	if err != nil { +		_ = os.RemoveAll(d) +		return fmt.Errorf("Error deleting message from queue: %s", err) +	} + +	err = os.RemoveAll(d) +	if err != nil { +		return fmt.Errorf("Failed to remove directory %s: %s", d, err) +	} + +	return nil +} + +// TODO: rather than relying on journald, would be nicer to save the logs +//       ourselves maybe, so that we weren't relying on a particular systemd +//       setup. this can be done by having the conn.Log also append line +//       to a file (though that would mean everything would have to go through +//       conn.Log, which we're not consistently doing yet). the correct thing +//       to do then would be to implement a new interface that covers the part +//       of log.Logger we use (e.g. Print and Printf), and then have an exported +//       conn struct that implements those, so that we could pass a log.Logger +//       or the new conn struct everywhere (we wouldn't be passing a log.Logger, +//       it's just good to be able to keep the compatibility) +func getLogs() (string, error) { +	cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") +	var stdout, stderr bytes.Buffer +	cmd.Stdout = &stdout +	cmd.Stderr = &stderr +	err := cmd.Run() +	return stdout.String(), err +} + +func SaveLogs(conn Pipeliner, starttime int64, hostname string) error { +	logs, err := getLogs() +	if err != nil { +		return fmt.Errorf("Error getting logs, error: %v", err) +	} +	key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) +	path := filepath.Join(os.TempDir(), key) +	f, err := os.Create(path) +	if err != nil { +		return fmt.Errorf("Error creating log file", err) +	} +	defer f.Close() +	_, err = f.WriteString(logs) +	if err != nil { +		return fmt.Errorf("Error saving log file", err) +	} +	_ = f.Close() +	err = conn.Upload(conn.WIPStorageId(), key, path) +	if err != nil { +		return fmt.Errorf("Error uploading log", err) +	} +	conn.Log("Log saved to", key) +	return nil +} diff --git a/internal/pipeline/put.go b/internal/pipeline/put.go new file mode 100644 index 0000000..4b38ea5 --- /dev/null +++ b/internal/pipeline/put.go @@ -0,0 +1,85 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +package pipeline + +import ( +	"fmt" +	"image" +	_ "image/jpeg" +	_ "image/png" +	"os" +	"path/filepath" +) + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { +	return len(p), nil +} + +type fileWalk chan string + +func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { +	if err != nil { +		return err +	} +	if !info.IsDir() { +		f <- path +	} +	return nil +} + +func CheckImages(dir string) error { +	checker := make(fileWalk) +	go func() { +		_ = filepath.Walk(dir, checker.Walk) +		close(checker) +	}() + +	for path := range checker { +		f, err := os.Open(path) +		if err != nil { +			return fmt.Errorf("Opening image %s failed: %v", path, err) +		} +		_, _, err = image.Decode(f) +		if err != nil { +			return fmt.Errorf("Decoding image %s failed: %v", path, err) +		} +	} + +	return nil +} + +func DetectQueueType(dir string, conn Pipeliner) string { +	// Auto detect type of queue to send to based on file extension +	pngdirs, _ := filepath.Glob(dir + "/*.png") +	jpgdirs, _ := filepath.Glob(dir + "/*.jpg") +	pngcount := len(pngdirs) +	jpgcount := len(jpgdirs) +	if pngcount > jpgcount { +		return conn.WipeQueueId() +	} else { +		return conn.PreQueueId() +	} +} + +func UploadImages(dir string, bookname string, conn Pipeliner) error { +	walker := make(fileWalk) +	go func() { +		_ = filepath.Walk(dir, walker.Walk) +		close(walker) +	}() + +	for path := range walker { +		name := filepath.Base(path) +		err := conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) +		if err != nil { +			return fmt.Errorf("Failed to upload %s: %v", path, err) +		} +	} + +	return nil +} @@ -27,7 +27,7 @@ const storageId = "storage"  type LocalConn struct {  	// these should be set before running Init(), or left to defaults  	TempDir string -	Logger *log.Logger +	Logger  *log.Logger  }  // MinimalInit does the bare minimum initialisation @@ -36,7 +36,7 @@ func (a *LocalConn) MinimalInit() error {  	if a.TempDir == "" {  		a.TempDir = filepath.Join(os.TempDir(), "bookpipeline")  	} -	err = os.Mkdir(a.TempDir, 0700) +	err = os.MkdirAll(a.TempDir, 0700)  	if err != nil && !os.IsExist(err) {  		return fmt.Errorf("Error creating temporary directory: %v", err)  	} @@ -134,6 +134,7 @@ func prefixwalker(dirpath string, prefix string, list *[]ObjMeta) filepath.WalkF  		}  		n := strings.TrimPrefix(path, dirpath)  		n = strings.TrimPrefix(n, "/") +		n = strings.TrimPrefix(n, "\\")  		o := ObjMeta{Name: n, Date: info.ModTime()}  		*list = append(*list, o)  		return nil @@ -184,12 +185,12 @@ func (a *LocalConn) DelFromQueue(url string, handle string) error {  	// store the joining of part before and part after handle  	var complete string -	if len(s) >= len(handle) + 1 { +	if len(s) >= len(handle)+1 {  		if i > 0 {  			complete = s[:i]  		}  		// the '+1' is for the newline character -		complete += s[i + len(handle) + 1:] +		complete += s[i+len(handle)+1:]  	}  	f, err := os.Create(filepath.Join(a.TempDir, url)) diff --git a/makefile b/makefile new file mode 100644 index 0000000..6ba1af5 --- /dev/null +++ b/makefile @@ -0,0 +1,12 @@ +# See LICENSE file for copyright and license details. + +default: +	@echo "To build and install use the basic go tools like so: go install ./..." +	@echo "This makefile is just for cross compiling (for which the" +	@echo "targets rescribe-osx and rescribe-w32 exist)" + +rescribe-osx: +	GOOS=darwin GOARCH=amd64 go build -o $@ ./cmd/rescribe + +rescribe.exe: +	GOOS=windows GOARCH=386 go build -o $@ ./cmd/rescribe @@ -16,7 +16,7 @@ import (  	"io/ioutil"  	"os" -	"github.com/jung-kurt/gofpdf" +	"github.com/phpdave11/gofpdf"  	"golang.org/x/image/draw"  	"rescribe.xyz/utils/pkg/hocr"  ) | 
