diff options
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/bookpipeline/main.go | 182 | ||||
| -rw-r--r-- | cmd/lspipeline/main.go | 2 | ||||
| -rw-r--r-- | cmd/mkpipeline/main.go | 2 | 
3 files changed, 178 insertions, 8 deletions
| diff --git a/cmd/bookpipeline/main.go b/cmd/bookpipeline/main.go index 0ed0d67..3a539c1 100644 --- a/cmd/bookpipeline/main.go +++ b/cmd/bookpipeline/main.go @@ -17,7 +17,7 @@ import (  	"rescribe.xyz/utils/pkg/hocr"  ) -const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-na] [-t training] +const usage = `Usage: bookpipeline [-v] [-np] [-nw] [-no] [-nop] [-na] [-t training]  Watches the preprocess, ocr and analyse queues for book names. When  one is found this general process is followed: @@ -35,6 +35,7 @@ one is found this general process is followed:  `  const PauseBetweenChecks = 3 * time.Minute +const PauseBetweenOCRPageChecks = 1 * time.Second  const HeartbeatTime = 60  // null writer to enable non-verbose logging to be discarded @@ -60,6 +61,7 @@ type Pipeliner interface {  	PreQueueId() string  	WipeQueueId() string  	OCRQueueId() string +	OCRPageQueueId() string  	AnalyseQueueId() string  	WIPStorageId() string  	GetLogger() *log.Logger @@ -99,6 +101,31 @@ func up(c chan string, done chan bool, conn Pipeliner, bookname string, errc cha  	done <- true  } +func upAndQueue(c chan string, done chan bool, toQueue string, conn Pipeliner, bookname string, errc chan error, logger *log.Logger) { +	for path := range c { +		name := filepath.Base(path) +		key := filepath.Join(bookname, name) +		logger.Println("Uploading", key) +		err := conn.Upload(conn.WIPStorageId(), key, path) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +		conn.GetLogger().Println("Adding", key, "to queue", toQueue) +		err = conn.AddToQueue(toQueue, key) +		if err != nil { +			for range c { +			} // consume the rest of the receiving channel so it isn't blocked +			errc <- err +			return +		} +	} + +	done <- true +} +  func preprocess(pre chan string, up chan string, errc chan error, logger *log.Logger) {  	for path := range pre {  		logger.Println("Preprocessing", path) @@ -277,6 +304,122 @@ func heartbeat(conn Pipeliner, t *time.Ticker, msg bookpipeline.Qmsg, queue stri  	}  } +// allOCRed checks whether all pages of a book have been OCRed. +// This is determined by whether every _bin0.?.png file has a +// corresponding .hocr file. +func allOCRed(bookname string, conn Pipeliner) bool { +	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) +	if err != nil { +		return false +	} + +	// Full wipePattern can match things like 0000.png which getgbook +	// can emit but aren't ocr-able +	//wipePattern := regexp.MustCompile(`[0-9]{4,6}(.bin)?.png$`) +	wipePattern := regexp.MustCompile(`[0-9]{6}(.bin)?.png$`) +	preprocessedPattern := regexp.MustCompile(`_bin[0-9].[0-9].png$`) + +	atleastone := false +	for _, png := range objs { +		if wipePattern.MatchString(png) || preprocessedPattern.MatchString(png) { +			atleastone = true +			found := false +			b := strings.TrimSuffix(filepath.Base(png), ".png") +			hocrname := bookname + "/" + b + ".hocr" +			for _, hocr := range objs { +				if hocr == hocrname { +					found = true +					break +				} +			} +			if found == false { +				return false +			} +		} +	} +	if atleastone == false { +		return false +	} +	return true +} + +// ocrPage OCRs a page based on a message. It may make sense to +// roll this back into processBook (on which it is based) once +// working well. +func ocrPage(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), fromQueue string, toQueue string) error { +	dl := make(chan string) +	msgc := make(chan bookpipeline.Qmsg) +	processc := make(chan string) +	upc := make(chan string) +	done := make(chan bool) +	errc := make(chan error) + +	bookname := filepath.Dir(msg.Body) + +	d := filepath.Join(os.TempDir(), bookname) +	err := os.MkdirAll(d, 0755) +	if err != nil { +		return errors.New(fmt.Sprintf("Failed to create directory %s: %s", d, err)) +	} + +	t := time.NewTicker(HeartbeatTime * time.Second) +	go heartbeat(conn, t, msg, fromQueue, msgc, errc) + +	// these functions will do their jobs when their channels have data +	go download(dl, processc, conn, d, errc, conn.GetLogger()) +	go process(processc, upc, errc, conn.GetLogger()) +	go up(upc, done, conn, bookname, errc, conn.GetLogger()) + +	dl <- msg.Body +	close(dl) + +	// wait for either the done or errc channel to be sent to +	select { +	case err = <-errc: +		t.Stop() +		_ = os.RemoveAll(d) +		return err +	case <-done: +	} + +	if allOCRed(bookname, conn) && toQueue != "" { +		conn.GetLogger().Println("Sending", bookname, "to queue", toQueue) +		err = conn.AddToQueue(toQueue, bookname) +		if err != nil { +			t.Stop() +			_ = os.RemoveAll(d) +			return errors.New(fmt.Sprintf("Error adding to queue %s: %s", bookname, err)) +		} +	} + +	t.Stop() + +	// check whether we're using a newer msg handle +	select { +	case m, ok := <-msgc: +		if ok { +			msg = m +			conn.GetLogger().Println("Using new message handle to delete message from queue") +		} +	default: +		conn.GetLogger().Println("Using original message handle to delete message from queue") +	} + +	conn.GetLogger().Println("Deleting original message from queue", fromQueue) +	err = conn.DelFromQueue(fromQueue, msg.Handle) +	if err != nil { +		_ = os.RemoveAll(d) +		return errors.New(fmt.Sprintf("Error deleting message from queue: %s", err)) +	} + +	err = os.RemoveAll(d) +	if err != nil { +		return errors.New(fmt.Sprintf("Failed to remove directory %s: %s", d, err)) +	} + +	return nil +} +  func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string, chan string, chan error, *log.Logger), match *regexp.Regexp, fromQueue string, toQueue string) error {  	dl := make(chan string)  	msgc := make(chan bookpipeline.Qmsg) @@ -299,7 +442,11 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	// these functions will do their jobs when their channels have data  	go download(dl, processc, conn, d, errc, conn.GetLogger())  	go process(processc, upc, errc, conn.GetLogger()) -	go up(upc, done, conn, bookname, errc, conn.GetLogger()) +	if toQueue == conn.OCRPageQueueId() { +		go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger()) +	} else { +		go up(upc, done, conn, bookname, errc, conn.GetLogger()) +	}  	conn.GetLogger().Println("Getting list of objects to download")  	objs, err := conn.ListObjects(conn.WIPStorageId(), bookname) @@ -330,7 +477,8 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	case <-done:  	} -	if toQueue != "" { +	if toQueue != "" && toQueue != conn.OCRPageQueueId() { +		go upAndQueue(upc, done, toQueue, conn, bookname, errc, conn.GetLogger())  		conn.GetLogger().Println("Sending", bookname, "to queue", toQueue)  		err = conn.AddToQueue(toQueue, bookname)  		if err != nil { @@ -347,10 +495,10 @@ func processBook(msg bookpipeline.Qmsg, conn Pipeliner, process func(chan string  	case m, ok := <-msgc:  		if ok {  			msg = m -			conn.GetLogger().Println("Using new message handle to delete message from old queue") +			conn.GetLogger().Println("Using new message handle to delete message from queue")  		}  	default: -		conn.GetLogger().Println("Using original message handle to delete message from old queue") +		conn.GetLogger().Println("Using original message handle to delete message from queue")  	}  	conn.GetLogger().Println("Deleting original message from queue", fromQueue) @@ -374,6 +522,7 @@ func main() {  	nopreproc := flag.Bool("np", false, "disable preprocessing")  	nowipe := flag.Bool("nw", false, "disable wipeonly")  	noocr := flag.Bool("no", false, "disable ocr") +	noocrpg := flag.Bool("nop", false, "disable ocr on individual pages")  	noanalyse := flag.Bool("na", false, "disable analysis")  	flag.Usage = func() { @@ -408,6 +557,7 @@ func main() {  	var checkPreQueue <-chan time.Time  	var checkWipeQueue <-chan time.Time  	var checkOCRQueue <-chan time.Time +	var checkOCRPageQueue <-chan time.Time  	var checkAnalyseQueue <-chan time.Time  	if !*nopreproc {  		checkPreQueue = time.After(0) @@ -418,6 +568,9 @@ func main() {  	if !*noocr {  		checkOCRQueue = time.After(0)  	} +	if !*noocrpg { +		checkOCRPageQueue = time.After(0) +	}  	if !*noanalyse {  		checkAnalyseQueue = time.After(0)  	} @@ -436,7 +589,7 @@ func main() {  				continue  			}  			verboselog.Println("Message received on preprocess queue, processing", msg.Body) -			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRQueueId()) +			err = processBook(msg, conn, preprocess, origPattern, conn.PreQueueId(), conn.OCRPageQueueId())  			if err != nil {  				log.Println("Error during preprocess", err)  			} @@ -452,10 +605,25 @@ func main() {  				continue  			}  			verboselog.Println("Message received on wipeonly queue, processing", msg.Body) -			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRQueueId()) +			err = processBook(msg, conn, wipe, wipePattern, conn.WipeQueueId(), conn.OCRPageQueueId())  			if err != nil {  				log.Println("Error during wipe", err)  			} +		case <-checkOCRPageQueue: +			msg, err := conn.CheckQueue(conn.OCRPageQueueId(), HeartbeatTime*2) +			checkOCRPageQueue = time.After(PauseBetweenOCRPageChecks) +			if err != nil { +				log.Println("Error checking OCR Page queue", err) +				continue +			} +			if msg.Handle == "" { +				continue +			} +			verboselog.Println("Message received on OCR Page queue, processing", msg.Body) +			err = ocrPage(msg, conn, ocr(*training), conn.OCRPageQueueId(), conn.AnalyseQueueId()) +			if err != nil { +				log.Println("Error during OCR Page process", err) +			}  		case <-checkOCRQueue:  			msg, err := conn.CheckQueue(conn.OCRQueueId(), HeartbeatTime*2)  			checkOCRQueue = time.After(PauseBetweenChecks) diff --git a/cmd/lspipeline/main.go b/cmd/lspipeline/main.go index 0b8ce49..a32d851 100644 --- a/cmd/lspipeline/main.go +++ b/cmd/lspipeline/main.go @@ -28,6 +28,7 @@ type LsPipeliner interface {  	PreQueueId() string  	WipeQueueId() string  	OCRQueueId() string +	OCRPageQueueId() string  	AnalyseQueueId() string  	GetQueueDetails(url string) (string, string, error)  	GetInstanceDetails() ([]bookpipeline.InstanceDetails, error) @@ -62,6 +63,7 @@ func getQueueDetails(conn LsPipeliner, qdetails chan queueDetails) {  		{"preprocess", conn.PreQueueId()},  		{"wipeonly", conn.WipeQueueId()},  		{"ocr", conn.OCRQueueId()}, +		{"ocrpage", conn.OCRPageQueueId()},  		{"analyse", conn.AnalyseQueueId()},  	}  	for _, q := range queues { diff --git a/cmd/mkpipeline/main.go b/cmd/mkpipeline/main.go index e37a56d..a32526a 100644 --- a/cmd/mkpipeline/main.go +++ b/cmd/mkpipeline/main.go @@ -34,7 +34,7 @@ func main() {  	prefix := "rescribe"  	buckets := []string{"inprogress", "done"} -	queues := []string{"preprocess", "wipeonly", "ocr", "analyse"} +	queues := []string{"preprocess", "wipeonly", "ocr", "analyse", "ocrpage"}  	for _, bucket := range buckets {  		bname := prefix + bucket | 
