diff options
| author | Nick White <git@njw.name> | 2021-02-05 17:15:51 +0000 | 
|---|---|---|
| committer | Nick White <git@njw.name> | 2021-02-05 17:15:51 +0000 | 
| commit | 11470933e4fd379b4aefa4e2bab33662a72791c2 (patch) | |
| tree | 8607e7739989ff63032b9ce10a8bf8553ecc6eb6 /cmd | |
| parent | 3e7da751b3ca917adb79674eac4ef2a3267e3984 (diff) | |
| parent | a8c7481f0dc02bbda3b3a07091e9d61f6eb728b2 (diff) | |
Merge branch 'master' of ssh://ssh.phx.nearlyfreespeech.net/home/public/bookpipeline
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/bookpipeline/main.go | 765 | ||||
| -rw-r--r-- | cmd/booktopipeline/main.go | 92 | ||||
| -rw-r--r-- | cmd/getbests/main.go | 4 | ||||
| -rw-r--r-- | cmd/getpipelinebook/main.go | 98 | ||||
| -rw-r--r-- | cmd/logwholequeue/main.go | 85 | ||||
| -rw-r--r-- | cmd/lspipeline/main.go | 82 | ||||
| -rw-r--r-- | cmd/postprocess-bythresh/main.go | 71 | ||||
| -rw-r--r-- | cmd/rescribe/main.go | 395 | ||||
| -rw-r--r-- | cmd/rmbook/main.go | 87 | ||||
| -rw-r--r-- | cmd/trimqueue/main.go | 84 | 
10 files changed, 840 insertions, 923 deletions
| diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 36295a6..909b431 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -11,23 +11,18 @@ import (  	"bytes"  	"flag"  	"fmt" -	"io/ioutil"  	"log" -	"net/smtp"  	"os"  	"os/exec" -	"path/filepath"  	"regexp" -	"sort" -	"strings"  	"time"  	"rescribe.xyz/bookpipeline" -	"rescribe.xyz/preproc" -	"rescribe.xyz/utils/pkg/hocr" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  ) -const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] +const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]  Watches the preprocess, wipeonly, ocrpage and analyse queues for messages.  When one is found this general process is followed: @@ -47,10 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with  the contents: {smtpserver} {port} {username} {password} {from} {to}  ` +const QueueTimeoutSecs = 2 * 60  const PauseBetweenChecks = 3 * time.Minute -const TimeBeforeShutdown = 5 * time.Minute  const LogSaveTime = 1 * time.Minute -const HeartbeatSeconds = 60  // null writer to enable non-verbose logging to be discarded  type NullWriter bool @@ -81,686 +75,16 @@ type Pipeliner interface {  	Log(v ...interface{})  } -type pageimg struct { -	hocr, img string -} - -type mailSettings struct { -	server, port, user, pass, from, to string -} - -func getMailSettings() (mailSettings, error) { -	p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings") -	b, err := ioutil.ReadFile(p) -	if err != nil { -		return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err) -	} -	f := strings.Fields(string(b)) -	if len(f) != 6 { -		return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f)) -	} -	return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil -} - -func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) { -	for key := range dl { -		fn := filepath.Join(dir, filepath.Base(key)) -		logger.Println("Downloading", key) -		err := conn.Download(conn.WIPStorageId(), key, fn) -		if err != nil { -			for range dl { -			} // consume the rest of the receiving channel so it isn't blocked -			close(process) -			errc <- err -			return -		} -		process <- fn -	} -	close(process) -} - -func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { -	for path := range c { -		name := filepath.Base(path) -		key := bookname + "/" + name -		logger.Println("Uploading", key) -		err := conn.Upload(conn.WIPStorageId(), key, path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		err = os.Remove(path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -	} - -	done <- true -} - -func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) { -	for path := range c { -		name := filepath.Base(path) -		key := bookname + "/" + name -		logger.Println("Uploading", key) -		err := conn.Upload(conn.WIPStorageId(), key, path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		err = os.Remove(path) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		logger.Println("Adding", key, training, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, key+" "+training) -		if err != nil { -			for range c { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -	} - -	done <- true -} - -func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) { -	for path := range pre { -		logger.Println("Preprocessing", path) -		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30) -		if err != nil { -			for range pre { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		_ = os.Remove(path) -		for _, p := range done { -			up <- p -		} -	} -	close(up) -} - -func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) { -	for path := range towipe { -		logger.Println("Wiping", path) -		s := strings.Split(path, ".") -		base := strings.Join(s[:len(s)-1], "") -		outpath := base + "_bin0.0.png" -		err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30) -		if err != nil { -			for range towipe { -			} // consume the rest of the receiving channel so it isn't blocked -			errc <- err -			return -		} -		up <- outpath -	} -	close(up) -} - -func ocr(training string) func(chan string, chan string, chan error, *log.Logger) { -	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) { -		for path := range toocr { -			logger.Println("OCRing", path) -			name := strings.Replace(path, ".png", "", 1) -			cmd := exec.Command("tesseract", "-l", training, path, name, "hocr") -			var stdout, stderr bytes.Buffer -			cmd.Stdout = &stdout -			cmd.Stderr = &stderr -			err := cmd.Run() -			if err != nil { -				for range toocr { -				} // consume the rest of the receiving channel so it isn't blocked -				errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String()) -				return -			} -			up <- name + ".hocr" -		} -		close(up) -	} -} - -func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) { -	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) { -		confs := make(map[string][]*bookpipeline.Conf) -		bestconfs := make(map[string]*bookpipeline.Conf) -		savedir := "" - -		for path := range toanalyse { -			if savedir == "" { -				savedir = filepath.Dir(path) -			} -			logger.Println("Calculating confidence for", path) -			avg, err := hocr.GetAvgConf(path) -			if err != nil && err.Error() == "No words found" { -				continue -			} -			if err != nil { -				for range toanalyse { -				} // consume the rest of the receiving channel so it isn't blocked -				errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err) -				return -			} -			base := filepath.Base(path) -			codestart := strings.Index(base, "_bin") -			name := base[0:codestart] -			var c bookpipeline.Conf -			c.Path = path -			c.Code = base[codestart:] -			c.Conf = avg -			confs[name] = append(confs[name], &c) -		} - -		fn := filepath.Join(savedir, "conf") -		logger.Println("Saving confidences in file", fn) -		f, err := os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() - -		logger.Println("Finding best confidence for each page, and saving all confidences") -		for base, conf := range confs { -			var best float64 -			for _, c := range conf { -				if c.Conf > best { -					best = c.Conf -					bestconfs[base] = c -				} -				_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf) -				if err != nil { -					errc <- fmt.Errorf("Error writing confidences file: %s", err) -					return -				} -			} -		} -		up <- fn - -		logger.Println("Creating best file listing the best file for each page") -		fn = filepath.Join(savedir, "best") -		f, err = os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() -		for _, conf := range bestconfs { -			_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path)) -		} -		up <- fn - -		var pgs []string -		for _, conf := range bestconfs { -			pgs = append(pgs, conf.Path) -		} -		sort.Strings(pgs) - -		logger.Println("Downloading binarised and original images to create PDFs") -		bookname, err := filepath.Rel(os.TempDir(), savedir) -		if err != nil { -			errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err) -			return -		} -		colourpdf := new(bookpipeline.Fpdf) -		err = colourpdf.Setup() -		if err != nil { -			errc <- fmt.Errorf("Failed to set up PDF: %s", err) -			return -		} -		binarisedpdf := new(bookpipeline.Fpdf) -		err = binarisedpdf.Setup() -		if err != nil { -			errc <- fmt.Errorf("Failed to set up PDF: %s", err) -			return -		} -		binhascontent, colourhascontent := false, false - -		var colourimgs, binimgs []pageimg - -		for _, pg := range pgs { -			base := filepath.Base(pg) -			nosuffix := strings.TrimSuffix(base, ".hocr") -			p := strings.SplitN(base, "_bin", 2) - -			var fn string -			if len(p) > 1 { -				fn = p[0] + ".jpg" -			} else { -				fn = nosuffix + ".jpg" -			} - -			binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"}) -			colourimgs = append(colourimgs, pageimg{hocr: base, img: fn}) -		} - -		for _, pg := range binimgs { -			logger.Println("Downloading binarised page to add to PDF", pg.img) -			err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img)) -			if err != nil { -				logger.Println("Download failed; skipping page", pg.img) -			} else { -				err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true) -				if err != nil { -					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) -					return -				} -				binhascontent = true -				err = os.Remove(filepath.Join(savedir, pg.img)) -				if err != nil { -					errc <- err -					return -				} -			} -		} - -		if binhascontent { -			fn = filepath.Join(savedir, bookname+".binarised.pdf") -			err = binarisedpdf.Save(fn) -			if err != nil { -				errc <- fmt.Errorf("Failed to save binarised pdf: %s", err) -				return -			} -			up <- fn -			key := bookname + "/" + bookname + ".binarised.pdf" -			conn.Log("Uploading", key) -			err := conn.Upload(conn.WIPStorageId(), key, fn) -			if err != nil { -			} -		} - -		for _, pg := range colourimgs { -			logger.Println("Downloading colour page to add to PDF", pg.img) -			colourfn := pg.img -			err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) -			if err != nil { -				colourfn = strings.Replace(pg.img, ".jpg", ".png", 1) -				logger.Println("Download failed; trying", colourfn) -				err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn)) -				if err != nil { -					logger.Println("Download failed; skipping page", pg.img) -				} -			} -			if err == nil { -				err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true) -				if err != nil { -					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err) -					return -				} -				colourhascontent = true -				err = os.Remove(filepath.Join(savedir, colourfn)) -				if err != nil { -					errc <- err -					return -				} -			} -		} -		if colourhascontent { -			fn = filepath.Join(savedir, bookname+".colour.pdf") -			err = colourpdf.Save(fn) -			if err != nil { -				errc <- fmt.Errorf("Failed to save colour pdf: %s", err) -				return -			} -			up <- fn -		} - -		logger.Println("Creating graph") -		fn = filepath.Join(savedir, "graph.png") -		f, err = os.Create(fn) -		if err != nil { -			errc <- fmt.Errorf("Error creating file %s: %s", fn, err) -			return -		} -		defer f.Close() -		err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f) -		if err != nil && err.Error() != "Not enough valid confidences" { -			errc <- fmt.Errorf("Error rendering graph: %s", err) -			return -		} -		up <- fn - -		close(up) -	} -} - -func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) { -	currentmsg := msg -	for range t.C { -		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2) -		if err != nil { -			// This is for better debugging of the heartbeat issue -			conn.Log("Error with heartbeat", err) -			os.Exit(1) -			// TODO: would be better to ensure this error stops any running -			//       processes, as they will ultimately fail in the case of -			//       it. could do this by setting a global variable that -			//       processes check each time they loop. -			errc <- err -			t.Stop() -			return -		} -		if m.Id != "" { -			conn.Log("Replaced message handle as visibilitytimeout limit was reached") -			currentmsg = m -			// TODO: maybe handle communicating new msg more gracefully than this -			for range msgc { -			} // throw away any old msgc -			msgc <- m -		} -	} -} - -// allOCRed checks whether all pages of a book have been OCRed. -// This is determined by whether every _bin0.?.png file has a -// corresponding .hocr file. -func allOCRed(bookname string, conn Pipeliner) bool { -	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -	if err != nil { -		return false -	} - -	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) - -	atleastone := false -	for _, png := range objs { -		if preprocessedPattern.MatchString(png) { -			atleastone = true -			found := false -			b := strings.TrimSuffix(filepath.Base(png), ".png") -			hocrname := bookname + "/" + b + ".hocr" -			for _, hocr := range objs { -				if hocr == hocrname { -					found = true -					break -				} -			} -			if found == false { -				return false -			} -		} -	} -	if atleastone == false { -		return false -	} -	return true -} - -// ocrPage OCRs a page based on a message. It may make sense to -// roll this back into processBook (on which it is based) once -// working well. -func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { -	dl := make(chan string) -	msgc := make(chan bookpipeline.Qmsg) -	processc := make(chan string) -	upc := make(chan string) -	done := make(chan bool) -	errc := make(chan error) - -	msgparts := strings.Split(msg.Body, " ") -	bookname := filepath.Dir(msgparts[0]) -	if len(msgparts) > 1 && msgparts[1] != "" { -		process = ocr(msgparts[1]) -	} - -	d := filepath.Join(os.TempDir(), bookname) -	err := os.MkdirAll(d, 0755) -	if err != nil { -		return fmt.Errorf("Failed to create directory %s: %s", d, err) -	} - -	t := time.NewTicker(HeartbeatSeconds * time.Second) -	go heartbeat(conn, t, msg, fromQueue, msgc, errc) - -	// these functions will do their jobs when their channels have data -	go download(dl, processc, conn, d, errc, conn.GetLogger()) -	go process(processc, upc, errc, conn.GetLogger()) -	go up(upc, done, conn, bookname, errc, conn.GetLogger()) - -	dl <- msgparts[0] -	close(dl) - -	// wait for either the done or errc channel to be sent to -	select { -	case err = <-errc: -		t.Stop() -		_ = os.RemoveAll(d) -		return err -	case <-done: -	} - -	if allOCRed(bookname, conn) && toQueue != "" { -		conn.Log("Sending", bookname, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, bookname) -		if err != nil { -			t.Stop() -			_ = os.RemoveAll(d) -			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) -		} -	} - -	t.Stop() - -	// check whether we're using a newer msg handle -	select { -	case m, ok := <-msgc: -		if ok { -			msg = m -			conn.Log("Using new message handle to delete message from queue") -		} -	default: -		conn.Log("Using original message handle to delete message from queue") -	} - -	conn.Log("Deleting original message from queue", fromQueue) -	err = conn.DelFromQueue(fromQueue, msg.Handle) -	if err != nil { -		_ = os.RemoveAll(d) -		return fmt.Errorf("Error deleting message from queue: %s", err) -	} - -	err = os.RemoveAll(d) -	if err != nil { -		return fmt.Errorf("Failed to remove directory %s: %s", d, err) -	} - -	return nil -} - -func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error { -	dl := make(chan string) -	msgc := make(chan bookpipeline.Qmsg) -	processc := make(chan string) -	upc := make(chan string) -	done := make(chan bool) -	errc := make(chan error) - -	msgparts := strings.Split(msg.Body, " ") -	bookname := msgparts[0] - -	var training string -	if len(msgparts) > 1 { -		training = msgparts[1] -	} - -	d := filepath.Join(os.TempDir(), bookname) -	err := os.MkdirAll(d, 0755) -	if err != nil { -		return fmt.Errorf("Failed to create directory %s: %s", d, err) -	} - -	t := time.NewTicker(HeartbeatSeconds * time.Second) -	go heartbeat(conn, t, msg, fromQueue, msgc, errc) - -	// these functions will do their jobs when their channels have data -	go download(dl, processc, conn, d, errc, conn.GetLogger()) -	go process(processc, upc, errc, conn.GetLogger()) -	if toQueue == conn.OCRPageQueueId() { -		go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger()) -	} else { -		go up(upc, done, conn, bookname, errc, conn.GetLogger()) -	} - -	conn.Log("Getting list of objects to download") -	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) -	if err != nil { -		t.Stop() -		_ = os.RemoveAll(d) -		return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err) -	} -	var todl []string -	for _, n := range objs { -		if !match.MatchString(n) { -			conn.Log("Skipping item that doesn't match target", n) -			continue -		} -		todl = append(todl, n) -	} -	for _, a := range todl { -		dl <- a -	} -	close(dl) - -	// wait for either the done or errc channel to be sent to -	select { -	case err = <-errc: -		t.Stop() -		_ = os.RemoveAll(d) -		// if the error is in preprocessing / wipeonly, chances are that it will never -		// complete, and will fill the ocrpage queue with parts which succeeded -		// on each run, so in that case it's better to delete the message from -		// the queue and notify us. -		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() { -			conn.Log("Deleting message from queue due to a bad error", fromQueue) -			err2 := conn.DelFromQueue(fromQueue, msg.Handle) -			if err2 != nil { -				conn.Log("Error deleting message from queue", err2) -			} -			ms, err2 := getMailSettings() -			if err2 != nil { -				conn.Log("Failed to mail settings ", err2) -			} -			if err2 == nil && ms.server != "" { -				logs, err2 := getlogs() -				if err2 != nil { -					conn.Log("Failed to get logs ", err2) -					logs = "" -				} -				msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" + -					"Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" + -					" Fail message: %s\r\nFull log:\r\n%s\r\n", -					ms.to, ms.from, bookname, err, logs) -				host := fmt.Sprintf("%s:%s", ms.server, ms.port) -				auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server) -				err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg)) -				if err2 != nil { -					conn.Log("Error sending email ", err2) -				} -			} -		} -		return err -	case <-done: -	} - -	if toQueue != "" && toQueue != conn.OCRPageQueueId() { -		conn.Log("Sending", bookname, "to queue", toQueue) -		err = conn.AddToQueue(toQueue, bookname) -		if err != nil { -			t.Stop() -			_ = os.RemoveAll(d) -			return fmt.Errorf("Error adding to queue %s: %s", bookname, err) -		} -	} - -	t.Stop() - -	// check whether we're using a newer msg handle -	select { -	case m, ok := <-msgc: -		if ok { -			msg = m -			conn.Log("Using new message handle to delete message from queue") -		} -	default: -		conn.Log("Using original message handle to delete message from queue") -	} - -	conn.Log("Deleting original message from queue", fromQueue) -	err = conn.DelFromQueue(fromQueue, msg.Handle) -	if err != nil { -		_ = os.RemoveAll(d) -		return fmt.Errorf("Error deleting message from queue: %s", err) -	} - -	err = os.RemoveAll(d) -	if err != nil { -		return fmt.Errorf("Failed to remove directory %s: %s", d, err) -	} - -	return nil -} -  func stopTimer(t *time.Timer) {  	if !t.Stop() {  		<-t.C  	}  } -// TODO: rather than relying on journald, would be nicer to save the logs -//       ourselves maybe, so that we weren't relying on a particular systemd -//       setup. this can be done by having the conn.Log also append line -//       to a file (though that would mean everything would have to go through -//       conn.Log, which we're not consistently doing yet). the correct thing -//       to do then would be to implement a new interface that covers the part -//       of log.Logger we use (e.g. Print and Printf), and then have an exported -//       conn struct that implements those, so that we could pass a log.Logger -//       or the new conn struct everywhere (we wouldn't be passing a log.Logger, -//       it's just good to be able to keep the compatibility) -func getlogs() (string, error) { -	cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all") -	var stdout, stderr bytes.Buffer -	cmd.Stdout = &stdout -	cmd.Stderr = &stderr -	err := cmd.Run() -	return stdout.String(), err -} - -func savelogs(conn Pipeliner, starttime int64, hostname string) error { -	logs, err := getlogs() -	if err != nil { -		return fmt.Errorf("Error getting logs, error: %v", err) +func resetTimer(t *time.Timer, d time.Duration) { +	if d > 0 { +		t.Reset(d)  	} -	key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname) -	path := filepath.Join(os.TempDir(), key) -	f, err := os.Create(path) -	if err != nil { -		return fmt.Errorf("Error creating log file", err) -	} -	defer f.Close() -	_, err = f.WriteString(logs) -	if err != nil { -		return fmt.Errorf("Error saving log file", err) -	} -	_ = f.Close() -	err = conn.Upload(conn.WIPStorageId(), key, path) -	if err != nil { -		return fmt.Errorf("Error uploading log", err) -	} -	conn.Log("Log saved to", key) -	return nil  }  func main() { @@ -770,7 +94,8 @@ func main() {  	nowipe := flag.Bool("nw", false, "disable wipeonly")  	noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")  	noanalyse := flag.Bool("na", false, "disable analysis") -	autoshutdown := flag.Bool("shutdown", false, "automatically shut down if no work has been available for 5 minutes") +	autostop := flag.Int64("autostop", 300, "automatically stop process if no work has been available for this number of seconds (to disable autostop set to 0)") +	autoshutdown := flag.Bool("shutdown", false, "automatically shut down host computer if there has been no work to do for the duration set with -autostop")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')")  	flag.Usage = func() { @@ -801,17 +126,20 @@ func main() {  		log.Fatalln("Unknown connection type")  	} -	_, err := getMailSettings() -	if err != nil { -		conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) +	var err error +	if *conntype != "local" { +		_, err = pipeline.GetMailSettings() +		if err != nil { +			conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err) +		}  	} -	conn.Log("Setting up AWS session") +	conn.Log("Setting up session")  	err = conn.Init()  	if err != nil { -		log.Fatalln("Error setting up cloud connection:", err) +		log.Fatalln("Error setting up connection:", err)  	} -	conn.Log("Finished setting up AWS session") +	conn.Log("Finished setting up session")  	starttime := time.Now().Unix()  	hostname, err := os.Hostname() @@ -820,7 +148,7 @@ func main() {  	var checkWipeQueue <-chan time.Time  	var checkOCRPageQueue <-chan time.Time  	var checkAnalyseQueue <-chan time.Time -	var shutdownIfQuiet *time.Timer +	var stopIfQuiet *time.Timer  	var savelognow *time.Ticker  	if !*nopreproc {  		checkPreQueue = time.After(0) @@ -834,13 +162,21 @@ func main() {  	if !*noanalyse {  		checkAnalyseQueue = time.After(0)  	} -	shutdownIfQuiet = time.NewTimer(TimeBeforeShutdown) +	var quietTime = time.Duration(*autostop) * time.Second +	stopIfQuiet = time.NewTimer(quietTime) +	if quietTime == 0 { +		stopIfQuiet.Stop() +	} +  	savelognow = time.NewTicker(LogSaveTime) +	if *conntype == "local" { +		savelognow.Stop() +	}  	for {  		select {  		case <-checkPreQueue: -			msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)  			checkPreQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking preprocess queue", err) @@ -851,14 +187,14 @@ func main() {  				continue  			}  			conn.Log("Message received on preprocess queue, processing", msg.Body) -			stopTimer(shutdownIfQuiet) -			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			stopTimer(stopIfQuiet) +			err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess([]float64{0.1, 0.2, 0.4, 0.5}), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during preprocess", err)  			}  		case <-checkWipeQueue: -			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)  			checkWipeQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking wipeonly queue", err) @@ -868,15 +204,15 @@ func main() {  				conn.Log("No message received on wipeonly queue, sleeping")  				continue  			} -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on wipeonly queue, processing", msg.Body) -			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during wipe", err)  			}  		case <-checkOCRPageQueue: -			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)  			checkOCRPageQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking OCR Page queue", err) @@ -888,15 +224,15 @@ func main() {  			// Have OCRPageQueue checked immediately after completion, as chances are high that  			// there will be more pages that should be done without delay  			checkOCRPageQueue = time.After(0) -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on OCR Page queue, processing", msg.Body) -			err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training, ""), conn.OCRPageQueueId(), conn.AnalyseQueueId()) +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during OCR Page process", err)  			}  		case <-checkAnalyseQueue: -			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2) +			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)  			checkAnalyseQueue = time.After(PauseBetweenChecks)  			if err != nil {  				conn.Log("Error checking analyse queue", err) @@ -906,25 +242,30 @@ func main() {  				conn.Log("No message received on analyse queue, sleeping")  				continue  			} -			stopTimer(shutdownIfQuiet) +			stopTimer(stopIfQuiet)  			conn.Log("Message received on analyse queue, processing", msg.Body) -			err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") -			shutdownIfQuiet.Reset(TimeBeforeShutdown) +			err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") +			resetTimer(stopIfQuiet, quietTime)  			if err != nil {  				conn.Log("Error during analysis", err)  			}  		case <-savelognow.C:  			conn.Log("Saving logs") -			err = savelogs(conn, starttime, hostname) +			err = pipeline.SaveLogs(conn, starttime, hostname)  			if err != nil {  				conn.Log("Error saving logs", err)  			} -		case <-shutdownIfQuiet.C: -			if !*autoshutdown { +		case <-stopIfQuiet.C: +			if quietTime == 0 {  				continue  			} +			if !*autoshutdown { +				conn.Log("Stopping pipeline") +				_ = pipeline.SaveLogs(conn, starttime, hostname) +				return +			}  			conn.Log("Shutting down") -			_ = savelogs(conn, starttime, hostname) +			_ = pipeline.SaveLogs(conn, starttime, hostname)  			cmd := exec.Command("sudo", "systemctl", "poweroff")  			var stdout, stderr bytes.Buffer  			cmd.Stdout = &stdout diff --git a/cmd/booktopipeline/main.go b/cmd/booktopipeline/main.go index 60d1f81..b4f4d99 100644 --- a/cmd/booktopipeline/main.go +++ b/cmd/booktopipeline/main.go @@ -9,14 +9,13 @@ package main  import (  	"flag"  	"fmt" -	"image" -	_ "image/png" -	_ "image/jpeg"  	"log"  	"os"  	"path/filepath"  	"rescribe.xyz/bookpipeline" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  )  const usage = `Usage: booktopipeline [-c conn] [-t training] [-prebinarised] [-notbinarised] [-v] bookdir [bookname] @@ -32,15 +31,6 @@ using the flags -prebinarised (for the wipeonly queue) or  If bookname is omitted the last part of the bookdir is used.  ` -type Pipeliner interface { -	Init() error -	PreQueueId() string -	WipeQueueId() string -	WIPStorageId() string -	AddToQueue(url string, msg string) error -	Upload(bucket string, key string, path string) error -} -  // null writer to enable non-verbose logging to be discarded  type NullWriter bool @@ -50,18 +40,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  var verboselog *log.Logger -type fileWalk chan string - -func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { -	if err != nil { -		return err -	} -	if !info.IsDir() { -		f <- path -	} -	return nil -} -  func main() {  	verbose := flag.Bool("v", false, "Verbose")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -94,7 +72,7 @@ func main() {  		verboselog = log.New(n, "", log.LstdFlags)  	} -	var conn Pipeliner +	var conn pipeline.Pipeliner  	switch *conntype {  	case "aws":  		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -108,18 +86,7 @@ func main() {  		log.Fatalln("Failed to set up cloud connection:", err)  	} -	qid := conn.PreQueueId() - -	// Auto detect type of queue to send to based on file extension -	pngdirs, _ := filepath.Glob(bookdir + "/*.png") -	jpgdirs, _ := filepath.Glob(bookdir + "/*.jpg") -	pngcount := len(pngdirs) -	jpgcount := len(jpgdirs) -	if pngcount > jpgcount { -		qid = conn.WipeQueueId() -	} else { -		qid = conn.PreQueueId() -	} +	qid := pipeline.DetectQueueType(bookdir, conn)  	// Flags set override the queue selection  	if *wipeonly { @@ -130,43 +97,24 @@ func main() {  	}  	verboselog.Println("Checking that all images are valid in", bookdir) -	checker := make(fileWalk) -	go func() { -		err = filepath.Walk(bookdir, checker.Walk) -		if err != nil { -			log.Fatalln("Filesystem walk failed:", err) -		} -		close(checker) -	}() - -	for path := range checker { -		f, err := os.Open(path) -		if err != nil { -			log.Fatalln("Opening image %s failed, bailing: %v", path, err) -		} -		_, _, err = image.Decode(f) -		if err != nil { -			log.Fatalf("Decoding image %s failed, bailing: %v", path, err) -		} +	err = pipeline.CheckImages(bookdir) +	if err != nil { +		log.Fatalln(err)  	} -	verboselog.Println("Walking", bookdir) -	walker := make(fileWalk) -	go func() { -		err = filepath.Walk(bookdir, walker.Walk) -		if err != nil { -			log.Fatalln("Filesystem walk failed:", err) -		} -		close(walker) -	}() - -	for path := range walker { -		verboselog.Println("Uploading", path) -		name := filepath.Base(path) -		err = conn.Upload(conn.WIPStorageId(), filepath.Join(bookname, name), path) -		if err != nil { -			log.Fatalln("Failed to upload", path, err) -		} +	verboselog.Println("Checking that a book hasn't already been uploaded with that name") +	list, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		log.Fatalln(err) +	} +	if len(list) > 0 { +		log.Fatalf("Error: There is already a book in S3 named %s", bookname) +	} + +	verboselog.Println("Uploading all images are valid in", bookdir) +	err = pipeline.UploadImages(bookdir, bookname, conn) +	if err != nil { +		log.Fatalln(err)  	}  	if *training != "" { diff --git a/cmd/getbests/main.go b/cmd/getbests/main.go index 9eca0d8..c1ee50d 100644 --- a/cmd/getbests/main.go +++ b/cmd/getbests/main.go @@ -62,8 +62,8 @@ func main() {  	log.Println("Downloading all best files found")  	for _, i := range objs {  		parts := strings.Split(i, "/") -		if parts[len(parts) - 1] == "best" { -			err = conn.Download(conn.WIPStorageId(), i, parts[0] + "-best") +		if parts[len(parts)-1] == "best" { +			err = conn.Download(conn.WIPStorageId(), i, parts[0]+"-best")  			if err != nil {  				log.Fatalln("Failed to download file", i, err)  			} diff --git a/cmd/getpipelinebook/main.go b/cmd/getpipelinebook/main.go index 03e709b..ccedd72 100644 --- a/cmd/getpipelinebook/main.go +++ b/cmd/getpipelinebook/main.go @@ -6,15 +6,15 @@  package main  import ( -	"bufio"  	"flag"  	"fmt"  	"log"  	"os"  	"path/filepath" -	"strings"  	"rescribe.xyz/bookpipeline" + +	"rescribe.xyz/bookpipeline/internal/pipeline"  )  const usage = `Usage: getpipelinebook [-c conn] [-a] [-graph] [-pdf] [-png] [-v] bookname @@ -33,28 +33,6 @@ func (w NullWriter) Write(p []byte) (n int, err error) {  	return len(p), nil  } -type Pipeliner interface { -	MinimalInit() error -	ListObjects(bucket string, prefix string) ([]string, error) -	Download(bucket string, key string, fn string) error -	Upload(bucket string, key string, path string) error -	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) -	AddToQueue(url string, msg string) error -	DelFromQueue(url string, handle string) error -	WIPStorageId() string -} - -func getpdfs(conn Pipeliner, l *log.Logger, bookname string) { -	for _, suffix := range []string{".colour.pdf", ".binarised.pdf"} { -		fn := filepath.Join(bookname, bookname+suffix) -		l.Println("Downloading PDF", fn) -		err := conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Printf("Failed to download %s: %s\n", fn, err) -		} -	} -} -  func main() {  	all := flag.Bool("a", false, "Get all files for book")  	conntype := flag.String("c", "aws", "connection type ('aws' or 'local')") @@ -83,7 +61,7 @@ func main() {  		verboselog = log.New(n, "", log.LstdFlags)  	} -	var conn Pipeliner +	var conn pipeline.MinPipeliner  	switch *conntype {  	case "aws":  		conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} @@ -109,18 +87,10 @@ func main() {  	if *all {  		verboselog.Println("Downloading all files for", bookname) -		objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +		err = pipeline.DownloadAll(bookname, bookname, conn)  		if err != nil { -			log.Fatalln("Failed to get list of files for book", bookname, err) -		} -		for _, i := range objs { -			verboselog.Println("Downloading", i) -			err = conn.Download(conn.WIPStorageId(), i, i) -			if err != nil { -				log.Fatalln("Failed to download file", i, err) -			} +			log.Fatalln(err)  		} -		return  	}  	if *binarisedpdf { @@ -151,61 +121,29 @@ func main() {  	}  	if *pdf { -		getpdfs(conn, verboselog, bookname) +		verboselog.Println("Downloading PDFs") +		pipeline.DownloadPdfs(bookname, bookname, conn)  	}  	if *binarisedpdf || *colourpdf || *graph || *pdf {  		return  	} -	verboselog.Println("Downloading best file") -	fn := filepath.Join(bookname, "best") -	err = conn.Download(conn.WIPStorageId(), fn, fn) +	verboselog.Println("Downloading best pages") +	err = pipeline.DownloadBestPages(bookname, bookname, conn, *png)  	if err != nil { -		log.Fatalln("Failed to download 'best' file", err) -	} -	f, err := os.Open(fn) -	if err != nil { -		log.Fatalln("Failed to open best file", err) -	} -	defer f.Close() - -	if *png { -		verboselog.Println("Downloading png files") -		s := bufio.NewScanner(f) -		for s.Scan() { -			txtfn := filepath.Join(bookname, s.Text()) -			fn = strings.Replace(txtfn, ".hocr", ".png", 1) -			verboselog.Println("Downloading file", fn) -			err = conn.Download(conn.WIPStorageId(), fn, fn) -			if err != nil { -				log.Fatalln("Failed to download file", fn, err) -			} -		} -		return +		log.Fatalln(err)  	} -	verboselog.Println("Downloading HOCR files") -	s := bufio.NewScanner(f) -	for s.Scan() { -		fn = filepath.Join(bookname, s.Text()) -		verboselog.Println("Downloading file", fn) -		err = conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Fatalln("Failed to download file", fn, err) -		} +	verboselog.Println("Downloading PDFs") +	pipeline.DownloadPdfs(bookname, bookname, conn) +	if err != nil { +		log.Fatalln(err)  	} -	verboselog.Println("Downloading PDF files") -	getpdfs(conn, verboselog, bookname) - -	verboselog.Println("Downloading analysis files") -	for _, a := range []string{"conf", "graph.png"} { -		fn = filepath.Join(bookname, a) -		verboselog.Println("Downloading file", fn) -		err = conn.Download(conn.WIPStorageId(), fn, fn) -		if err != nil { -			log.Fatalln("Failed to download file", fn, err) -		} +	verboselog.Println("Downloading analyses") +	err = pipeline.DownloadAnalyses(bookname, bookname, conn) +	if err != nil { +		log.Fatalln(err)  	}  } diff --git a/cmd/logwholequeue/main.go b/cmd/logwholequeue/main.go new file mode 100644 index 0000000..71e8927 --- /dev/null +++ b/cmd/logwholequeue/main.go @@ -0,0 +1,85 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// logwholequeue gets all messages in a queue. This can be useful +// for debugging queue issues. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: logwholequeue qname + +logwholequeue gets all messages in a queue. + +This can be useful for debugging queue issues. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	LogQueue(url string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 1 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.LogQueue(qid) +	if err != nil { +		log.Fatalln("Error getting queue", qname, ":", err) +	} +} diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index b649778..131ff12 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -12,6 +12,7 @@ import (  	"os/exec"  	"sort"  	"strings" +	"time"  	"rescribe.xyz/bookpipeline"  ) @@ -35,7 +36,7 @@ type LsPipeliner interface {  	AnalyseQueueId() string  	GetQueueDetails(url string) (string, string, error)  	GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) -	ListObjectsWithMeta(bucket string, prefix string) ([]bookpipeline.ObjMeta, error) +	ListObjectWithMeta(bucket string, prefix string) (bookpipeline.ObjMeta, error)  	ListObjectPrefixes(bucket string) ([]string, error)  	WIPStorageId() string  } @@ -100,43 +101,88 @@ func (o ObjMetas) Less(i, j int) bool {  	return o[i].Date.Before(o[j].Date)  } +// getBookDetails determines whether a book is done and what date +// it was completed, or if it has not finished, the date of any +// book file. +func getBookDetails(conn LsPipeliner, key string) (date time.Time, done bool, err error) { +	// First try to get the graph.png file from the book, which marks +	// it as done +	obj, err := conn.ListObjectWithMeta(conn.WIPStorageId(), key+"graph.png") +	if err == nil { +		return obj.Date, true, nil +	} + +	// Otherwise get any file from the book to get a date to sort by +	obj, err = conn.ListObjectWithMeta(conn.WIPStorageId(), key) +	if err != nil { +		return time.Time{}, false, err +	} +	return obj.Date, false, nil +} + +// getBookDetailsChan gets the details for a book putting it into either the +// done or inprogress channels as appropriate, or sending an error to errc +// on failure. +func getBookDetailsChan(conn LsPipeliner, key string, done chan bookpipeline.ObjMeta, inprogress chan bookpipeline.ObjMeta, errc chan error) { +	date, isdone, err := getBookDetails(conn, key) +	if err != nil { +		errc <- err +		return +	} +	meta := bookpipeline.ObjMeta{Name: strings.TrimSuffix(key, "/"), Date: date} +	if isdone { +		done <- meta +	} else { +		inprogress <- meta +	} +} +  // getBookStatus returns a list of in progress and done books.  // It determines this by finding all prefixes, and splitting them  // into two lists, those which have a 'graph.png' file (the done  // list), and those which do not (the inprogress list). They are  // sorted according to the date of the graph.png file, or the date  // of a random file with the prefix if no graph.png was found. +// It spins up many goroutines to do query the book status and +// dates, as it is far faster to do concurrently.  func getBookStatus(conn LsPipeliner) (inprogress []string, done []string, err error) {  	prefixes, err := conn.ListObjectPrefixes(conn.WIPStorageId()) -	var inprogressmeta, donemeta ObjMetas  	if err != nil {  		log.Println("Error getting object prefixes:", err)  		return  	} -	// Search for graph.png to determine done books (and save the date of it to sort with) + +	donec := make(chan bookpipeline.ObjMeta, 100) +	inprogressc := make(chan bookpipeline.ObjMeta, 100) +	errc := make(chan error) +  	for _, p := range prefixes { -		objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), p+"graph.png") -		if err != nil || len(objs) == 0 { -			inprogressmeta = append(inprogressmeta, bookpipeline.ObjMeta{Name: p}) -		} else { -			donemeta = append(donemeta, bookpipeline.ObjMeta{Name: p, Date: objs[0].Date}) -		} +		go getBookDetailsChan(conn, p, donec, inprogressc, errc)  	} -	// Get a random file from the inprogress list to get a date to sort by -	for _, i := range inprogressmeta { -		objs, err := conn.ListObjectsWithMeta(conn.WIPStorageId(), i.Name) -		if err != nil || len(objs) == 0 { -			continue + +	var inprogressmeta, donemeta ObjMetas + +	// there will be exactly as many sends to donec or inprogressc +	// as there are prefixes +	for range prefixes { +		select { +		case i := <-donec: +			donemeta = append(donemeta, i) +		case i := <-inprogressc: +			inprogressmeta = append(inprogressmeta, i) +		case err = <-errc: +			return inprogress, done, err  		} -		i.Date = objs[0].Date  	} +  	sort.Sort(donemeta) +	sort.Sort(inprogressmeta) +  	for _, i := range donemeta { -		done = append(done, strings.TrimSuffix(i.Name, "/")) +		done = append(done, i.Name)  	} -	sort.Sort(inprogressmeta)  	for _, i := range inprogressmeta { -		inprogress = append(inprogress, strings.TrimSuffix(i.Name, "/")) +		inprogress = append(inprogress, i.Name)  	}  	return diff --git a/cmd/postprocess-bythresh/main.go b/cmd/postprocess-bythresh/main.go index 37b77e7..5bdb839 100644 --- a/cmd/postprocess-bythresh/main.go +++ b/cmd/postprocess-bythresh/main.go @@ -19,7 +19,6 @@ import (  //TO DO: make writetofile return an error and handle that accordingly  // potential TO DO: add text versions where footer is cropped on odd/even pages only -  // the trimblanks function trims the blank lines from a text input  func trimblanks(hocrfile string) string { @@ -50,7 +49,7 @@ func dehyphenateString(in string) string {  		words := strings.Split(line, " ")  		last := words[len(words)-1]  		// the - 2 here is to account for a trailing newline and counting from zero -		if len(last) > 0 && last[len(last) - 1] == '-' && i < len(lines) - 2 { +		if len(last) > 0 && last[len(last)-1] == '-' && i < len(lines)-2 {  			nextwords := strings.Split(lines[i+1], " ")  			if len(nextwords) > 0 {  				line = line[0:len(line)-1] + nextwords[0] @@ -66,17 +65,15 @@ func dehyphenateString(in string) string {  	return strings.Join(newlines, " ")  } -  // the fullcrop function takes a text input and crops the first and the last line (if text is at least 2 lines long)  func fullcrop(noblanks string) string { -  	alllines := strings.Split(noblanks, "\n") -	 +  	if len(alllines) <= 2 { -	return "" -	}	else { -	return strings.Join(alllines[1:len(alllines)-2], "\n") +		return "" +	} else { +		return strings.Join(alllines[1:len(alllines)-2], "\n")  	}  } @@ -132,7 +129,6 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  	var killheadtxt string  	var footkilltxt string -  	hocrfilepath := filepath.Join(bookdirectory, hocrfilename)  	confpath := filepath.Join(bookdirectory, "conf") @@ -165,18 +161,16 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  		if err != nil {  			log.Fatal(err)  		} -		 -		 +  		trimbest := trimblanks(hocrfiletext) -		 +  		alltxt = dehyphenateString(trimbest) -			 +  		croptxt = dehyphenateString(fullcrop(trimbest)) -	 +  		killheadtxt = dehyphenateString(headcrop(trimbest)) -		 +  		footkilltxt = dehyphenateString(footcrop(trimbest)) -		  	}  	return alltxt, croptxt, killheadtxt, footkilltxt @@ -185,7 +179,7 @@ func convertselect(bookdirectory, hocrfilename string, confthresh int) (string,  // the writetofile function takes a directory, filename and text input and creates a text file within the bookdirectory from them.  func writetofile(bookdirectory, textfilebase, txt string) error {  	alltxtfile := filepath.Join(bookdirectory, textfilebase) -	 +  	file, err := os.Create(alltxtfile)  	if err != nil {  		return fmt.Errorf("Error opening file %s: %v", alltxtfile, err) @@ -194,7 +188,7 @@ func writetofile(bookdirectory, textfilebase, txt string) error {  	if _, err := file.WriteString(txt); err != nil {  		log.Println(err)  	} -return err +	return err  } @@ -215,7 +209,7 @@ func main() {  	bookdirectory := flag.Arg(0)  	confthreshstring := strconv.Itoa(*confthresh) -	 +  	fmt.Println("Postprocessing", bookdirectory, "with threshold", *confthresh)  	bestpath := filepath.Join(bookdirectory, "best") @@ -239,32 +233,31 @@ func main() {  			crop = crop + " " + croptxt  			killhead = killhead + " " + killheadtxt  			killfoot = killfoot + " " + footkilltxt -		 +  		}  	} -	 -	 -	bookname:= filepath.Base(bookdirectory) -		b := bookname + "_" + confthreshstring -		err1 := writetofile(bookdirectory, b + "_all.txt", all) -		if err1 != nil { +	bookname := filepath.Base(bookdirectory) +	b := bookname + "_" + confthreshstring + +	err1 := writetofile(bookdirectory, b+"_all.txt", all) +	if err1 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err1) -		} -		 -		err2 := writetofile(bookdirectory, b + "_crop.txt", crop) -		if err2 != nil { +	} + +	err2 := writetofile(bookdirectory, b+"_crop.txt", crop) +	if err2 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err2) -		} -		 -		err3 := writetofile(bookdirectory, b + "_nohead.txt", killhead) -		if err3 != nil { +	} + +	err3 := writetofile(bookdirectory, b+"_nohead.txt", killhead) +	if err3 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err3) -		} -		 -		err4 := writetofile(bookdirectory, b + "_nofoot.txt", killfoot) -		if err4 != nil { +	} + +	err4 := writetofile(bookdirectory, b+"_nofoot.txt", killfoot) +	if err4 != nil {  		log.Fatalf("Ah shit, we're going down, Nick says ABORT! %v", err4) -		} +	}  } diff --git a/cmd/rescribe/main.go b/cmd/rescribe/main.go new file mode 100644 index 0000000..07eeaf0 --- /dev/null +++ b/cmd/rescribe/main.go @@ -0,0 +1,395 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rescribe is a modification of bookpipeline designed for local-only +// operation, which rolls uploading, processing, and downloading of +// a single book by the pipeline into one command. +package main + +import ( +	"flag" +	"fmt" +	"io/ioutil" +	"log" +	"os" +	"os/exec" +	"path/filepath" +	"regexp" +	"runtime" +	"strings" +	"time" + +	"rescribe.xyz/bookpipeline" +	"rescribe.xyz/utils/pkg/hocr" + +	"rescribe.xyz/bookpipeline/internal/pipeline" +) + +const usage = `Usage: rescribe [-v] [-t training] bookdir [savedir] + +Process and OCR a book using the Rescribe pipeline on a local machine. + +OCR results are saved into the bookdir directory unless savedir is +specified. +` + +const QueueTimeoutSecs = 2 * 60 +const PauseBetweenChecks = 1 * time.Second +const LogSaveTime = 1 * time.Minute +var thresholds = []float64{0.1, 0.2, 0.3} + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { +	return len(p), nil +} + +type Clouder interface { +	Init() error +	ListObjects(bucket string, prefix string) ([]string, error) +	Download(bucket string, key string, fn string) error +	Upload(bucket string, key string, path string) error +	CheckQueue(url string, timeout int64) (bookpipeline.Qmsg, error) +	AddToQueue(url string, msg string) error +	DelFromQueue(url string, handle string) error +	QueueHeartbeat(msg bookpipeline.Qmsg, qurl string, duration int64) (bookpipeline.Qmsg, error) +} + +type Pipeliner interface { +	Clouder +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +	WIPStorageId() string +	GetLogger() *log.Logger +	Log(v ...interface{}) +} + +func stopTimer(t *time.Timer) { +	if !t.Stop() { +		<-t.C +	} +} + +func resetTimer(t *time.Timer, d time.Duration) { +	if d > 0 { +		t.Reset(d) +	} +} + +func main() { +	deftesscmd := "tesseract" +	if runtime.GOOS == "windows" { +		deftesscmd = "C:\\Program Files\\Tesseract-OCR\\tesseract.exe" +	} + +	verbose := flag.Bool("v", false, "verbose") +	training := flag.String("t", "training/rescribev7_fast.traineddata", "path to the tesseract training file to use") +	tesscmd := flag.String("tesscmd", deftesscmd, "The Tesseract executable to run. You may need to set this to the full path of Tesseract.exe if you're on Windows.") + +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() < 1 || flag.NArg() > 2 { +		flag.Usage() +		return +	} + +	bookdir := flag.Arg(0) +	bookname := filepath.Base(bookdir) +	savedir := bookdir +	if flag.NArg() > 1 { +		savedir = flag.Arg(1) +	} + +	var verboselog *log.Logger +	if *verbose { +		verboselog = log.New(os.Stdout, "", 0) +	} else { +		var n NullWriter +		verboselog = log.New(n, "", 0) +	} + +	f, err := os.Open(*training) +	if err != nil { +		fmt.Fprintf(os.Stderr, "Error: Training file %s could not be opened.\n", *training) +		fmt.Fprintf(os.Stderr, "Set the `-t` flag with path to a tesseract .traineddata file.\n") +		os.Exit(1) +	} +	f.Close() + +	abstraining, err := filepath.Abs(*training) +	if err != nil { +		log.Fatalf("Error getting absolute path of training %s: %v", err) +	} +	tessPrefix, trainingName := filepath.Split(abstraining) +	trainingName = strings.TrimSuffix(trainingName, ".traineddata") +	err = os.Setenv("TESSDATA_PREFIX", tessPrefix) +	if err != nil { +		log.Fatalln("Error setting TESSDATA_PREFIX:", err) +	} + +	_, err = exec.Command(*tesscmd, "--help").Output() +	if err != nil { +		fmt.Fprintf(os.Stderr, "Error: Can't run Tesseract.\n") +		fmt.Fprintf(os.Stderr, "Ensure that Tesseract is installed and available.\n") +		fmt.Fprintf(os.Stderr, "You may need to -tesscmd to the full path of Tesseract.exe if you're on Windows, like this:\n") +		fmt.Fprintf(os.Stderr, "  rescribe -tesscmd 'C:\\Program Files\\Tesseract OCR (x86)\\tesseract.exe' ...\n") +		os.Exit(1) +	} + +	tempdir, err := ioutil.TempDir("", "bookpipeline") +	if err != nil { +		log.Fatalln("Error setting up temporary directory:", err) +	} + +	var conn Pipeliner +	conn = &bookpipeline.LocalConn{Logger: verboselog, TempDir: tempdir} + +	conn.Log("Setting up session") +	err = conn.Init() +	if err != nil { +		log.Fatalln("Error setting up connection:", err) +	} +	conn.Log("Finished setting up session") + +	fmt.Printf("Copying book to pipeline\n") + +	err = uploadbook(bookdir, bookname, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	fmt.Printf("Processing book\n") +	err = processbook(trainingName, *tesscmd, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	fmt.Printf("Saving finished book to %s\n", savedir) +	err = os.MkdirAll(savedir, 0755) +	if err != nil { +		log.Fatalf("Error creating save directory %s: %v", savedir, err) +	} +	err = downloadbook(savedir, bookname, conn) +	if err != nil { +		_ = os.RemoveAll(tempdir) +		log.Fatalln(err) +	} + +	err = os.RemoveAll(tempdir) +	if err != nil { +		log.Fatalf("Error removing temporary directory %s: %v", tempdir, err) +	} + +	hocrs, err := filepath.Glob(fmt.Sprintf("%s%s*.hocr", savedir, string(filepath.Separator))) +	if err != nil { +		log.Fatalf("Error looking for .hocr files: %v", err) +	} + +	for _, v := range hocrs { +		err = addTxtVersion(v) +		if err != nil { +			log.Fatalf("Error creating txt version of %s: %v", v, err) +		} + +		err = os.MkdirAll(filepath.Join(savedir, "hocr"), 0755) +		if err != nil { +			log.Fatalf("Error creating hocr directory: %v", err) +		} + +		err = os.Rename(v, filepath.Join(savedir, "hocr", filepath.Base(v))) +		if err != nil { +			log.Fatalf("Error moving hocr %s to hocr directory: %v", v, err) +		} +	} + +	// For simplicity, remove .binarised.pdf and rename .colour.pdf to .pdf +	_ = os.Remove(filepath.Join(savedir, bookname + ".binarised.pdf")) +	_ = os.Rename(filepath.Join(savedir, bookname + ".colour.pdf"), filepath.Join(savedir, bookname + ".pdf")) +} + +func addTxtVersion(hocrfn string) error { +	dir := filepath.Dir(hocrfn) +	err := os.MkdirAll(filepath.Join(dir, "text"), 0755) +	if err != nil { +		log.Fatalf("Error creating text directory: %v", err) +	} + +	t, err := hocr.GetText(hocrfn) +	if err != nil { +		return fmt.Errorf("Error getting text from hocr file %s: %v", hocrfn, err) +	} + +	basefn := filepath.Base(hocrfn) +	for _, v := range thresholds { +		basefn = strings.TrimSuffix(basefn, fmt.Sprintf("_bin%.1f.hocr", v)) +	} +	fn := filepath.Join(dir, "text", basefn + ".txt") + +	err = ioutil.WriteFile(fn, []byte(t), 0644) +	if err != nil { +		return fmt.Errorf("Error creating text file %s: %v", fn, err) +	} + +	return nil +} + +func uploadbook(dir string, name string, conn Pipeliner) error { +	err := pipeline.CheckImages(dir) +	if err != nil { +		return fmt.Errorf("Error with images in %s: %v", dir, err) +	} +	err = pipeline.UploadImages(dir, name, conn) +	if err != nil { +		return fmt.Errorf("Error saving images to process from %s: %v", dir, err) +	} + +	qid := pipeline.DetectQueueType(dir, conn) + +	err = conn.AddToQueue(qid, name) +	if err != nil { +		return fmt.Errorf("Error adding book job to queue %s: %v", qid, err) +	} + +	return nil +} + +func downloadbook(dir string, name string, conn Pipeliner) error { +	err := os.MkdirAll(name, 0755) +	if err != nil { +		log.Fatalln("Failed to create directory", name, err) +	} + +	err = pipeline.DownloadBestPages(dir, name, conn, false) +	if err != nil { +		return fmt.Errorf("Error downloading best pages: %v", err) +	} + +	err = pipeline.DownloadPdfs(dir, name, conn) +	if err != nil { +		return fmt.Errorf("Error downloading PDFs: %v", err) +	} + +	err = pipeline.DownloadAnalyses(dir, name, conn) +	if err != nil { +		return fmt.Errorf("Error downloading analyses: %v", err) +	} + +	return nil +} + +func processbook(training string, tesscmd string, conn Pipeliner) error { +	origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) +	wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) +	ocredPattern := regexp.MustCompile(`.hocr$`) + +	var checkPreQueue <-chan time.Time +	var checkWipeQueue <-chan time.Time +	var checkOCRPageQueue <-chan time.Time +	var checkAnalyseQueue <-chan time.Time +	var stopIfQuiet *time.Timer +	checkPreQueue = time.After(0) +	checkWipeQueue = time.After(0) +	checkOCRPageQueue = time.After(0) +	checkAnalyseQueue = time.After(0) +	var quietTime = 1 * time.Second +	stopIfQuiet = time.NewTimer(quietTime) +	if quietTime == 0 { +		stopIfQuiet.Stop() +	} + +	for { +		select { +		case <-checkPreQueue: +			msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs) +			checkPreQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking preprocess queue: %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on preprocess queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on preprocess queue, processing", msg.Body) +			fmt.Printf("  Preprocessing book (binarising and wiping)\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess(thresholds), origPattern, conn.PreQueueId(), conn.OCRPageQueueId()) +			fmt.Printf("  OCRing pages ") // this is expected to be added to with dots by OCRPage output +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during preprocess: %v", err) +			} +		case <-checkWipeQueue: +			msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs) +			checkWipeQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking wipeonly queue, %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on wipeonly queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on wipeonly queue, processing", msg.Body) +			fmt.Printf("  Preprocessing book (wiping only)\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId()) +			fmt.Printf("  OCRing pages ") // this is expected to be added to with dots by OCRPage output +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during wipe: %v", err) +			} +		case <-checkOCRPageQueue: +			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs) +			checkOCRPageQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking OCR Page queue: %v", err) +			} +			if msg.Handle == "" { +				continue +			} +			// Have OCRPageQueue checked immediately after completion, as chances are high that +			// there will be more pages that should be done without delay +			checkOCRPageQueue = time.After(0) +			stopTimer(stopIfQuiet) +			conn.Log("Message received on OCR Page queue, processing", msg.Body) +			fmt.Printf(".") +			err = pipeline.OcrPage(msg, conn, pipeline.Ocr(training, tesscmd), conn.OCRPageQueueId(), conn.AnalyseQueueId()) +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("\nError during OCR Page process: %v", err) +			} +		case <-checkAnalyseQueue: +			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs) +			checkAnalyseQueue = time.After(PauseBetweenChecks) +			if err != nil { +				return fmt.Errorf("Error checking analyse queue: %v", err) +			} +			if msg.Handle == "" { +				conn.Log("No message received on analyse queue, sleeping") +				continue +			} +			stopTimer(stopIfQuiet) +			conn.Log("Message received on analyse queue, processing", msg.Body) +			fmt.Printf("\n  Analysing OCR and compiling PDFs\n") +			err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "") +			resetTimer(stopIfQuiet, quietTime) +			if err != nil { +				return fmt.Errorf("Error during analysis: %v", err) +			} +		case <-stopIfQuiet.C: +			conn.Log("Processing finished") +			return nil +		} +	} + +	return fmt.Errorf("Ended unexpectedly") // should never be reached +} diff --git a/cmd/rmbook/main.go b/cmd/rmbook/main.go new file mode 100644 index 0000000..fcacc2e --- /dev/null +++ b/cmd/rmbook/main.go @@ -0,0 +1,87 @@ +// Copyright 2020 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// rmbook removes a book from cloud storage. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: rmbook [-dryrun] bookname + +Removes a book from cloud storage. +` + +// null writer to enable non-verbose logging to be discarded +type NullWriter bool + +func (w NullWriter) Write(p []byte) (n int, err error) { +	return len(p), nil +} + +type RmPipeliner interface { +        MinimalInit() error +        WIPStorageId() string +	DeleteObjects(bucket string, keys []string) error +	ListObjects(bucket string, prefix string) ([]string, error) +} + +func main() { +	dryrun := flag.Bool("dryrun", false, "print which files would be deleted but don't delete") +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() < 1 { +		flag.Usage() +		return +	} + +	var n NullWriter +	verboselog := log.New(n, "", log.LstdFlags) + +	var conn RmPipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2", Logger: verboselog} + +	fmt.Println("Setting up cloud connection") +	err := conn.MinimalInit() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	bookname := flag.Arg(0) + "/" + +	fmt.Println("Getting list of files for book") +	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		log.Fatalln("Error in listing book items:", err) +	} + +	if len(objs) == 0 { +		log.Fatalln("No files found for book:", bookname) +	} + +	if *dryrun { +		fmt.Printf("I would delete these files:\n") +		for _, v := range objs { +			fmt.Println(v) +		} +		return +	} + +	fmt.Println("Deleting all files for book") +	err = conn.DeleteObjects(conn.WIPStorageId(), objs) +	if err != nil { +		log.Fatalln("Error deleting book files:", err) +	} + +	fmt.Println("Finished deleting files") +} diff --git a/cmd/trimqueue/main.go b/cmd/trimqueue/main.go new file mode 100644 index 0000000..cf65c4d --- /dev/null +++ b/cmd/trimqueue/main.go @@ -0,0 +1,84 @@ +// Copyright 2019 Nick White. +// Use of this source code is governed by the GPLv3 +// license that can be found in the LICENSE file. + +// trimqueue deletes any messages in a queue that match a specified +// prefix. +package main + +import ( +	"flag" +	"fmt" +	"log" + +	"rescribe.xyz/bookpipeline" +) + +const usage = `Usage: trimprefix qname prefix + +trimqueue deletes any messages in a queue that match a specified +prefix. + +Valid queue names: +- preprocess +- wipeonly +- ocrpage +- analyse +` + +type QueuePipeliner interface { +	Init() error +	RemovePrefixesFromQueue(url string, prefix string) error +	PreQueueId() string +	WipeQueueId() string +	OCRPageQueueId() string +	AnalyseQueueId() string +} + +func main() { +	flag.Usage = func() { +		fmt.Fprintf(flag.CommandLine.Output(), usage) +		flag.PrintDefaults() +	} +	flag.Parse() + +	if flag.NArg() != 2 { +		flag.Usage() +		return +	} + +	var conn QueuePipeliner +	conn = &bookpipeline.AwsConn{Region: "eu-west-2"} + +	err := conn.Init() +	if err != nil { +		log.Fatalln("Error setting up cloud connection:", err) +	} + +	qdetails := []struct { +		id, name string +	}{ +		{conn.PreQueueId(), "preprocess"}, +		{conn.WipeQueueId(), "wipeonly"}, +		{conn.OCRPageQueueId(), "ocrpage"}, +		{conn.AnalyseQueueId(), "analyse"}, +	} + +	qname := flag.Arg(0) + +	var qid string +	for i, n := range qdetails { +		if n.name == qname { +			qid = qdetails[i].id +			break +		} +	} +	if qid == "" { +		log.Fatalln("Error, no queue named", qname) +	} + +	err = conn.RemovePrefixesFromQueue(qid, flag.Arg(1)) +	if err != nil { +		log.Fatalln("Error removing prefixes from queue", qname, ":", err) +	} +} | 
