Changeset - 29f28718a69b
[Not reviewed]
tip default
0 1 0
Laman - 6 years ago 2019-09-09 20:58:21

transitional data processing
1 file changed with 46 insertions and 44 deletions:
0 comments (0 inline, 0 general)
exp/kerokero/prepare_data.py
Show inline comments
 
import os
 
import sys
 
import re
 
import math
 
import random
 
import logging as log
 

	
 
import numpy as np
 
import cv2 as cv
 
import PIL.Image
 
import PIL.ImageDraw
 

	
 
import config as cfg
 
sys.path.append("..")
 
sys.path.append("../../src")
 
from annotations import DataFile,computeBoundingBox,Corners,EPoint,Board
 
from geometry import Line
 
from kerokero.transformation_matrices import getIdentity,getRotation,getTranslation,getScale,getMirroring,getProjection
 

	
 
random.seed(361)
 

	
 

	
 
class Stats:
 
@@ -35,83 +37,66 @@ class Sample:
 
		""":return: (img, grid), where img is a 2D np.float32 with values in (0,1),
 
		grid [(float) x, (float) y, ...], with x, y in (-1,1)"""
 
		center=self._getCenter()
 
		m=getIdentity()
 
		t1=getTranslation(-center.x,-center.y)
 
		proj=getProjection()
 
		rot=getRotation()
 
		mir=getMirroring()
 
		for mi in [t1,mir,proj,rot]:
 
			m=np.matmul(mi,m)
 
		m=np.matmul(self._computeCrop(m),m)
 
		img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
 
		img=np.uint8(img)
 
		img=PIL.Image.fromarray(img[:,:,::-1])
 
		grid=Corners(c.transform(m) for c in self.grid)
 
		grid=list(map(lambda p: list(2*p/self.SIDE-EPoint(1,1)), grid))
 
		return (img,grid)
 
		mask=self._createMask(img,grid)
 
		return (img,mask)
 

	
 
	def rectify(self):
 
		x1=self.SIDE*0.1
 
		x2=self.SIDE*0.9
 
		abcd=list(map(list,self.grid))
 
		destPoints=[(x1,x1),(x1,x2),(x2,x2),(x2,x1)]
 
		abcd_=list(map(list, (EPoint(x,y)+self._createNoise() for (x,y) in destPoints)))
 
		m=cv.getPerspectiveTransform(np.float32(abcd),np.float32(abcd_))
 
		img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
 
		img=np.uint8(img)
 
		grid=Corners(c.transform(m) for c in self.grid)
 
		grid=list(map(lambda p: list(2*p/self.SIDE-EPoint(1,1)), grid))
 
		return (img,grid)
 

	
 
	def cut(self):
 
		width=max(p.x for p in self.grid)-min(p.x for p in self.grid)
 
		height=max(p.y for p in self.grid)-min(p.y for p in self.grid)
 
		kx=width/4
 
		ky=height/4
 
		n=self.SIDE
 
		for p in self.grid:
 
			shift=self._createNoise(0.2)
 
			abcd=[[p.x-kx,p.y-ky],[p.x-kx,p.y+ky],[p.x+kx,p.y+ky],[p.x+kx,p.y-ky]]
 
			abcd_=[[shift.x,shift.y],[shift.x,n+shift.y],[n+shift.x,n+shift.y],[n+shift.x,shift.y]]
 
			m=cv.getPerspectiveTransform(np.float32(abcd),np.float32(abcd_))
 
			t1=getTranslation(-n/2,-n/2)
 
			mir=getMirroring()
 
			proj=getProjection()
 
			rot=getRotation()
 
			t2=getTranslation(n/2,n/2)
 
			for mi in [t1,mir,proj,rot,t2]:
 
				m=np.matmul(mi,m)
 
			img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
 
			img=np.uint8(img)
 
			point=p.transform(m)*2/self.SIDE-EPoint(1,1)
 
			yield (img,[point.x,point.y])
 

	
 
	def _getCenter(self):
 
		(a,b,c,d)=self.grid
 
		p=Line.fromPoints(a,c)
 
		q=Line.fromPoints(b,d)
 
		return p.intersect(q)
 

	
 
	def _computeCrop(self,m):
 
		grid=Corners(c.transform(m) for c in self.grid)
 
		(x1,y1,x2,y2)=computeBoundingBox(grid)
 
		(wg,hg)=(x2-x1,y2-y1)
 
		(left,top,right,bottom)=[random.uniform(0.05,0.2) for i in range(4)]
 
		t2=getTranslation(left*wg-x1, top*hg-y1)
 
		scale=getScale(self.SIDE/(wg*(1+left+right)), self.SIDE/(hg*(1+top+bottom)))
 
		return np.matmul(scale,t2)
 

	
 
	def _createNoise(self,mag=0.05):
 
	def _createMask(self,image,grid):
 
		img=PIL.Image.new("L",image.size)
 
		draw=PIL.ImageDraw.Draw(img)
 
		draw.polygon([tuple(p) for p in grid],255,255)
 
		return img
 

	
 
	def _createNoise(self):
 
		alpha=random.uniform(0,math.pi*2)
 
		d=random.uniform(0,self.SIDE*mag)
 
		d=random.uniform(0,self.SIDE*0.05)
 
		return EPoint(math.cos(alpha)*d, math.sin(alpha)*d)
 

	
 
	def show(self):
 
		img=cv.cvtColor(self.img,cv.COLOR_GRAY2BGR)
 
		for c in self.grid:
 
			cv.circle(img,(int(c.x),int(c.y)),3,[0,255,0],-1)
 
		img=cv.resize(img,(self.SIDE*2,self.SIDE*2))
 
		show(img)
 

	
 

	
 
def traverseDirs(root):
 
	stack=[root]
 
@@ -124,35 +109,30 @@ def traverseDirs(root):
 
		for f in contents:
 
			if f.is_dir(): stack.append(f.path)
 

	
 

	
 
def harvestDir(path):
 
	annotations=DataFile(os.path.join(path,"annotations.json.gz"))
 
	imgFilter=lambda f: f.is_file() and re.match(r".*\.(jpg|jpeg|png|gif)$", f.name.lower())
 
	files=sorted(filter(imgFilter,os.scandir(path)),key=lambda f: f.name)
 
	boards=annotations["."]
 
	for f in files:
 
		grade=annotations.get(f.name,[Board()])[0].grade
 
		Stats.counts[grade]+=1
 
		if not Board.UNSET<grade<=Board.GOOD: continue
 
		if not Board.UNSET<grade<=Board.POOR: continue
 
		img=cv.imread(f.path)
 
		img=cv.cvtColor(img,cv.COLOR_BGR2GRAY)
 
		for b in boards:
 
			sample=Sample(img,b.grid)
 
			# sample.show()
 
			# (transformedImg,label)=sample.transform()
 
			# (transformedImg,label)=sample.rectify()
 
			for (transformedImg,label) in sample.cut():
 
				Sample(np.uint8(transformedImg),[(EPoint(*label)+EPoint(1,1))*Sample.SIDE/2]).show()
 
				yield (transformedImg,label)
 
			(transformedImg,mask)=sample.transform()
 
			yield (transformedImg,mask)
 

	
 

	
 
def loadDataset(root):
 
	testRatio=0.1
 
	trainRatio=1-testRatio
 
	images=[]
 
	labels=[]
 
	for d in traverseDirs(root):
 
		for (img,label) in harvestDir(d):
 
			images.append(img)
 
			labels.append(label)
 
	log.info("clear images: %s",Stats.counts[1])
 
@@ -163,27 +143,49 @@ def loadDataset(root):
 
	n=len(images)
 
	keys=list(range(n))
 
	random.shuffle(keys)
 
	images=[images[k] for k in keys]
 
	labels=[labels[k] for k in keys]
 
	m=int(n*trainRatio)
 
	return (
 
		(np.uint8(images[:m]),np.float32(labels[:m])),
 
		(np.uint8(images[m:]),np.float32(labels[m:]))
 
	)
 

	
 

	
 
def prepareDataset(root,dest):
 
	i=0
 
	train=[]
 
	test=[]
 
	for d in traverseDirs(root):
 
		for (image,mask) in harvestDir(d):
 
			i+=1
 
			if random.random()<0.9:
 
				image.save(os.path.join(dest,"train/{0}.jpg".format(i)))
 
				mask.save(os.path.join(dest,"train_masks/{0}_mask.png".format(i)))
 
				train.append(str(i)+".jpg")
 
			else:
 
				image.save(os.path.join(dest,"test/{0}.jpg".format(i)))
 
				test.append(str(i)+".jpg")
 
	with open(os.path.join(dest,"train_masks.csv"),mode="w") as f:
 
		f.write("img,rle_mask\n")
 
		for file in train:
 
			f.write('{0},""\n'.format(file))
 
	with open(os.path.join(dest,"test_masks.csv"),mode="w") as f:
 
		f.write("img,rle_mask\n")
 
		for file in test:
 
			f.write('{0},""\n'.format(file))
 
	log.info("clear images: %s",Stats.counts[1])
 
	log.info("good images: %s",Stats.counts[2])
 
	log.info("poor images: %s",Stats.counts[3])
 
	log.info("unset images: %s",Stats.counts[0])
 
	log.info("total: %s",sum(Stats.counts))
 

	
 

	
 
def show(img,filename="x"):
 
	cv.imshow(filename,img)
 
	cv.waitKey(0)
 
	cv.destroyAllWindows()
 

	
 

	
 
if __name__=="__main__":
 
	((trainImages,trainLabels),(testImages,testLabels))=loadDataset(sys.argv[1])
 
	np.savez_compressed(
 
		sys.argv[2],
 
		trainImages=trainImages,
 
		trainLabels=trainLabels,
 
		testImages=testImages,
 
		testLabels=testLabels
 
	)
 
	prepareDataset(sys.argv[1],sys.argv[2])
0 comments (0 inline, 0 general)