| Field | Value | Date |
|---|---|---|
| author | Nick White <git@njw.name> | 2020-11-09 16:46:43 +0000 |
| committer | Nick White <git@njw.name> | 2020-11-09 16:46:43 +0000 |
| commit | 4c7cdeb5646e84af3f15d4a7cd48f64d8086a6b9 (patch) | |
| tree | a0a186d5ac1fbdf491a57f5ef25758deb5e51c8d /cmd | |
| parent | fa1593d4b9b5f5e2b9f46137739557e29f65c765 (diff) | |
[bookpipeline] Split most functionality out to package internal/pipeline
No functionality changes, but this should make it easier to create
custom builds that use the pipeline in slightly different ways.
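To illustrate the intent, here is a minimal sketch of what such a custom build might look like. Only the `pipeline.ProcessBook` and `pipeline.Preprocess` names, the `conn.CheckQueue`/queue-ID calls and the 2 minute queue timeout are taken from this commit; the `AwsConn` setup, the filename pattern and the overall wiring are assumptions for illustration. Being under `internal/`, the package can only be imported by code living in the rescribe.xyz/bookpipeline module itself.

```go
// Hypothetical cmd/preprocbook/main.go inside the bookpipeline module.
// The pipeline.* and conn.* calls mirror the ones visible in this diff;
// the connection setup and the filename pattern are assumptions.
package main

import (
	"log"
	"os"
	"regexp"

	"rescribe.xyz/bookpipeline"
	"rescribe.xyz/bookpipeline/internal/pipeline"
)

func main() {
	logger := log.New(os.Stdout, "", log.LstdFlags)

	// Assumed connection setup; any connection type that satisfies the
	// pipeline package's interface should work here.
	conn := &bookpipeline.AwsConn{Region: "eu-west-2", Logger: logger}
	if err := conn.Init(); err != nil {
		log.Fatalln("Error setting up connection:", err)
	}

	// Check the preprocessing queue once, with the same 2 minute
	// visibility timeout the main tool now uses (QueueTimeoutSecs).
	msg, err := conn.CheckQueue(conn.PreQueueId(), 2*60)
	if err != nil || msg.Handle == "" {
		log.Fatalln("No message to process:", err)
	}

	// Run only the preprocessing step for the book named in the message,
	// queueing the results for page OCR, as the preprocess case in
	// cmd/bookpipeline/main.go now does.
	origPattern := regexp.MustCompile(`[0-9]{4}.jpg$`) // assumed pattern
	err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern,
		conn.PreQueueId(), conn.OCRPageQueueId())
	if err != nil {
		log.Fatalln("Error during preprocess:", err)
	}
}
```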
Diffstat (limited to 'cmd')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | cmd/bookpipeline/main.go | 711 |
1 file changed, 15 insertions, 696 deletions
```diff
diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go
index 5a6606d..aff7b87 100644
--- a/cmd/bookpipeline/main.go
+++ b/cmd/bookpipeline/main.go
@@ -11,20 +11,15 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
-	"io/ioutil"
 	"log"
-	"net/smtp"
 	"os"
 	"os/exec"
-	"path/filepath"
 	"regexp"
-	"sort"
-	"strings"
 	"time"
 
 	"rescribe.xyz/bookpipeline"
-	"rescribe.xyz/preproc"
-	"rescribe.xyz/utils/pkg/hocr"
+
+	"rescribe.xyz/bookpipeline/internal/pipeline"
 )
 
 const usage = `Usage: bookpipeline [-v] [-c conn] [-np] [-nw] [-nop] [-na] [-t training] [-shutdown true/false] [-autostop secs]
@@ -47,9 +42,9 @@ this put a text file in {UserConfigDir}/bookpipeline/mailsettings with
 the contents: {smtpserver} {port} {username} {password} {from} {to}
 `
 
+const QueueTimeoutSecs = 2 * 60
 const PauseBetweenChecks = 3 * time.Minute
 const LogSaveTime = 1 * time.Minute
-const HeartbeatSeconds = 60
 
 // null writer to enable non-verbose logging to be discarded
 type NullWriter bool
@@ -80,638 +75,6 @@ type Pipeliner interface {
 	Log(v ...interface{})
 }
 
-type pageimg struct {
-	hocr, img string
-}
-
-type mailSettings struct {
-	server, port, user, pass, from, to string
-}
-
-func getMailSettings() (mailSettings, error) {
-	p := filepath.Join(os.Getenv("HOME"), ".config", "bookpipeline", "mailsettings")
-	b, err := ioutil.ReadFile(p)
-	if err != nil {
-		return mailSettings{}, fmt.Errorf("Error reading mailsettings from %s: %v", p, err)
-	}
-	f := strings.Fields(string(b))
-	if len(f) != 6 {
-		return mailSettings{}, fmt.Errorf("Error parsing mailsettings, need %d fields, got %d", 6, len(f))
-	}
-	return mailSettings{f[0], f[1], f[2], f[3], f[4], f[5]}, nil
-}
-
-func download(dl chan string, process chan string, conn Pipeliner, dir string, errc chan error, logger *log.Logger) {
-	for key := range dl {
-		fn := filepath.Join(dir, filepath.Base(key))
-		logger.Println("Downloading", key)
-		err := conn.Download(conn.WIPStorageId(), key, fn)
-		if err != nil {
-			for range dl {
-			} // consume the rest of the receiving channel so it isn't blocked
-			close(process)
-			errc <- err
-			return
-		}
-		process <- fn
-	}
-	close(process)
-}
-
-func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) {
-	for path := range c {
-		name := filepath.Base(path)
-		key := bookname + "/" + name
-		logger.Println("Uploading", key)
-		err := conn.Upload(conn.WIPStorageId(), key, path)
-		if err != nil {
-			for range c {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-		err = os.Remove(path)
-		if err != nil {
-			for range c {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-	}
-
-	done <- true
-}
-
-func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, training string, errc chan error, logger *log.Logger) {
-	for path := range c {
-		name := filepath.Base(path)
-		key := bookname + "/" + name
-		logger.Println("Uploading", key)
-		err := conn.Upload(conn.WIPStorageId(), key, path)
-		if err != nil {
-			for range c {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-		err = os.Remove(path)
-		if err != nil {
-			for range c {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-		logger.Println("Adding", key, training, "to queue", toQueue)
-		err = conn.AddToQueue(toQueue, key+" "+training)
-		if err != nil {
-			for range c {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-	}
-
-	done <- true
-}
-
-func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {
-	for path := range pre {
-		logger.Println("Preprocessing", path)
-		done, err := preproc.PreProcMulti(path, []float64{0.1, 0.2, 0.4, 0.5}, "binary", 0, true, 5, 30, 120, 30)
-		if err != nil {
-			for range pre {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-		_ = os.Remove(path)
-		for _, p := range done {
-			up <- p
-		}
-	}
-	close(up)
-}
-
-func wipe(towipe chan string, up chan string, errc chan error, logger *log.Logger) {
-	for path := range towipe {
-		logger.Println("Wiping", path)
-		s := strings.Split(path, ".")
-		base := strings.Join(s[:len(s)-1], "")
-		outpath := base + "_bin0.0.png"
-		err := preproc.WipeFile(path, outpath, 5, 0.03, 30, 120, 0.005, 30)
-		if err != nil {
-			for range towipe {
-			} // consume the rest of the receiving channel so it isn't blocked
-			errc <- err
-			return
-		}
-		up <- outpath
-	}
-	close(up)
-}
-
-func ocr(training string) func(chan string, chan string, chan error, *log.Logger) {
-	return func(toocr chan string, up chan string, errc chan error, logger *log.Logger) {
-		for path := range toocr {
-			logger.Println("OCRing", path)
-			name := strings.Replace(path, ".png", "", 1)
-			cmd := exec.Command("tesseract", "-l", training, path, name, "-c", "tessedit_create_hocr=1", "-c", "hocr_font_info=0")
-			var stdout, stderr bytes.Buffer
-			cmd.Stdout = &stdout
-			cmd.Stderr = &stderr
-			err := cmd.Run()
-			if err != nil {
-				for range toocr {
-				} // consume the rest of the receiving channel so it isn't blocked
-				errc <- fmt.Errorf("Error ocring %s with training %s: %s\nStdout: %s\nStderr: %s\n", path, training, err, stdout.String(), stderr.String())
-				return
-			}
-			up <- name + ".hocr"
-		}
-		close(up)
-	}
-}
-
-func analyse(conn Pipeliner) func(chan string, chan string, chan error, *log.Logger) {
-	return func(toanalyse chan string, up chan string, errc chan error, logger *log.Logger) {
-		confs := make(map[string][]*bookpipeline.Conf)
-		bestconfs := make(map[string]*bookpipeline.Conf)
-		savedir := ""
-
-		for path := range toanalyse {
-			if savedir == "" {
-				savedir = filepath.Dir(path)
-			}
-			logger.Println("Calculating confidence for", path)
-			avg, err := hocr.GetAvgConf(path)
-			if err != nil && err.Error() == "No words found" {
-				continue
-			}
-			if err != nil {
-				for range toanalyse {
-				} // consume the rest of the receiving channel so it isn't blocked
-				errc <- fmt.Errorf("Error retreiving confidence for %s: %s", path, err)
-				return
-			}
-			base := filepath.Base(path)
-			codestart := strings.Index(base, "_bin")
-			name := base[0:codestart]
-			var c bookpipeline.Conf
-			c.Path = path
-			c.Code = base[codestart:]
-			c.Conf = avg
-			confs[name] = append(confs[name], &c)
-		}
-
-		fn := filepath.Join(savedir, "conf")
-		logger.Println("Saving confidences in file", fn)
-		f, err := os.Create(fn)
-		if err != nil {
-			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
-			return
-		}
-		defer f.Close()
-
-		logger.Println("Finding best confidence for each page, and saving all confidences")
-		for base, conf := range confs {
-			var best float64
-			for _, c := range conf {
-				if c.Conf > best {
-					best = c.Conf
-					bestconfs[base] = c
-				}
-				_, err = fmt.Fprintf(f, "%s\t%02.f\n", c.Path, c.Conf)
-				if err != nil {
-					errc <- fmt.Errorf("Error writing confidences file: %s", err)
-					return
-				}
-			}
-		}
-		up <- fn
-
-		logger.Println("Creating best file listing the best file for each page")
-		fn = filepath.Join(savedir, "best")
-		f, err = os.Create(fn)
-		if err != nil {
-			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
-			return
-		}
-		defer f.Close()
-		for _, conf := range bestconfs {
-			_, err = fmt.Fprintf(f, "%s\n", filepath.Base(conf.Path))
-		}
-		up <- fn
-
-		var pgs []string
-		for _, conf := range bestconfs {
-			pgs = append(pgs, conf.Path)
-		}
-		sort.Strings(pgs)
-
-		logger.Println("Downloading binarised and original images to create PDFs")
-		bookname, err := filepath.Rel(os.TempDir(), savedir)
-		if err != nil {
-			errc <- fmt.Errorf("Failed to do filepath.Rel of %s to %s: %s", os.TempDir(), savedir, err)
-			return
-		}
-		colourpdf := new(bookpipeline.Fpdf)
-		err = colourpdf.Setup()
-		if err != nil {
-			errc <- fmt.Errorf("Failed to set up PDF: %s", err)
-			return
-		}
-		binarisedpdf := new(bookpipeline.Fpdf)
-		err = binarisedpdf.Setup()
-		if err != nil {
-			errc <- fmt.Errorf("Failed to set up PDF: %s", err)
-			return
-		}
-		binhascontent, colourhascontent := false, false
-
-		var colourimgs, binimgs []pageimg
-
-		for _, pg := range pgs {
-			base := filepath.Base(pg)
-			nosuffix := strings.TrimSuffix(base, ".hocr")
-			p := strings.SplitN(base, "_bin", 2)
-
-			var fn string
-			if len(p) > 1 {
-				fn = p[0] + ".jpg"
-			} else {
-				fn = nosuffix + ".jpg"
-			}
-
-			binimgs = append(binimgs, pageimg{hocr: base, img: nosuffix + ".png"})
-			colourimgs = append(colourimgs, pageimg{hocr: base, img: fn})
-		}
-
-		for _, pg := range binimgs {
-			logger.Println("Downloading binarised page to add to PDF", pg.img)
-			err := conn.Download(conn.WIPStorageId(), bookname+"/"+pg.img, filepath.Join(savedir, pg.img))
-			if err != nil {
-				logger.Println("Download failed; skipping page", pg.img)
-			} else {
-				err = binarisedpdf.AddPage(filepath.Join(savedir, pg.img), filepath.Join(savedir, pg.hocr), true)
-				if err != nil {
-					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
-					return
-				}
-				binhascontent = true
-				err = os.Remove(filepath.Join(savedir, pg.img))
-				if err != nil {
-					errc <- err
-					return
-				}
-			}
-		}
-
-		if binhascontent {
-			fn = filepath.Join(savedir, bookname+".binarised.pdf")
-			err = binarisedpdf.Save(fn)
-			if err != nil {
-				errc <- fmt.Errorf("Failed to save binarised pdf: %s", err)
-				return
-			}
-			up <- fn
-			key := bookname + "/" + bookname + ".binarised.pdf"
-			conn.Log("Uploading", key)
-			err := conn.Upload(conn.WIPStorageId(), key, fn)
-			if err != nil {
-			}
-		}
-
-		for _, pg := range colourimgs {
-			logger.Println("Downloading colour page to add to PDF", pg.img)
-			colourfn := pg.img
-			err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
-			if err != nil {
-				colourfn = strings.Replace(pg.img, ".jpg", ".png", 1)
-				logger.Println("Download failed; trying", colourfn)
-				err = conn.Download(conn.WIPStorageId(), bookname+"/"+colourfn, filepath.Join(savedir, colourfn))
-				if err != nil {
-					logger.Println("Download failed; skipping page", pg.img)
-				}
-			}
-			if err == nil {
-				err = colourpdf.AddPage(filepath.Join(savedir, colourfn), filepath.Join(savedir, pg.hocr), true)
-				if err != nil {
-					errc <- fmt.Errorf("Failed to add page %s to PDF: %s", pg.img, err)
-					return
-				}
-				colourhascontent = true
-				err = os.Remove(filepath.Join(savedir, colourfn))
-				if err != nil {
-					errc <- err
-					return
-				}
-			}
-		}
-		if colourhascontent {
-			fn = filepath.Join(savedir, bookname+".colour.pdf")
-			err = colourpdf.Save(fn)
-			if err != nil {
-				errc <- fmt.Errorf("Failed to save colour pdf: %s", err)
-				return
-			}
-			up <- fn
-		}
-
-		logger.Println("Creating graph")
-		fn = filepath.Join(savedir, "graph.png")
-		f, err = os.Create(fn)
-		if err != nil {
-			errc <- fmt.Errorf("Error creating file %s: %s", fn, err)
-			return
-		}
-		defer f.Close()
-		err = bookpipeline.Graph(bestconfs, filepath.Base(savedir), f)
-		if err != nil && err.Error() != "Not enough valid confidences" {
-			errc <- fmt.Errorf("Error rendering graph: %s", err)
-			return
-		}
-		up <- fn
-
-		close(up)
-	}
-}
-
-func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue string, msgc chan bookpipeline.Qmsg, errc chan error) {
-	currentmsg := msg
-	for range t.C {
-		m, err := conn.QueueHeartbeat(currentmsg, queue, HeartbeatSeconds*2)
-		if err != nil {
-			// This is for better debugging of the heartbeat issue
-			conn.Log("Error with heartbeat", err)
-			os.Exit(1)
-			// TODO: would be better to ensure this error stops any running
-			//       processes, as they will ultimately fail in the case of
-			//       it. could do this by setting a global variable that
-			//       processes check each time they loop.
-			errc <- err
-			t.Stop()
-			return
-		}
-		if m.Id != "" {
-			conn.Log("Replaced message handle as visibilitytimeout limit was reached")
-			currentmsg = m
-			// TODO: maybe handle communicating new msg more gracefully than this
-			for range msgc {
-			} // throw away any old msgc
-			msgc <- m
-		}
-	}
-}
-
-// allOCRed checks whether all pages of a book have been OCRed.
-// This is determined by whether every _bin0.?.png file has a
-// corresponding .hocr file.
-func allOCRed(bookname string, conn Pipeliner) bool {
-	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
-	if err != nil {
-		return false
-	}
-
-	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`)
-
-	atleastone := false
-	for _, png := range objs {
-		if preprocessedPattern.MatchString(png) {
-			atleastone = true
-			found := false
-			b := strings.TrimSuffix(filepath.Base(png), ".png")
-			hocrname := bookname + "/" + b + ".hocr"
-			for _, hocr := range objs {
-				if hocr == hocrname {
-					found = true
-					break
-				}
-			}
-			if found == false {
-				return false
-			}
-		}
-	}
-	if atleastone == false {
-		return false
-	}
-	return true
-}
-
-// ocrPage OCRs a page based on a message. It may make sense to
-// roll this back into processBook (on which it is based) once
-// working well.
-func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error {
-	dl := make(chan string)
-	msgc := make(chan bookpipeline.Qmsg)
-	processc := make(chan string)
-	upc := make(chan string)
-	done := make(chan bool)
-	errc := make(chan error)
-
-	msgparts := strings.Split(msg.Body, " ")
-	bookname := filepath.Dir(msgparts[0])
-	if len(msgparts) > 1 && msgparts[1] != "" {
-		process = ocr(msgparts[1])
-	}
-
-	d := filepath.Join(os.TempDir(), bookname)
-	err := os.MkdirAll(d, 0755)
-	if err != nil {
-		return fmt.Errorf("Failed to create directory %s: %s", d, err)
-	}
-
-	t := time.NewTicker(HeartbeatSeconds * time.Second)
-	go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
-	// these functions will do their jobs when their channels have data
-	go download(dl, processc, conn, d, errc, conn.GetLogger())
-	go process(processc, upc, errc, conn.GetLogger())
-	go up(upc, done, conn, bookname, errc, conn.GetLogger())
-
-	dl <- msgparts[0]
-	close(dl)
-
-	// wait for either the done or errc channel to be sent to
-	select {
-	case err = <-errc:
-		t.Stop()
-		_ = os.RemoveAll(d)
-		return err
-	case <-done:
-	}
-
-	if allOCRed(bookname, conn) && toQueue != "" {
-		conn.Log("Sending", bookname, "to queue", toQueue)
-		err = conn.AddToQueue(toQueue, bookname)
-		if err != nil {
-			t.Stop()
-			_ = os.RemoveAll(d)
-			return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
-		}
-	}
-
-	t.Stop()
-
-	// check whether we're using a newer msg handle
-	select {
-	case m, ok := <-msgc:
-		if ok {
-			msg = m
-			conn.Log("Using new message handle to delete message from queue")
-		}
-	default:
-		conn.Log("Using original message handle to delete message from queue")
-	}
-
-	conn.Log("Deleting original message from queue", fromQueue)
-	err = conn.DelFromQueue(fromQueue, msg.Handle)
-	if err != nil {
-		_ = os.RemoveAll(d)
-		return fmt.Errorf("Error deleting message from queue: %s", err)
-	}
-
-	err = os.RemoveAll(d)
-	if err != nil {
-		return fmt.Errorf("Failed to remove directory %s: %s", d, err)
-	}
-
-	return nil
-}
-
-func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {
-	dl := make(chan string)
-	msgc := make(chan bookpipeline.Qmsg)
-	processc := make(chan string)
-	upc := make(chan string)
-	done := make(chan bool)
-	errc := make(chan error)
-
-	msgparts := strings.Split(msg.Body, " ")
-	bookname := msgparts[0]
-
-	var training string
-	if len(msgparts) > 1 {
-		training = msgparts[1]
-	}
-
-	d := filepath.Join(os.TempDir(), bookname)
-	err := os.MkdirAll(d, 0755)
-	if err != nil {
-		return fmt.Errorf("Failed to create directory %s: %s", d, err)
-	}
-
-	t := time.NewTicker(HeartbeatSeconds * time.Second)
-	go heartbeat(conn, t, msg, fromQueue, msgc, errc)
-
-	// these functions will do their jobs when their channels have data
-	go download(dl, processc, conn, d, errc, conn.GetLogger())
-	go process(processc, upc, errc, conn.GetLogger())
-	if toQueue == conn.OCRPageQueueId() {
-		go upAndQueue(upc, done, toQueue, conn, bookname, training, errc, conn.GetLogger())
-	} else {
-		go up(upc, done, conn, bookname, errc, conn.GetLogger())
-	}
-
-	conn.Log("Getting list of objects to download")
-	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname)
-	if err != nil {
-		t.Stop()
-		_ = os.RemoveAll(d)
-		return fmt.Errorf("Failed to get list of files for book %s: %s", bookname, err)
-	}
-	var todl []string
-	for _, n := range objs {
-		if !match.MatchString(n) {
-			conn.Log("Skipping item that doesn't match target", n)
-			continue
-		}
-		todl = append(todl, n)
-	}
-	for _, a := range todl {
-		dl <- a
-	}
-	close(dl)
-
-	// wait for either the done or errc channel to be sent to
-	select {
-	case err = <-errc:
-		t.Stop()
-		_ = os.RemoveAll(d)
-		// if the error is in preprocessing / wipeonly, chances are that it will never
-		// complete, and will fill the ocrpage queue with parts which succeeded
-		// on each run, so in that case it's better to delete the message from
-		// the queue and notify us.
-		if fromQueue == conn.PreQueueId() || fromQueue == conn.WipeQueueId() {
-			conn.Log("Deleting message from queue due to a bad error", fromQueue)
-			err2 := conn.DelFromQueue(fromQueue, msg.Handle)
-			if err2 != nil {
-				conn.Log("Error deleting message from queue", err2)
-			}
-			ms, err2 := getMailSettings()
-			if err2 != nil {
-				conn.Log("Failed to mail settings ", err2)
-			}
-			if err2 == nil && ms.server != "" {
-				logs, err2 := getlogs()
-				if err2 != nil {
-					conn.Log("Failed to get logs ", err2)
-					logs = ""
-				}
-				msg := fmt.Sprintf("To: %s\r\nFrom: %s\r\n" +
-					"Subject: [bookpipeline] Error in wipeonly / preprocessing queue with %s\r\n\r\n" +
-					" Fail message: %s\r\nFull log:\r\n%s\r\n",
-					ms.to, ms.from, bookname, err, logs)
-				host := fmt.Sprintf("%s:%s", ms.server, ms.port)
-				auth := smtp.PlainAuth("", ms.user, ms.pass, ms.server)
-				err2 = smtp.SendMail(host, auth, ms.from, []string{ms.to}, []byte(msg))
-				if err2 != nil {
-					conn.Log("Error sending email ", err2)
-				}
-			}
-		}
-		return err
-	case <-done:
-	}
-
-	if toQueue != "" && toQueue != conn.OCRPageQueueId() {
-		conn.Log("Sending", bookname, "to queue", toQueue)
-		err = conn.AddToQueue(toQueue, bookname)
-		if err != nil {
-			t.Stop()
-			_ = os.RemoveAll(d)
-			return fmt.Errorf("Error adding to queue %s: %s", bookname, err)
-		}
-	}
-
-	t.Stop()
-
-	// check whether we're using a newer msg handle
-	select {
-	case m, ok := <-msgc:
-		if ok {
-			msg = m
-			conn.Log("Using new message handle to delete message from queue")
-		}
-	default:
-		conn.Log("Using original message handle to delete message from queue")
-	}
-
-	conn.Log("Deleting original message from queue", fromQueue)
-	err = conn.DelFromQueue(fromQueue, msg.Handle)
-	if err != nil {
-		_ = os.RemoveAll(d)
-		return fmt.Errorf("Error deleting message from queue: %s", err)
-	}
-
-	err = os.RemoveAll(d)
-	if err != nil {
-		return fmt.Errorf("Failed to remove directory %s: %s", d, err)
-	}
-
-	return nil
-}
-
 func stopTimer(t *time.Timer) {
 	if !t.Stop() {
 		<-t.C
@@ -724,50 +87,6 @@ func resetTimer(t *time.Timer, d time.Duration) {
 	}
 }
 
-// TODO: rather than relying on journald, would be nicer to save the logs
-//       ourselves maybe, so that we weren't relying on a particular systemd
-//       setup. this can be done by having the conn.Log also append line
-//       to a file (though that would mean everything would have to go through
-//       conn.Log, which we're not consistently doing yet). the correct thing
-//       to do then would be to implement a new interface that covers the part
-//       of log.Logger we use (e.g. Print and Printf), and then have an exported
-//       conn struct that implements those, so that we could pass a log.Logger
-//       or the new conn struct everywhere (we wouldn't be passing a log.Logger,
-//       it's just good to be able to keep the compatibility)
-func getlogs() (string, error) {
-	cmd := exec.Command("journalctl", "-u", "bookpipeline", "-n", "all")
-	var stdout, stderr bytes.Buffer
-	cmd.Stdout = &stdout
-	cmd.Stderr = &stderr
-	err := cmd.Run()
-	return stdout.String(), err
-}
-
-func savelogs(conn Pipeliner, starttime int64, hostname string) error {
-	logs, err := getlogs()
-	if err != nil {
-		return fmt.Errorf("Error getting logs, error: %v", err)
-	}
-	key := fmt.Sprintf("bookpipeline.log.%d.%s", starttime, hostname)
-	path := filepath.Join(os.TempDir(), key)
-	f, err := os.Create(path)
-	if err != nil {
-		return fmt.Errorf("Error creating log file", err)
-	}
-	defer f.Close()
-	_, err = f.WriteString(logs)
-	if err != nil {
-		return fmt.Errorf("Error saving log file", err)
-	}
-	_ = f.Close()
-	err = conn.Upload(conn.WIPStorageId(), key, path)
-	if err != nil {
-		return fmt.Errorf("Error uploading log", err)
-	}
-	conn.Log("Log saved to", key)
-	return nil
-}
-
 func main() {
 	verbose := flag.Bool("v", false, "verbose")
 	training := flag.String("t", "rescribealphav5", "default tesseract training file to use (without the .traineddata part)")
@@ -809,7 +128,7 @@ func main() {
 	var err error
 
 	if *conntype != "local" {
-		_, err = getMailSettings()
+		_, err = pipeline.GetMailSettings()
 		if err != nil {
 			conn.Log("Warning: disabling email notifications as mail setting retrieval failed: ", err)
 		}
@@ -857,7 +176,7 @@ func main() {
 	for {
 		select {
 		case <-checkPreQueue:
-			msg, err := conn.CheckQueue(conn.PreQueueId(), HeartbeatSeconds*2)
+			msg, err := conn.CheckQueue(conn.PreQueueId(), QueueTimeoutSecs)
 			checkPreQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				conn.Log("Error checking preprocess queue", err)
@@ -869,13 +188,13 @@ func main() {
 			}
 			conn.Log("Message received on preprocess queue, processing", msg.Body)
 			stopTimer(stopIfQuiet)
-			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
+			err = pipeline.ProcessBook(msg, conn, pipeline.Preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())
 			resetTimer(stopIfQuiet, quietTime)
 			if err != nil {
 				conn.Log("Error during preprocess", err)
 			}
 		case <-checkWipeQueue:
-			msg, err := conn.CheckQueue(conn.WipeQueueId(), HeartbeatSeconds*2)
+			msg, err := conn.CheckQueue(conn.WipeQueueId(), QueueTimeoutSecs)
 			checkWipeQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				conn.Log("Error checking wipeonly queue", err)
@@ -887,13 +206,13 @@ func main() {
 			}
 			stopTimer(stopIfQuiet)
 			conn.Log("Message received on wipeonly queue, processing", msg.Body)
-			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
+			err = pipeline.ProcessBook(msg, conn, pipeline.Wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())
 			resetTimer(stopIfQuiet, quietTime)
 			if err != nil {
 				conn.Log("Error during wipe", err)
 			}
 		case <-checkOCRPageQueue:
-			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatSeconds*2)
+			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), QueueTimeoutSecs)
 			checkOCRPageQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				conn.Log("Error checking OCR Page queue", err)
@@ -907,13 +226,13 @@ func main() {
 			checkOCRPageQueue = time.After(0)
 			stopTimer(stopIfQuiet)
 			conn.Log("Message received on OCR Page queue, processing", msg.Body)
-			err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
+			err = pipeline.OcrPage(msg, conn, pipeline.Ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId())
 			resetTimer(stopIfQuiet, quietTime)
 			if err != nil {
 				conn.Log("Error during OCR Page process", err)
 			}
 		case <-checkAnalyseQueue:
-			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), HeartbeatSeconds*2)
+			msg, err := conn.CheckQueue(conn.AnalyseQueueId(), QueueTimeoutSecs)
 			checkAnalyseQueue = time.After(PauseBetweenChecks)
 			if err != nil {
 				conn.Log("Error checking analyse queue", err)
@@ -925,14 +244,14 @@ func main() {
 			}
 			stopTimer(stopIfQuiet)
 			conn.Log("Message received on analyse queue, processing", msg.Body)
-			err = processBook(msg, conn, analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
+			err = pipeline.ProcessBook(msg, conn, pipeline.Analyse(conn), ocredPattern, conn.AnalyseQueueId(), "")
 			resetTimer(stopIfQuiet, quietTime)
 			if err != nil {
 				conn.Log("Error during analysis", err)
 			}
 		case <-savelognow.C:
 			conn.Log("Saving logs")
-			err = savelogs(conn, starttime, hostname)
+			err = pipeline.SaveLogs(conn, starttime, hostname)
 			if err != nil {
 				conn.Log("Error saving logs", err)
 			}
@@ -942,11 +261,11 @@ func main() {
 			}
 			if !*autoshutdown {
 				conn.Log("Stopping pipeline")
-				_ = savelogs(conn, starttime, hostname)
+				_ = pipeline.SaveLogs(conn, starttime, hostname)
 				return
 			}
 			conn.Log("Shutting down")
-			_ = savelogs(conn, starttime, hostname)
+			_ = pipeline.SaveLogs(conn, starttime, hostname)
 			cmd := exec.Command("sudo", "systemctl", "poweroff")
 			var stdout, stderr bytes.Buffer
 			cmd.Stdout = &stdout
```
