Changeset - 634319abff9f
[Not reviewed]
default
0 1 0
Laman - 6 years ago 2019-04-16 20:19:53

experimental randomized Hough transform
1 file changed with 113 insertions and 0 deletions:
0 comments (0 inline, 0 general)
exp/hough.py
Show inline comments
 
import sys
 
sys.path.append("../src")
 

	
 
import math
 
import random
 
from datetime import datetime
 
import os.path
 
import logging as log
 

	
 
import numpy as np
 
import scipy.optimize
 
import scipy.signal
 
import cv2 as cv
 

	
 
import config as cfg
 
from geometry import EPoint,Line
 

	
 
DEBUG=True
 

	
 

	
 
class LineBag:
 
	def __init__(self):
 
		self._lines=[]
 

	
 
	def put(self,score,alpha,beta,peaks):
 
		self._lines.append((score,alpha,beta,peaks))
 

	
 
	def pull(self,count):
 
		self._lines.sort(reverse=True)
 
		res=[]
 
		for (score,alpha,beta,peaks) in self._lines:
 
			if any(abs(alpha-gamma)<10 and abs(beta-delta)<10 for (_,gamma,delta,_) in res): continue
 
			# avoid intersecting lines
 
			if any((beta-delta)!=0 and (alpha-gamma)/(beta-delta)<0 for (_,gamma,delta,_) in res): continue
 
			res.append((score,alpha,beta,peaks))
 
			if len(res)>=count: break
 
		return res
 

	
 

	
 
class HoughTransform:
 
	"""Find line sequences with Hough transform.
 

	
 
	Uses usual image coordinates on input and output, with [0,0] in the upper left corner and [height-1,width-1] in the lower right.
 
	However, internally it uses the usual cartesian coordinates, centered at the image center. [-w/2,-h/2] in the upper left and [w/2,h/2] in the lower right."""
 
	def __init__(self,img):
 
		self._angleBandwidth=30 # degrees
 

	
 
		(h,w)=img.shape[:2]
 
		self._diagLen=int(np.sqrt(h**2+w**2))+1
 
		self._center=(w//2,h//2)
 
		self._acc=np.zeros((180,self._diagLen),dtype=np.int32)
 

	
 
		self.update(img)
 

	
 
	def extract(self):
 
		img=self._createImg()
 
		self.show(img)
 
		lines=self._detectLines()
 
		res=[]
 
		i=0
 
		for (score,alpha,beta,peaks) in lines:
 
			log.debug("score: %s",score)
 
			log.debug("alpha, beta: %s, %s",alpha,beta)
 
			self._drawLine(img,alpha,beta,peaks,i)
 

	
 
			res.append([])
 
			keys=self._readLineKeys(alpha,beta)
 
			for k in peaks:
 
				(alphaDeg,d)=keys[k]
 
				line=Line(alphaDeg*math.pi/180,d-self._diagLen//2)
 
				res[-1].append(self._transformOutput(line))
 
			res[-1].sort(key=lambda line: line.d)
 
			i+=1
 

	
 
		self.show(img)
 
		return res
 

	
 
	def update(self,img,weight=1):
 
		start=datetime.now().timestamp()
 
		for (r,row) in enumerate(img):
 
			for (c,pix) in enumerate(row):
 
				if pix==0: continue
 
				for alphaDeg in range(0,180):
 
					d=self._computeDist(c,r,alphaDeg)+self._diagLen//2
 
					self._acc[(alphaDeg,d)]+=weight
 
		log.debug("Hough updated in %s s",round(datetime.now().timestamp()-start,3))
 

	
 
	def scoreLine(self,line):
 
		transformed=self._transformInput(line)
 
		alphaDeg=round(transformed.alpha*180/math.pi)%180
 
		d=round(transformed.d+self._diagLen//2)
 
		if not 0<=d<self._diagLen: return 0
 
		return self._acc[(alphaDeg,d)]
 

	
 
	def show(self,img=None):
 
		if img is None: img=self._createImg()
 
		show(img,"Hough transform accumulator")
 

	
 
	def _computeDist(self,x,y,alphaDeg):
 
		alphaRad=alphaDeg*math.pi/180
 
		(x0,y0)=self._center
 
		(dx,dy)=(x-x0,y0-y)
 
		d=dx*math.cos(alphaRad)+dy*math.sin(alphaRad)
 
		return int(d)
 

	
 
	def _detectLines(self):
 
		bag=LineBag()
 
		for alpha in range(0,180+60,2):
 
			for beta in range(max(alpha-60,0),min(alpha+60,180+60),2):
 
				accLine=[self._acc[key] for key in self._readLineKeys(alpha,beta)]
 
				(peaks,props)=scipy.signal.find_peaks(accLine,prominence=0)
 
				(prominences,peaks)=zip(*sorted(zip(props["prominences"],peaks),reverse=True)[:19])
 
				bag.put(sum(prominences),alpha,beta,peaks)
 
		return bag.pull(2)
 

	
 
	def _readLineKeys(self,alpha,beta):
 
		n=self._diagLen-1
 
		res=[]
 
		for i in range(n+1):
 
			k=round((alpha*(n-i)+beta*i)/n)
 
			if k>=180:
 
				k=k%180
 
				i=n-i
 
			res.append((k,i))
 
		return res
 

	
 
	def _transformInput(self,line):
 
		reflectedLine=Line(math.pi*2-line.alpha,line.d)
 
		(x,y)=self._center
 
		basis=EPoint(x,-y)
 
		shiftedLine=reflectedLine.shiftBasis(basis)
 
		if shiftedLine.alpha>=math.pi:
 
			shiftedLine=Line(shiftedLine.alpha-math.pi,-shiftedLine.d)
 
		return shiftedLine
 

	
 
	def _transformOutput(self,line):
 
		(x,y)=self._center
 
		basis=EPoint(-x,y)
 
		shiftedLine=line.shiftBasis(basis)
 
		reflectedLine=Line(math.pi*2-shiftedLine.alpha,shiftedLine.d)
 
		log.debug("%s -> %s",line,reflectedLine)
 
		return reflectedLine
 

	
 
	def _createImg(self):
 
		maxVal=self._acc.max()
 
		arr=np.expand_dims(np.uint8(255*self._acc//maxVal),axis=2)
 
		img=np.concatenate((arr,arr,arr),axis=2)
 

	
 
		(h,w)=img.shape[:2]
 

	
 
		for x in range(0,w,4): # y axis
 
			img[h//2,x]=[255,255,255]
 
		for y in range(0,h,4):
 
			img[y,w//2]=[255,255,255]
 

	
 
		return img
 

	
 
	def _drawLine(self,img,alpha,beta,peaks,colorKey):
 
		colors=[[0,255,255],[255,0,255],[255,255,0]]
 
		color=colors[colorKey]
 
		(h,w)=img.shape[:2]
 
		keys=self._readLineKeys(alpha,beta)
 
		for (y,x) in keys:
 
			if x%3!=0: continue
 
			if y<0 or y>=h: continue
 
			img[y,x]=color
 
		for k in peaks:
 
			(y,x)=keys[k]
 
			cv.drawMarker(img,(x,y),color,cv.MARKER_TILTED_CROSS,8)
 

	
 

	
 
class Accumulator:
 
	NEIGHBOURHOOD=2
 

	
 
	def __init__(self):
 
		self._acc=[]
 
		self._hits=[]
 

	
 
	def add(self,line):
 
		(d,k)=self._findClosest(line)
 
		if d<=self.NEIGHBOURHOOD:
 
			self._acc=self._averageLines(self._acc[k],line,self._hits[k],1)
 
			self._hits[k]+=1
 
		else:
 
			k=-1
 
			self._acc.append(line)
 
			self._hits.append(1)
 
		return (self._hits[k],k)
 

	
 
	def pop(self,k):
 
		acc=self._acc
 
		(acc[k],acc[-1])=(acc[-1],acc[k])
 
		hits=self._hits
 
		(hits[k],hits[-1])=(hits[-1],hits[k])
 
		hits.pop()
 
		return acc.pop()
 

	
 
	def _findClosest(self,line):
 
		def dist(p,q):
 
			alpha=p.alpha*180/math.pi
 
			beta=q.alpha*180/math.pi
 
			gamma=abs(alpha-beta)
 
			if gamma>180: gamma=360-gamma
 
			return math.sqrt(gamma**2+(p.d-q.d)**2)
 

	
 
		(d,key)=min(zip((dist(line,p) for p in self._acc), range(len(self._acc))))
 
		return (d,key)
 

	
 
	def _averageLines(self,ab,cd,w1,w2):
 
		w=w1+w2
 
		(a,b)=ab.toPoints()
 
		(c,d)=cd.toPoints()
 
		e=(a*w1+c*w2)/w
 
		f=(b*w1+c*w2)/w
 
		return Line.fromPoints(e,f)
 

	
 

	
 
class RandomizedHoughTransform:
 
	HIT_LIMIT=10
 
	CANDIDATE_LIMIT=10
 
	MIN_SCORE=50
 

	
 
	def __init__(self,img):
 
		self._img=np.copy(img)
 
		(self._h,self._w)=img.shape[:2]
 

	
 
		self._acc=Accumulator()
 
		self._candidates=[]
 
		self._res=[]
 

	
 
	def _sampleLine(self):
 
		""":return: (Line) p"""
 
		a=self._chooseRandomPixel()
 
		b=self._chooseRandomPixel()
 
		while b==a: b=self._chooseRandomPixel()
 
		return Line.fromPoints(a,b)
 

	
 
	def _updateAcc(self,line):
 
		(hits,k)=self._acc.add(line)
 
		if hits>=self.HIT_LIMIT:
 
			self._addCandidate(self._acc.pop(k))
 

	
 
	def _addCandidate(self,line):
 
		self._candidates.append(line)
 
		if len(self._candidates)>=self.CANDIDATE_LIMIT:
 
			for p in self._candidates:
 
				p_=self._confirmLine(p)
 
				if p_: self._res.append(p_)
 
			self._candidates=[]
 

	
 
	def _chooseRandomPixel(self):
 
		val=0
 
		while not val:
 
			x=random.randrange(0,self._w)
 
			y=random.randrange(0,self._h)
 
			val=self._img[y,x]
 
		return EPoint(x,y)
 

	
 
	def _confirmLine(self,line):
 
		score=0
 
		for point in self._walkLine(line):
 
			if self._img[point]==1:
 
				score+=1
 
		if score>self.MIN_SCORE:
 
			for point in self._walkLine(line): # erase the line
 
				self._img[point]=0
 
			return line
 
		else: return None
 

	
 
	def _walkLine(self,line):
 
		(a,b,c)=line.toNormal()
 
		if abs(line.alpha-math.pi/2)<math.pi/4 or abs(line.alpha-3*math.pi/2)<math.pi/4: # vertical normal ~ horizontal line
 
			for x in range(self._w):
 
				y=int((-c-a*x)/b)
 
				if 0<=y<self.h:
 
					yield (y,x)
 
		else: # a predominantly vertical line
 
			for y in range(self._h):
 
				x=int((-c-b*y)/a)
 
				if 0<=x<self.w:
 
					yield (y,x)
 

	
 

	
 
def show(img,filename="x"):
 
	if cfg.INTERACTIVE:
 
		cv.imshow(filename,img)
 
		cv.waitKey(0)
 
		cv.destroyAllWindows()
 
	else:
 
		d=int(datetime.now().timestamp())
 
		path=os.path.join(cfg.imgDir,"{0} {1:03} {2}.png".format(d,cfg.i,filename))
 
		cfg.i+=1
 
		cv.imwrite(path,img)
 

	
 

	
 
def filterVert(edges):
 
	kernel = np.array([[1,0,1],[1,0,1],[1,0,1]],np.uint8)
 
	edges = cv.erode(edges,kernel)
 
	kernel=np.array([[0,1,0],[0,1,0],[0,1,0]],np.uint8)
 
	edges=cv.dilate(edges,kernel)
 
	return edges
 

	
 
def filterHor(edges):
 
	kernel = np.array([[1,1,1],[0,0,0],[1,1,1]],np.uint8)
 
	edges = cv.erode(edges,kernel)
 
	kernel=np.array([[0,0,0],[1,1,1],[0,0,0]],np.uint8)
 
	edges=cv.dilate(edges,kernel)
 
	return edges
 

	
 
def filterDiag(edges):
 
	kernel = np.array([[0,0,1],[1,0,0],[0,1,0]],np.uint8)
 
	edges1 = cv.erode(edges,kernel)
 
	kernel=np.array([[1,0,0],[0,1,0],[0,0,1]],np.uint8)
 
	edges1=cv.dilate(edges1,kernel)
 

	
 
	kernel = np.array([[0,1,0],[1,0,0],[0,0,1]],np.uint8)
 
	edges2 = cv.erode(edges,kernel)
 
	kernel=np.array([[0,0,1],[0,1,0],[1,0,0]],np.uint8)
 
	edges2=cv.dilate(edges2,kernel)
 

	
 
	return edges1+edges2
 

	
 
def prepareEdgeImg(img):
 
	gray=cv.cvtColor(img,cv.COLOR_BGR2GRAY)
 
	show(gray,"greyscale image")
 
	edges=cv.Canny(gray,70,130)
 
	show(edges,"Canny edge detector")
 
	edges=filterHor(edges)+filterVert(edges)+filterDiag(edges)
 
	show(edges,"kernel filtered edges")
 
	return edges
0 comments (0 inline, 0 general)