Files @ 29f28718a69b
Branch filter:

Location: OneEye/exp/kerokero/prepare_data.py

Laman
transitional data processing
import os
import sys
import re
import math
import random
import logging as log

import numpy as np
import cv2 as cv
import PIL.Image
import PIL.ImageDraw

import config as cfg
sys.path.append("..")
sys.path.append("../../src")
from annotations import DataFile,computeBoundingBox,Corners,EPoint,Board
from geometry import Line
from kerokero.transformation_matrices import getIdentity,getRotation,getTranslation,getScale,getMirroring,getProjection

# Fixed seed for Python's random module (presumably for reproducible runs). In this file
# the crop margins, the corner noise and the train/test split all draw from it.
random.seed(361)


class Stats:
	"""Module-wide counters of the images seen, shared through the class attribute."""
	# indexed by the image's grade; the log messages in this file label
	# index 0 as unset, 1 as clear, 2 as good and 3 as poor
	counts=[0]*4


class Sample:
	"""An image together with the four corners of the grid annotated in it."""
	SIDE=224  # side of the square output images, in pixels

	def __init__(self,img,grid):
		""":param img: the source image as a np.uint8 array, either 2D (greyscale)
		or 3D with the channels last in OpenCV's BGR order (as returned by cv.imread)
		:param grid: iterable of 4 EPoints, ie. Corners"""
		self.img=img
		self.grid=grid

	def transform(self):
		"""Warp the image around the grid center, then crop and scale it to SIDE x SIDE.

		The warp composes the mirroring, projection and rotation matrices from
		kerokero.transformation_matrices. They are requested without arguments,
		so they are presumably randomized there.

		:return: (img, mask), two PIL images of size SIDE x SIDE. img is the warped source image
		(RGB for a color source, greyscale for a 2D source), mask is a greyscale image
		with the transformed grid quadrilateral drawn in 255 on a 0 background."""
		center=self._getCenter()
		m=getIdentity()
		# move the grid center to the origin first, so the following matrices act around it
		t1=getTranslation(-center.x,-center.y)
		# the helpers are called in this fixed order in case they draw random numbers
		proj=getProjection()
		rot=getRotation()
		mir=getMirroring()
		for mi in [t1,mir,proj,rot]:
			m=np.matmul(mi,m)
		m=np.matmul(self._computeCrop(m),m)
		img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
		if img.ndim==3:
			# reverse the channel axis: OpenCV's BGR -> RGB for PIL
			img=img[:,:,::-1]
		img=PIL.Image.fromarray(img)
		grid=Corners(c.transform(m) for c in self.grid)
		mask=self._createMask(img,grid)
		return (img,mask)

	def rectify(self):
		"""Warp the image so that the grid corners land close to the corners of a centered square.

		The target square spans 10% to 90% of SIDE in both axes. Every target corner is displaced
		by an independent random noise vector (see _createNoise).

		:return: (img, grid), where img is a np.uint8 array warped to SIDE x SIDE
		and grid is a list of 4 [x, y] lists holding the warped corners,
		rescaled as 2*p/SIDE-1, ie. pixel 0 maps to -1 and pixel SIDE to 1"""
		x1=self.SIDE*0.1
		x2=self.SIDE*0.9
		abcd=list(map(list,self.grid))
		destPoints=[(x1,x1),(x1,x2),(x2,x2),(x2,x1)]
		abcd_=list(map(list, (EPoint(x,y)+self._createNoise() for (x,y) in destPoints)))
		m=cv.getPerspectiveTransform(np.float32(abcd),np.float32(abcd_))
		img=cv.warpPerspective(self.img,m,(self.SIDE,self.SIDE))
		img=np.uint8(img)
		grid=Corners(c.transform(m) for c in self.grid)
		grid=list(map(lambda p: list(2*p/self.SIDE-EPoint(1,1)), grid))
		return (img,grid)

	def _getCenter(self):
		""":return: the intersection of the grid diagonals a-c and b-d"""
		(a,b,c,d)=self.grid
		p=Line.fromPoints(a,c)
		q=Line.fromPoints(b,d)
		return p.intersect(q)

	def _computeCrop(self,m):
		"""Compute the matrix fitting the transformed grid into the SIDE x SIDE output.

		The bounding box of the grid transformed by m is padded on every side
		by a random margin of 5% to 20% of its width / height,
		and the padded box is scaled to SIDE in each axis independently.

		:param m: the transformation matrix applied to the grid so far
		:return: the crop matrix, to be multiplied with m from the left"""
		grid=Corners(c.transform(m) for c in self.grid)
		(x1,y1,x2,y2)=computeBoundingBox(grid)
		(wg,hg)=(x2-x1,y2-y1)
		(left,top,right,bottom)=[random.uniform(0.05,0.2) for i in range(4)]
		# shift the bounding box so that only the left / top margin remains before it
		t2=getTranslation(left*wg-x1, top*hg-y1)
		scale=getScale(self.SIDE/(wg*(1+left+right)), self.SIDE/(hg*(1+top+bottom)))
		return np.matmul(scale,t2)

	def _createMask(self,image,grid):
		""":param image: a PIL image, used only for its size
		:param grid: iterable of points convertible to tuples, the polygon vertices
		:return: a new greyscale ("L") PIL image with the grid polygon filled and outlined in 255"""
		img=PIL.Image.new("L",image.size)
		draw=PIL.ImageDraw.Draw(img)
		draw.polygon([tuple(p) for p in grid],255,255)
		return img

	def _createNoise(self):
		""":return: an EPoint offset with a uniformly random direction
		and a uniformly random length of up to 5% of SIDE"""
		alpha=random.uniform(0,math.pi*2)
		d=random.uniform(0,self.SIDE*0.05)
		return EPoint(math.cos(alpha)*d, math.sin(alpha)*d)

	def show(self):
		"""Display the image with the grid corners marked by green dots, resized to twice SIDE.

		Blocks until a key is pressed (see the module level show)."""
		if self.img.ndim==2:
			img=cv.cvtColor(self.img,cv.COLOR_GRAY2BGR)
		else:
			# already a color image; copy it so that the dots are not drawn into self.img
			img=self.img.copy()
		for c in self.grid:
			cv.circle(img,(int(c.x),int(c.y)),3,[0,255,0],-1)
		img=cv.resize(img,(self.SIDE*2,self.SIDE*2))
		show(img)


def traverseDirs(root):
	"""Walk the directory tree under root and yield the annotated directories.

	A directory is yielded when it directly contains an entry named annotations.json.gz.
	The walk is depth-first: a directory comes before its subdirectories
	and siblings are visited in ascending name order.

	:param root: path of the directory to start from
	:return: generator of directory paths"""
	pending=[root]
	while pending:
		current=pending.pop()
		# reverse name order, so that popping from the stack visits the entries in ascending order
		entries=sorted(os.scandir(current),key=lambda e: e.name,reverse=True)
		names={e.name for e in entries}
		if "annotations.json.gz" in names:
			log.info(current)
			yield current
		pending.extend(e.path for e in entries if e.is_dir())


def harvestDir(path):
	"""Generate transformed samples from the images of one annotated directory.

	The image files (jpg, jpeg, png, gif; case insensitive) are processed in name order.
	Every image is counted in Stats.counts by its grade, taken from the first annotation
	stored under the file name (a default Board() when there is none).
	Images with a grade outside of (Board.UNSET, Board.POOR] are skipped,
	and so are files that OpenCV fails to read.
	The boards stored under the "." key are applied to every remaining image.

	:param path: a directory containing annotations.json.gz
	:return: generator of (image, mask) pairs, as returned by Sample.transform"""
	annotations=DataFile(os.path.join(path,"annotations.json.gz"))
	imgFilter=lambda f: f.is_file() and re.match(r".*\.(jpg|jpeg|png|gif)$", f.name.lower())
	files=sorted(filter(imgFilter,os.scandir(path)),key=lambda f: f.name)
	boards=annotations["."]
	for f in files:
		grade=annotations.get(f.name,[Board()])[0].grade
		Stats.counts[grade]+=1
		if not Board.UNSET<grade<=Board.POOR: continue
		img=cv.imread(f.path)
		if img is None:
			# cv.imread reports an unreadable / unsupported file by returning None, not by raising
			log.warning("could not read image %s, skipping",f.path)
			continue
		for b in boards:
			sample=Sample(img,b.grid)
			(transformedImg,mask)=sample.transform()
			yield (transformedImg,mask)


def loadDataset(root):
	"""Collect all the samples found under root, shuffle them and split them 90:10.

	The grade counts accumulated in Stats are logged along the way.

	:param root: the directory to search for annotated images
	:return: ((trainImages, trainLabels), (testImages, testLabels)),
	the images converted with np.uint8 and the labels with np.float32"""
	testRatio=0.1
	trainRatio=1-testRatio
	pairs=[(img,label) for d in traverseDirs(root) for (img,label) in harvestDir(d)]
	counts=Stats.counts
	log.info("clear images: %s",counts[1])
	log.info("good images: %s",counts[2])
	log.info("poor images: %s",counts[3])
	log.info("unset images: %s",counts[0])
	log.info("total: %s",sum(counts))
	n=len(pairs)
	# shuffle the indices and reorder the images and the labels by the same permutation
	order=list(range(n))
	random.shuffle(order)
	images=[pairs[k][0] for k in order]
	labels=[pairs[k][1] for k in order]
	split=int(n*trainRatio)
	trainSet=(np.uint8(images[:split]),np.float32(labels[:split]))
	testSet=(np.uint8(images[split:]),np.float32(labels[split:]))
	return (trainSet,testSet)


def prepareDataset(root,dest):
	"""Generate the samples found under root and save them as image files under dest.

	Every sample goes to the training set with a probability of 0.9, otherwise to the test set.
	Training images are saved as train/<i>.jpg with their masks as train_masks/<i>_mask.png,
	test images as test/<i>.jpg without a mask; <i> is the sample number, starting from 1.
	The image names are listed in train_masks.csv and test_masks.csv
	with an empty rle_mask column. The grade counts accumulated in Stats are logged at the end.

	:param root: the directory to search for annotated images
	:param dest: the output directory; the train, train_masks and test subdirectories
	are created when missing"""
	# PIL's save does not create missing directories
	for subdir in ("train","train_masks","test"):
		os.makedirs(os.path.join(dest,subdir),exist_ok=True)
	i=0
	train=[]
	test=[]
	for d in traverseDirs(root):
		for (image,mask) in harvestDir(d):
			i+=1
			if random.random()<0.9:
				image.save(os.path.join(dest,"train/{0}.jpg".format(i)))
				mask.save(os.path.join(dest,"train_masks/{0}_mask.png".format(i)))
				train.append(str(i)+".jpg")
			else:
				image.save(os.path.join(dest,"test/{0}.jpg".format(i)))
				test.append(str(i)+".jpg")
	for (listName,names) in (("train_masks.csv",train),("test_masks.csv",test)):
		with open(os.path.join(dest,listName),mode="w") as f:
			f.write("img,rle_mask\n")
			f.writelines('{0},""\n'.format(name) for name in names)
	log.info("clear images: %s",Stats.counts[1])
	log.info("good images: %s",Stats.counts[2])
	log.info("poor images: %s",Stats.counts[3])
	log.info("unset images: %s",Stats.counts[0])
	log.info("total: %s",sum(Stats.counts))


def show(img,filename="x"):
	"""Display an image in an OpenCV window and wait for a key press.

	:param img: the image to display, passed to cv.imshow
	:param filename: the name of the window
	All the OpenCV windows are closed afterwards."""
	waitForever=0  # a delay of 0 makes waitKey block until a key is pressed
	cv.imshow(filename,img)
	cv.waitKey(waitForever)
	cv.destroyAllWindows()


if __name__=="__main__":
	# command line entry point: <root> is searched for annotated images, <dest> receives the dataset
	if len(sys.argv)<3:
		# exit with a usage message rather than an IndexError traceback
		sys.exit("usage: {0} <root> <dest>".format(sys.argv[0]))
	prepareDataset(sys.argv[1],sys.argv[2])