import sys sys.path.append("../src") import math import random from datetime import datetime import os.path import logging as log import numpy as np import scipy.optimize import scipy.signal import cv2 as cv import config as cfg from geometry import EPoint,Line DEBUG=True class LineBag: def __init__(self): self._lines=[] def put(self,score,alpha,beta,peaks): self._lines.append((score,alpha,beta,peaks)) def pull(self,count): self._lines.sort(reverse=True) res=[] for (score,alpha,beta,peaks) in self._lines: if any(abs(alpha-gamma)<10 and abs(beta-delta)<10 for (_,gamma,delta,_) in res): continue # avoid intersecting lines if any((beta-delta)!=0 and (alpha-gamma)/(beta-delta)<0 for (_,gamma,delta,_) in res): continue res.append((score,alpha,beta,peaks)) if len(res)>=count: break return res class HoughTransform: """Find line sequences with Hough transform. Uses usual image coordinates on input and output, with [0,0] in the upper left corner and [height-1,width-1] in the lower right. However, internally it uses the usual cartesian coordinates, centered at the image center. [-w/2,-h/2] in the upper left and [w/2,h/2] in the lower right.""" def __init__(self,img): self._angleBandwidth=30 # degrees (h,w)=img.shape[:2] self._diagLen=int(np.sqrt(h**2+w**2))+1 self._center=(w//2,h//2) self._acc=np.zeros((180,self._diagLen),dtype=np.int32) self.update(img) def extract(self): img=self._createImg() self.show(img) lines=self._detectLines() res=[] i=0 for (score,alpha,beta,peaks) in lines: log.debug("score: %s",score) log.debug("alpha, beta: %s, %s",alpha,beta) self._drawLine(img,alpha,beta,peaks,i) res.append([]) keys=self._readLineKeys(alpha,beta) for k in peaks: (alphaDeg,d)=keys[k] line=Line(alphaDeg*math.pi/180,d-self._diagLen//2) res[-1].append(self._transformOutput(line)) res[-1].sort(key=lambda line: line.d) i+=1 self.show(img) return res def update(self,img,weight=1): start=datetime.now().timestamp() for (r,row) in enumerate(img): for (c,pix) in enumerate(row): if pix==0: continue for alphaDeg in range(0,180): d=self._computeDist(c,r,alphaDeg)+self._diagLen//2 self._acc[(alphaDeg,d)]+=weight log.debug("Hough updated in %s s",round(datetime.now().timestamp()-start,3)) def scoreLine(self,line): transformed=self._transformInput(line) alphaDeg=round(transformed.alpha*180/math.pi)%180 d=round(transformed.d+self._diagLen//2) if not 0<=d<self._diagLen: return 0 return self._acc[(alphaDeg,d)] def show(self,img=None): if img is None: img=self._createImg() show(img,"Hough transform accumulator") def _computeDist(self,x,y,alphaDeg): alphaRad=alphaDeg*math.pi/180 (x0,y0)=self._center (dx,dy)=(x-x0,y0-y) d=dx*math.cos(alphaRad)+dy*math.sin(alphaRad) return int(d) def _detectLines(self): bag=LineBag() for alpha in range(0,180+60,2): for beta in range(max(alpha-60,0),min(alpha+60,180+60),2): accLine=[self._acc[key] for key in self._readLineKeys(alpha,beta)] (peaks,props)=scipy.signal.find_peaks(accLine,prominence=0) (prominences,peaks)=zip(*sorted(zip(props["prominences"],peaks),reverse=True)[:19]) bag.put(sum(prominences),alpha,beta,peaks) return bag.pull(2) def _readLineKeys(self,alpha,beta): n=self._diagLen-1 res=[] for i in range(n+1): k=round((alpha*(n-i)+beta*i)/n) if k>=180: k=k%180 i=n-i res.append((k,i)) return res def _transformInput(self,line): reflectedLine=Line(math.pi*2-line.alpha,line.d) (x,y)=self._center basis=EPoint(x,-y) shiftedLine=reflectedLine.shiftBasis(basis) if shiftedLine.alpha>=math.pi: shiftedLine=Line(shiftedLine.alpha-math.pi,-shiftedLine.d) return shiftedLine def _transformOutput(self,line): (x,y)=self._center basis=EPoint(-x,y) shiftedLine=line.shiftBasis(basis) reflectedLine=Line(math.pi*2-shiftedLine.alpha,shiftedLine.d) log.debug("%s -> %s",line,reflectedLine) return reflectedLine def _createImg(self): maxVal=self._acc.max() arr=np.expand_dims(np.uint8(255*self._acc//maxVal),axis=2) img=np.concatenate((arr,arr,arr),axis=2) (h,w)=img.shape[:2] for x in range(0,w,4): # y axis img[h//2,x]=[255,255,255] for y in range(0,h,4): img[y,w//2]=[255,255,255] return img def _drawLine(self,img,alpha,beta,peaks,colorKey): colors=[[0,255,255],[255,0,255],[255,255,0]] color=colors[colorKey] (h,w)=img.shape[:2] keys=self._readLineKeys(alpha,beta) for (y,x) in keys: if x%3!=0: continue if y<0 or y>=h: continue img[y,x]=color for k in peaks: (y,x)=keys[k] cv.drawMarker(img,(x,y),color,cv.MARKER_TILTED_CROSS,8) class Accumulator: NEIGHBOURHOOD=2 def __init__(self): self._acc=[] self._hits=[] def add(self,line): (d,k)=self._findClosest(line) if d<=self.NEIGHBOURHOOD: self._acc=self._averageLines(self._acc[k],line,self._hits[k],1) self._hits[k]+=1 else: k=-1 self._acc.append(line) self._hits.append(1) return (self._hits[k],k) def pop(self,k): acc=self._acc (acc[k],acc[-1])=(acc[-1],acc[k]) hits=self._hits (hits[k],hits[-1])=(hits[-1],hits[k]) hits.pop() return acc.pop() def _findClosest(self,line): def dist(p,q): alpha=p.alpha*180/math.pi beta=q.alpha*180/math.pi gamma=abs(alpha-beta) if gamma>180: gamma=360-gamma return math.sqrt(gamma**2+(p.d-q.d)**2) (d,key)=min(zip((dist(line,p) for p in self._acc), range(len(self._acc)))) return (d,key) def _averageLines(self,ab,cd,w1,w2): w=w1+w2 (a,b)=ab.toPoints() (c,d)=cd.toPoints() e=(a*w1+c*w2)/w f=(b*w1+c*w2)/w return Line.fromPoints(e,f) class RandomizedHoughTransform: HIT_LIMIT=10 CANDIDATE_LIMIT=10 MIN_SCORE=50 def __init__(self,img): self._img=np.copy(img) (self._h,self._w)=img.shape[:2] self._acc=Accumulator() self._candidates=[] self._res=[] def _sampleLine(self): """:return: (Line) p""" a=self._chooseRandomPixel() b=self._chooseRandomPixel() while b==a: b=self._chooseRandomPixel() return Line.fromPoints(a,b) def _updateAcc(self,line): (hits,k)=self._acc.add(line) if hits>=self.HIT_LIMIT: self._addCandidate(self._acc.pop(k)) def _addCandidate(self,line): self._candidates.append(line) if len(self._candidates)>=self.CANDIDATE_LIMIT: for p in self._candidates: p_=self._confirmLine(p) if p_: self._res.append(p_) self._candidates=[] def _chooseRandomPixel(self): val=0 while not val: x=random.randrange(0,self._w) y=random.randrange(0,self._h) val=self._img[y,x] return EPoint(x,y) def _confirmLine(self,line): score=0 for point in self._walkLine(line): if self._img[point]==1: score+=1 if score>self.MIN_SCORE: for point in self._walkLine(line): # erase the line self._img[point]=0 return line else: return None def _walkLine(self,line): (a,b,c)=line.toNormal() if abs(line.alpha-math.pi/2)<math.pi/4 or abs(line.alpha-3*math.pi/2)<math.pi/4: # vertical normal ~ horizontal line for x in range(self._w): y=int((-c-a*x)/b) if 0<=y<self.h: yield (y,x) else: # a predominantly vertical line for y in range(self._h): x=int((-c-b*y)/a) if 0<=x<self.w: yield (y,x) def show(img,filename="x"): if cfg.INTERACTIVE: cv.imshow(filename,img) cv.waitKey(0) cv.destroyAllWindows() else: d=int(datetime.now().timestamp()) path=os.path.join(cfg.imgDir,"{0} {1:03} {2}.png".format(d,cfg.i,filename)) cfg.i+=1 cv.imwrite(path,img) def filterVert(edges): kernel = np.array([[1,0,1],[1,0,1],[1,0,1]],np.uint8) edges = cv.erode(edges,kernel) kernel=np.array([[0,1,0],[0,1,0],[0,1,0]],np.uint8) edges=cv.dilate(edges,kernel) return edges def filterHor(edges): kernel = np.array([[1,1,1],[0,0,0],[1,1,1]],np.uint8) edges = cv.erode(edges,kernel) kernel=np.array([[0,0,0],[1,1,1],[0,0,0]],np.uint8) edges=cv.dilate(edges,kernel) return edges def filterDiag(edges): kernel = np.array([[0,0,1],[1,0,0],[0,1,0]],np.uint8) edges1 = cv.erode(edges,kernel) kernel=np.array([[1,0,0],[0,1,0],[0,0,1]],np.uint8) edges1=cv.dilate(edges1,kernel) kernel = np.array([[0,1,0],[1,0,0],[0,0,1]],np.uint8) edges2 = cv.erode(edges,kernel) kernel=np.array([[0,0,1],[0,1,0],[1,0,0]],np.uint8) edges2=cv.dilate(edges2,kernel) return edges1+edges2 def prepareEdgeImg(img): gray=cv.cvtColor(img,cv.COLOR_BGR2GRAY) show(gray,"greyscale image") edges=cv.Canny(gray,70,130) show(edges,"Canny edge detector") edges=filterHor(edges)+filterVert(edges)+filterDiag(edges) show(edges,"kernel filtered edges") return edges