Changeset - 655956f6ba89
[Not reviewed]
default
4 1 5
Laman - 6 years ago 2019-05-04 16:43:20

training and testing model
6 files changed with 89 insertions and 12 deletions:
0 comments (0 inline, 0 general)
exp/board_detect.py
Show inline comments
 
import sys
 

	
 
sys.path.append("../src")
 

	
 
import os
 
import random
 
import itertools
 
import logging as log
 

	
 
import cv2 as cv
 
import numpy as np
 

	
 
from geometry import Line
 
from ransac import DiagonalRansac
 
from quantization import kmeans,QuantizedImage
 
from annotations import DataFile,computeBoundingBox
 
from hough import show,prepareEdgeImg,HoughTransform
 
from analyzer.epoint import EPoint
 
from analyzer.corners import Corners
 

	
 
random.seed(361)
 
log.basicConfig(level=log.DEBUG,format="%(message)s")
 

	
 

	
 
def filterStones(contours,bwImg,stoneDims):
 
	contourImg=cv.cvtColor(bwImg,cv.COLOR_GRAY2BGR)
 
	res=[]
 
	for (i,c) in enumerate(contours):
 
		keep=True
 
		moments=cv.moments(c)
 
		center=(moments["m10"]/(moments["m00"] or 1), moments["m01"]/(moments["m00"] or 1))
 
		area=cv.contourArea(c)
 
		(x,y,w,h)=cv.boundingRect(c)
 
		if w>stoneDims[0] or h>stoneDims[1]*1.5 or w<2 or h<2:
 
			cv.drawMarker(contourImg,tuple(map(int,center)),(0,0,255),cv.MARKER_TILTED_CROSS,12)
 
			keep=False
 
		coverage1=area/(w*h or 1)
 
		hull=cv.convexHull(c)
 
		coverage2=area/(cv.contourArea(hull) or 1)
 
		# if coverage2<0.8:
 
		# 	cv.drawMarker(contourImg,tuple(map(int,center)),(0,127,255),cv.MARKER_DIAMOND,12)
 
		# 	keep=False
 
		if keep:
 
			res.append((EPoint(*center),c))
 
			cv.drawMarker(contourImg,tuple(map(int,center)),(255,0,0),cv.MARKER_CROSS)
 
	log.debug("accepted: %s",len(res))
 
	log.debug("rejected: %s",len(contours)-len(res))
 
	show(contourImg,"accepted and rejected stones")
 
	return res
 

	
 

	
 
class BoardDetector:
 
	def __init__(self,annotationsPath):
 
		self._annotations=DataFile(annotationsPath)
 

	
 
		self._rectW=0
 
		self._rectH=0
 
		self._rect=None
 

	
 
		self._hough=None
 
		self._rectiMatrix=None
 
		self._inverseMatrix=None
 

	
 
		self.grid=None
 

	
 
	def __call__(self,img,filename):
 
		# approximately detect the board
 
		(h,w)=img.shape[:2]
 
		log.debug("image dimensions: %s x %s",w,h)
 
		show(img,filename)
 
		(x1,y1,x2,y2)=self._detectRough(img,filename)
 
		rect=img[y1:y2,x1:x2]
 
		self._rectW=x2-x1
 
		self._rectH=y2-y1
 
		self._rect=rect
 

	
 
		# quantize colors
 
		quantized=QuantizedImage(rect)
 
		gray=cv.cvtColor(rect,cv.COLOR_BGR2GRAY)
 
		edges=cv.Canny(gray,70,130)
 
		show(edges,"edges")
 
		edgeMask=(255-edges)
 
		quantizedImg=quantized.img & cv.cvtColor(edgeMask,cv.COLOR_GRAY2BGR)
 
		show(quantizedImg,"quantized, edges separated")
 

	
 
		# detect black and white stones
 
		stones=self._detectStones(quantized,edgeMask)
 

	
 
		# detect lines from edges and stones
 
		edgeImg=prepareEdgeImg(rect)
 
		self._hough=HoughTransform(edgeImg)
 
		stonesImg=np.zeros((self._rectH,self._rectW),np.uint8)
 
		for (point,c) in stones:
 
			cv.circle(stonesImg,(int(point.x),int(point.y)),2,255,-1)
 

	
 
		show(stonesImg,"detected stones")
 
		self._hough.update(stonesImg,10)
 
		lines=self._hough.extract()
 

	
 
		linesImg=np.copy(rect)
 
		for line in itertools.chain(*lines):
 
			self._drawLine(linesImg,line)
 
		show(linesImg,"detected lines")
 

	
 
		# rectify the image
 
		matrix=self._computeTransformationMatrix(lines[0][0],lines[0][-1],lines[1][0],lines[1][-1])
 
		rectiLines=[[line.transform(matrix) for line in pack] for pack in lines]
 
		quantized.transform(matrix)
 

	
 
		# determine precise board edges
 
		self.grid=self._detectGrid(rectiLines,linesImg)
 

	
 
		self.detectPosition(quantized)
 

	
 
	def _detectRough(self,img,filename):
 
		corners=self._annotations[filename][0]
 
		(x1,y1,x2,y2)=computeBoundingBox(corners)
 
		log.debug("bounding box: (%s,%s) - (%s,%s)",x1,y1,x2,y2)
 
		return (x1,y1,x2,y2)
 

	
 
	def _sampleColors(self,rect):
 
		(h,w)=rect.shape[:2]
 
		minirect=rect[h//4:3*h//4, w//4:3*w//4]
 
		return kmeans(minirect)
 

	
 
	def _detectStones(self,quantized,edgeMask):
 
		(h,w)=quantized.img.shape[:2]
 
		mask=self._maskStones(quantized,edgeMask)
 
		stoneDims=(w/19,h/19)
 
		log.debug("stone dims: %s - %s",tuple(x/2 for x in stoneDims),stoneDims)
 

	
 
		(contours,hierarchy)=cv.findContours(mask,cv.RETR_LIST,cv.CHAIN_APPROX_SIMPLE)
 
		stoneLocs=filterStones(contours,mask,stoneDims)
 

	
 
		return stoneLocs
 

	
 
	def _maskStones(self,quantized,edgeMask):
 
		distTransform=cv.distanceTransform(quantized.maskB&edgeMask,cv.DIST_L2,5)
 
		maskB=cv.inRange(distTransform,6,20)
 
		show(maskB,"black areas")
 

	
 
		distTransform=cv.distanceTransform(quantized.maskW&edgeMask,cv.DIST_L2,5)
 
		maskW=cv.inRange(distTransform,6,20)
 
		show(maskW,"white areas")
 

	
 
		stones=cv.bitwise_or(maskB,maskW)
 
		show(stones,"black and white areas")
 
		return stones
 

	
 
	def _computeTransformationMatrix(self,p,q,r,s): # p || q, r || s
 
		(a,b,c,d)=Corners([p.intersect(r),p.intersect(s),q.intersect(r),q.intersect(s)]) # canonize the abcd order
 
		pad=20
 
		a_=EPoint(b.x+pad,min(a.y,d.y)+pad)
 
		b_=EPoint(b.x+pad,max(b.y,c.y)-pad)
 
		c_=EPoint(c.x-pad,max(b.y,c.y)-pad)
 
		d_=EPoint(c.x-pad,min(a.y,d.y)+pad)
 
		abcd=[list(point) for point in (a,b,c,d)]
 
		abcd_=[list(point) for point in (a_,b_,c_,d_)]
 
		log.debug("abcd: %s ->",(a,b,c,d))
 
		log.debug("-> abcd_: %s",(a_,b_,c_,d_))
 
		matrix=cv.getPerspectiveTransform(np.float32(abcd),np.float32(abcd_))
 
		log.debug("transformation matrix: %s",matrix)
 

	
 
		rect=np.copy(self._rect)
 
		for point in (a,b,c,d):
 
			cv.drawMarker(rect,(int(point.x),int(point.y)),(0,255,255),cv.MARKER_TILTED_CROSS)
 
		show(rect)
 
		transformed=cv.warpPerspective(rect,matrix,(self._rectW,self._rectH))
 
		show(transformed,"rectified image")
 

	
 
		self._rectiMatrix=matrix
 
		self._inverseMatrix=np.linalg.inv(matrix)
 
		return matrix
 

	
 
	def _detectGrid(self,lines,img):
 
		intersections=[]
 
		for p in lines[0]:
 
			for q in lines[1]:
 
				intersections.append(p.intersect(q))
 

	
 
		sack=DiagonalRansac(intersections,19)
 
		diagonals=sack.extract(10,3000)
 
		log.debug("diagonals candidates: %s",diagonals)
 
		for line in diagonals:
 
			self._drawLine(img,line.transform(self._inverseMatrix),[0,255,255])
 
		show(img,"diagonals candidates")
 

	
 
		best=(0,None)
 
		transformedImg=cv.warpPerspective(img,self._rectiMatrix,(self._rectW,self._rectH))
 
		explored=[0,0,0]
 

	
 
		for e in diagonals:
 
			for f in diagonals:
 
				explored[0]+=1
 
				center=e.intersect(f)
 
				if not center: continue
 
				if center.x<0 or center.x>self._rectW or center.y<0 or center.y>self._rectH: continue
 
				for line in itertools.chain(*lines):
 
					for i in range(1,10): # 10th is useless, 11-19 are symmetrical to 1-9
 
						explored[1]+=1
 
						grid=self._constructGrid(e,f,line,i)
 
						if not grid: continue
 
						explored[2]+=1
 
						score=self._scoreGrid(grid)
 
						if score>best[0]:
 
							best=(score,grid)
 
							log.debug("new best grid: %s",score)
 
							self._showGrid(transformedImg,grid)
 
		log.debug("diagonal pairs: %s, explored grids: %s, scored grids: %s",*explored)
 
		return best[1]
 

	
 
	def _constructGrid(self,e,f,line,i):
 
		"""Contruct a grid.
 

	
 
		:param e: (Line) one diagonal
 
		:param f: (Line) other diagonal
 
		:param line: (Line) one of the grid lines
 
		:param i: (int) line's index among the grid's lines, 1<=i<=9"""
 
		center=e.intersect(f)
 
		p1=line.intersect(e)
 
		p2=line.intersect(f)
 
		a=center+9*(p1-center)/(10-i)
 
		b=center+9*(p2-center)/(10-i)
 
		c=2*center-a
 
		d=2*center-b
 
		# abort unfitting sizes
 
		if not all(0<=point.x<self._rectW and 0<=point.y<self._rectH for point in (a,b,c,d)):
 
			return None
 
		if any(g.dist(h)<19*10 for (g,h) in [(a,b),(a,c),(a,d),(b,c),(b,d),(c,d)]):
 
			return None
 
		(a,b,c,d)=Corners([a,b,c,d])
 
		rows=[]
 
		cols=[]
 
		for j in range(19):
 
			rows.append(Line.fromPoints((a*(18-j)+b*j)/18,(d*(18-j)+c*j)/18))
 
			cols.append(Line.fromPoints((a*(18-j)+d*j)/18,(b*(18-j)+c*j)/18))
 
		return (rows,cols)
 

	
 
	def _scoreGrid(self,lines):
 
		(p,q,r,s)=(lines[0][0],lines[0][-1],lines[-1][0],lines[-1][-1])
 
		corners=(p.intersect(r),p.intersect(s),q.intersect(r),q.intersect(s))
 
		origCorners=[c.transform(self._inverseMatrix) for c in corners]
 
		# must fit
 
		if not all(0<=c.x<self._rectW and 0<=c.y<self._rectH for c in origCorners):
 
			return 0
 
		return sum(self._hough.scoreLine(p.transform(self._inverseMatrix)) for p in itertools.chain(*lines))
 

	
 
	def detectPosition(self,img):
 
		if not self.grid: return None
 
		(rows,cols)=self.grid
 
		intersections=[[row.intersect(col) for col in cols] for row in rows]
 
		position=[[self._detectStoneAt(img,point) for point in row] for row in intersections]
 
		log.debug("detected position:\n%s","\n".join("".join(row) for row in position))
 
		return position
 

	
 
	def _detectStoneAt(self,img,intersection):
 
		(height,width)=img.img.shape[:2]
 
		(x,y)=map(int,intersection)
 
		scores=[0,0,0]
 
		for xi in range(x-2,x+3):
 
			if xi<0 or xi>=width: continue
 
			for yi in range(y-2,y+3):
 
				if yi<0 or yi>=height: continue
 
				scores[img.get(xi,yi)]+=1
 
		return sorted(list(zip(scores,"XO.")))[-1][1]
 

	
 
	def _drawLine(self,img,line,color=None):
 
		if not color: color=[0,255,0]
 
		(h,w)=img.shape[:2]
 
		corners=[EPoint(0,0),EPoint(w,0),EPoint(0,h),EPoint(w,h)] # NW NE SW SE
 
		borders=[
 
			[Line.fromPoints(corners[0],corners[1]), Line.fromPoints(corners[2],corners[3])], # N S
 
			[Line.fromPoints(corners[0],corners[2]), Line.fromPoints(corners[1],corners[3])] # W E
 
		]
 

	
 
		(a,b)=(line.intersect(borders[0][0]), line.intersect(borders[0][1]))
 
		log.debug("%s %s",line,(a,b))
 
		if not a or not b:
 
			(a,b)=(line.intersect(borders[1][0]), line.intersect(borders[1][1]))
 
			log.debug("* %s %s",line,(a,b))
 
		if any(abs(x)>10**5 for x in [*a,*b]):
 
			log.debug("ignored")
 
			return
 
		cv.line(img,(int(a.x),int(a.y)),(int(b.x),int(b.y)),color)
 

	
 
	def _showGrid(self,img,lines):
 
		img=np.copy(img)
 
		(rows,cols)=lines
 
		for row in rows:
 
			for col in cols:
 
				point=row.intersect(col)
 
				xy=(int(point.x),int(point.y))
 
				cv.circle(img,xy,4,[0,0,255],-1)
 
		show(img,"grid candidate")
 

	
 

	
 
if __name__=="__main__":
 
	detector=BoardDetector(sys.argv[2])
 
	filepath=sys.argv[1]
 
	filename=os.path.basename(filepath)
 
	img=cv.imread(filepath)
 
	detector(img,filename)
exp/kerokero/__init__.py
Show inline comments
 
file renamed from exp/keras/__init__.py to exp/kerokero/__init__.py
exp/kerokero/prepare_data.py
Show inline comments
 
file renamed from exp/keras/prepare_data.py to exp/kerokero/prepare_data.py
 
import os
 
import sys
 
import re
 
import random
 
import itertools
 

	
 
import numpy as np
 
import cv2 as cv
 

	
 
sys.path.append("../exp")
 
sys.path.append("..")
 
sys.path.append("../../src")
 
from annotations import DataFile,computeBoundingBox,Corners
 
from geometry import Line
 
from keras.transformation_matrices import getIdentity,getRotation,getTranslation,getScale,getMirroring,getProjection
 
from kerokero.transformation_matrices import getIdentity,getRotation,getTranslation,getScale,getMirroring,getProjection
 

	
 
random.seed(361)
 

	
 

	
 
class Sample:
 
	SIDE=256
 
	SIDE=224
 

	
 
	def __init__(self,img,grid):
 
		self.img=img
 
		self.grid=grid
 

	
 
	def transform(self):
 
		center=self._getCenter()
 
		m=getIdentity()
 
		t1=getTranslation(-center.x,-center.y)
 
		proj=getProjection()
 
		rot=getRotation()
 
		mir=getMirroring()
 
		for mi in [t1,mir,proj,rot]:
 
			m=np.matmul(mi,m)
 
		m=np.matmul(self._computeCrop(m),m)
 
		img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
 
		grid=Corners(c.transform(m) for c in self.grid)
 
		Sample(img,grid).show()
 
		return (img,list(itertools.chain.from_iterable(grid)))
 

	
 
	def _getCenter(self):
 
		(a,b,c,d)=self.grid
 
		p=Line.fromPoints(a,c)
 
		q=Line.fromPoints(b,d)
 
		return p.intersect(q)
 

	
 
	def _computeCrop(self,m):
 
		grid=Corners(c.transform(m) for c in self.grid)
 
		(x1,y1,x2,y2)=computeBoundingBox(grid)
 
		(wg,hg)=(x2-x1,y2-y1)
 
		(left,top,right,bottom)=[random.uniform(0.05,0.2) for i in range(4)]
 
		t2=getTranslation(left*wg-x1, top*hg-y1)
 
		scale=getScale(self.SIDE/(wg*(1+left+right)), self.SIDE/(hg*(1+top+bottom)))
 
		return np.matmul(scale,t2)
 

	
 
	def show(self):
 
		img=np.copy(self.img)
 
		for c in self.grid:
 
			cv.circle(img,(int(c.x),int(c.y)),3,[0,255,0],-1)
 
		show(img)
 

	
 

	
 
def traverseDirs(root):
 
	stack=[root]
 
	while len(stack)>0:
 
		d=stack.pop()
 
		contents=sorted(os.scandir(d),key=lambda f: f.name,reverse=True)
 
		if any(f.name=="annotations.json.gz" for f in contents):
 
			print(d)
 
			yield d
 
		for f in contents:
 
			if f.is_dir(): stack.append(f.path)
 

	
 

	
 
def harvestDir(path):
 
	annotations=DataFile(os.path.join(path,"annotations.json.gz"))
 
	imgFilter=lambda f: f.is_file() and re.match(r".*\.(jpg|jpeg|png|gif)$", f.name.lower())
 
	files=sorted(filter(imgFilter,os.scandir(path)),key=lambda f: f.name)
 
	boards=annotations["."]
 
	for f in files:
 
		img=cv.imread(f.path)
 
		img=cv.cvtColor(img,cv.COLOR_BGR2GRAY)
 
		for b in boards:
 
			sample=Sample(img,b.grid)
 
			sample.transform()
 
			(img,label)=sample.transform()
 
			yield (img,label)
 

	
 

	
 
def loadDataset(root):
 
	testRatio=0.1
 
	trainRatio=1-testRatio
 
	images=[]
 
	labels=[]
 
	for d in traverseDirs(root):
 
		for (img,label) in harvestDir(d):
 
			images.append(img)
 
			labels.append(label)
 
	n=len(images)
 
	keys=list(range(n))
 
	random.shuffle(keys)
 
	images=[images[k] for k in keys]
 
	labels=[labels[k] for k in keys]
 
	m=int(n*trainRatio)
 
	return (
 
		(np.uint8(images[:m]),np.float32(labels[:m])),
 
		(np.uint8(images[m:]),np.float32(labels[m:]))
 
	)
 

	
 

	
 
def show(img,filename="x"):
 
	cv.imshow(filename,img)
 
	cv.waitKey(0)
 
	cv.destroyAllWindows()
 

	
 

	
 
if __name__=="__main__":
 
	root=sys.argv[1]
 
	for d in traverseDirs(root):
 
		harvestDir(d)
exp/kerokero/test.py
Show inline comments
 
new file 100644
 
import argparse
 

	
 
import numpy as np
 
from keras.models import load_model
 

	
 
from prepare_data import loadDataset,Sample
 
from analyzer.epoint import EPoint
 
from analyzer.corners import Corners
 

	
 

	
 
parser=argparse.ArgumentParser()
 
parser.add_argument("model")
 
parser.add_argument("data_dir")
 
args=parser.parse_args()
 

	
 
model=load_model(args.model)
 

	
 
print("loading data...")
 
((trainImages,trainLabels),(testImages,testLabels))=loadDataset(args.data_dir)
 
print("done")
 

	
 
for img in testImages:
 
	label=model.predict(np.reshape(img,(1,224,224)))
 
	print(label)
 
	points=[]
 
	for i in range(4):
 
		points.append(EPoint(label[0][i*2],label[0][i*2+1]))
 
	corners=Corners(points)
 
	sample=Sample(np.uint8(img),corners)
 
	sample.show()
exp/kerokero/train.py
Show inline comments
 
file renamed from exp/keras/train.py to exp/kerokero/train.py
 
import argparse
 

	
 
from keras.layers import Conv2D,Dropout,Dense,Flatten
 
from keras.models import Sequential
 
from keras.models import Sequential,load_model
 

	
 
from prepare_data import loadDataset
 

	
 

	
 
model = Sequential([
 
	Flatten(input_shape=(96,96)),
 
parser=argparse.ArgumentParser()
 
parser.add_argument("data_dir")
 
parser.add_argument("--load_model")
 
parser.add_argument("--save_model",default="/tmp/gogo-{0:03}.h5")
 
parser.add_argument("--epochs",type=int,default=100)
 
parser.add_argument("--initial_epoch",type=int,default=0)
 
args=parser.parse_args()
 

	
 
model=Sequential([
 
	Flatten(input_shape=(224,224)),
 
	Dense(128, activation="relu"),
 
	Dropout(0.1),
 
	Dense(64, activation="relu"),
 
	Dense(30)
 
	Dense(8)
 
])
 

	
 
model.compile(
 
	optimizer='adam',
 
	loss='mse',
 
	metrics=['mae','accuracy']
 
)
 
if args.load_model:
 
	model=load_model(args.load_model)
 

	
 
model.fit(X_train,y_train,epochs = 500,batch_size = 128,validation_split = 0.2)
 
print("loading data...")
 
((trainImages,trainLabels),(testImages,testLabels))=loadDataset(args.data_dir)
 
print("done")
 

	
 
for i in range(args.initial_epoch,args.epochs//10):
 
	model.fit(trainImages,trainLabels,epochs=(i+1)*10,initial_epoch=i*10,batch_size=128,validation_split=1/9)
 
	model.save(args.save_model.format(i+1))
 
print(model.evaluate(testImages,testLabels))
exp/kerokero/transformation_matrices.py
Show inline comments
 
file renamed from exp/keras/transformation_matrices.py to exp/kerokero/transformation_matrices.py
 
import math
 
import random
 

	
 
import numpy as np
 

	
 

	
 
def getIdentity():
 
	return np.float32([
 
		[1,0,0],
 
		[0,1,0],
 
		[0,0,1]
 
	])
 

	
 

	
 
def getRotation():
 
	alpha=random.random()*2*math.pi
 
	return np.float32([
 
		[math.cos(alpha),math.sin(alpha),0],
 
		[-math.sin(alpha),math.cos(alpha),0],
 
		[0,0,1]
 
	])
 

	
 

	
 
def getTranslation(dx,dy):
 
	return np.float32([
 
		[1,0,dx],
 
		[0,1,dy],
 
		[0,0,1]
 
	])
 

	
 

	
 
def getScale(kx,ky=0):
 
	if not ky: ky=kx
 
	return np.float32([
 
		[kx,0,0],
 
		[0,ky,0],
 
		[0,0,1]
 
	])
 

	
 

	
 
def getMirroring():
 
	return np.float32([
 
		[random.choice((1,-1)),0,0],
 
		[0,1,0],
 
		[0,0,1]
 
	])
 

	
 

	
 
def getProjection():
 
	dx=random.uniform(-0.001,0.001)
 
	dy=random.uniform(-0.001,0.001)
 
	dx=random.uniform(-0.0005,0.0005)
 
	dy=random.uniform(-0.0005,0.0005)
 
	return np.float32([
 
		[1,0,0],
 
		[0,1,0],
 
		[dx,dy,1]
 
	])
0 comments (0 inline, 0 general)