Changeset - dc1ed8ff56ef
[Not reviewed]
default
0 5 0
Laman - 6 years ago 2018-12-12 23:48:38

cleaned up and documented shutdown
5 files changed with 41 insertions and 46 deletions:
0 comments (0 inline, 0 general)
overview.md
Show inline comments
 
Processes and threads
 
=====================
 

	
 
Processes communicate through MsgQueues (a wrapper around multiprocessing.Queue). Each owns one incoming and holds any number of outcoming queues. The queue runs in one thread, reads messages and calls handlers on its owner.
 
Processes communicate through MsgQueues (a wrapper around multiprocessing.Queue).
 
Each owns one incoming and holds any number of outcoming queues.
 
The queue runs in one thread, reads messages and calls handlers on its owner.
 

	
 
A shutdown is initiated by closing the GUI.
 
It closes its MsgQueue and exits its mainloop.
 
End of GUI process is waited on by the Core.
 
It follows by sending VideoCapture process an "shutdown" order and then waits on its end.
 
The VideoCapture closes its MsgQueue and sets its main loop for a break with its _shutdown attribute.
 
Finally the Core closes its MsgQueue and exits. 
 

	
 
Core
 
----
 
* Main process, starts GUI and VideoCapture. Waits for them to finish.
 
* Runs ImageAnalyzer and StateBag.
 
* Messages API:
 
	* putFrame(frame)
 
	* setParams(params)
 
	* fetchFrame(key)
 
	* fetchParams(params)
 

	
 
GUI
 
---
 
* Presents data for the user, handles user input.
 

	
 
VideoCapture
 
------------
 
* Consumes a video stream, sending captured frames to Core.
 
\ No newline at end of file
src/core.py
Show inline comments
 
import multiprocessing
 
import threading
 
import logging
 

	
 
import PIL
 

	
 
import config as cfg
 
from util import MsgQueue
 
from gui import gui
 
from analyzer import ImageAnalyzer
 
from analyzer.framecache import FrameCache
 
from go.core import Go, isLegalPosition
 
from statebag import StateBag
 
from video import capVideo
 

	
 
log=logging.getLogger(__name__)
 

	
 

	
 
class Core:
 
	def __init__(self):
 
		self.grid=None
 
		self.go=Go()
 
		self.detector=ImageAnalyzer()
 
		self._cache=FrameCache()
 
		self.states=StateBag()
 

	
 
		self._ownMessages=MsgQueue(self._handleEvent)
 
		self._guiMessages=MsgQueue()
 
		self._vidMessages=MsgQueue()
 
		self._ownMessages=MsgQueue("Core",self._handleEvent)
 
		self._guiMessages=MsgQueue("GUI")
 
		self._vidMessages=MsgQueue("Video")
 
		self._listenerThread=None
 

	
 
		self._frame=None
 

	
 
		self._guiProc=multiprocessing.Process(name="gui", target=gui, args=(self._guiMessages,self._ownMessages))
 
		self._guiProc.start()
 
		self._vidProc=multiprocessing.Process(
 
			name="video",
 
			target=capVideo,
 
			args=(cfg.misc.video, self._vidMessages,self._ownMessages)
 
		)
 
		self._vidProc.start()
 

	
 
	def fetchFrame(self,key):
 
		frame=self._cache.get(key)
 
		if frame is None:
 
			(key,frame)=self._cache.getRelative(10)
 
		self._guiMessages.send("setFrame", (frame.copy(), gui.PREVIEW, key))
 

	
 
	def putFrame(self,frame):
 
		self._frame=PIL.Image.fromarray(frame)
 
		k=self._cache.put(self._frame)
 
		self._guiMessages.send("setFrame", (self._frame, gui.RECORDING, k))
 
		self.analyze()
 

	
 
	def sendParams(self):
 
		self._guiMessages.send("setParams",(self.detector.params.copy(),))
 

	
 
	def setParams(self,params):
 
		self.detector.setParams(params)
 

	
 
	def analyze(self):
 
		if self.detector.analyze(self._frame):
 
			if isLegalPosition(self.detector.board):
 
				state=self.states.pushState(self.detector.board)
 
				rec=[]
 
				if state:
 
					rec=state.exportRecord()
 
					log.debug("progressive game record: %s",rec)
 
				self._guiMessages.send("setGameState", (self.detector.board,rec))
 

	
 
				self.go.transitionMove(self.detector.board)
 
				log.debug("conservative game record: %s",self.go._record)
 
			else:
 
				log.info("illegal position detected")
 

	
 
	def listen(self):
 
		listenerThread=threading.Thread(target=lambda: self._ownMessages.listen())
 
		listenerThread.start()
 
		self._listenerThread=threading.Thread(target=lambda: self._ownMessages.listen())
 
		self._listenerThread.start()
 

	
 
	def joinChildren(self):
 
		self._guiProc.join()
 
		self._vidMessages.send("shutDown")
 
		self._vidProc.join()
 
		self._ownMessages.send("!kill",("core",))
 
		self._listenerThread.join()
 
		log.info("Core exiting.")
 

	
 
	def _handleEvent(self,e):
 
		actions={
 
			"fetchFrame": self.fetchFrame,
 
			"putFrame": self.putFrame,
 
			"fetchParams": self.sendParams,
 
			"setParams": self.setParams
 
		}
 
		(actionName,args,kwargs)=e
 

	
 
		return actions[actionName](*args,**kwargs)
 

	
 
if __name__=="__main__":
 
	core=Core()
 
	core.listen()
 
	log.info("OneEye started.")
 
	core.joinChildren()
 
	log.info("Exited correctly.")
 

	
 
"""
 
core
 
====
 
grid
 
go
 
imageAnalyzer
 

	
 

	
 
gui
 
===
 
corners
 

	
 
a) keeps references to important objects and uses them
 
b) gets and sets all relevant data through method calls with core
 

	
 
GUI
 
<- addCorner(corner)
 
-> redrawImgView(img,grid)
 
<- refreshTresholds(tresB,tresW)
 

	
 
BoardView
 
-> redrawState(go)
 

	
 

	
 
core-gui: just pass messages with relevant data (!! always pass object copies, don't share instances)
 
"""
 
\ No newline at end of file
 
	log.info("OneEye done.")
src/gui/__init__.py
Show inline comments
 
import logging
 
import threading
 
import tkinter as tk
 

	
 
import config
 
from analyzer import ImageAnalyzer
 
from .mainwindow import MainWindow
 
from .boardview import BoardView
 
from .settings import Settings
 

	
 
log=logging.getLogger(__name__)
 

	
 

	
 
class GUI:
 
	SETUP=PREVIEW=0
 
	RECORDING=REAL=1
 

	
 
	def __init__(self):
 
		self.root = tk.Tk()
 
		self.root.title("OneEye {0}.{1}.{2}".format(*config.misc.version))
 
		self.root.option_add('*tearOff',False) # for menu
 

	
 
		self.detector=ImageAnalyzer()
 
		self._frameKey=0
 

	
 
		self._ownMessages=None
 
		self._coreMessages=None
 

	
 
		self._state=GUI.SETUP
 

	
 
		self.mainWindow = MainWindow(self, master=self.root)
 
		self.settings=None
 
		self.root.columnconfigure(0,weight=1)
 
		self.root.rowconfigure(0,weight=1)
 

	
 
		self.root.bind("<<redrawImgView>>", lambda e: self.mainWindow.redrawImgView())
 
		self.root.bind("<<setUp>>", lambda e: self.setUp())
 
		self.root.bind("<<setRecording>>", lambda e: self.setRecording())
 
		self.root.bind("<F12>",lambda e: Settings(self))
 
		self.mainWindow.bind("<Destroy>",lambda e: self._ownMessages.send("!kill",("gui",)))
 
		self.mainWindow.bind("<Destroy>",lambda e: self._shutDown())
 

	
 
		self.setUp()
 

	
 
	def __call__(self,ownMessages,coreMessages):
 
		self._ownMessages=ownMessages
 
		self._coreMessages=coreMessages
 

	
 
		self.listenerThread=threading.Thread(target=lambda: ownMessages.listen(self._handleEvent))
 
		self.listenerThread.start()
 
		self._listenerThread=threading.Thread(target=lambda: ownMessages.listen(self._handleEvent))
 
		self._listenerThread.start()
 

	
 
		self.mainWindow.mainloop()
 

	
 
	def sendMsg(self,actionName,args=tuple(),kwargs=None):
 
		self._coreMessages.send(actionName,args,kwargs)
 

	
 
	def setUp(self):
 
		self.mainWindow.setUp()
 
		self.root.bind("<Left>",lambda e: self.sendMsg("prevFrame"))
 
		self.root.bind("<Right>",lambda e: self.sendMsg("nextFrame"))
 

	
 
	def setRecording(self):
 
		self.mainWindow.setRecording()
 
		self.root.bind("<Left>",lambda e: None)
 
		self.root.bind("<Right>",lambda e: None)
 
		if self.settings:
 
			self.settings.destroy()
 
			self.settings=None
 
		self.sendParams()
 

	
 
	def sendParams(self):
 
		self.sendMsg("setParams",(self.detector.params.copy(),))
 

	
 
	def _shutDown(self):
 
		log.info("GUI proc exiting.")
 
		self._ownMessages.send("!kill",("gui",))
 
		self._listenerThread.join()
 

	
 
	def _handleEvent(self,e):
 
		actions={
 
			"setFrame": self._frameHandler,
 
			"setGameState": self._stateHandler,
 
			"setParams": self._paramsHandler
 
		}
 
		(actionName,args,kwargs)=e
 

	
 
		return actions[actionName](*args,**kwargs)
 

	
 
	def _frameHandler(self,frame,type,key):
 
		if self._state!=type and self.mainWindow.imgView.isSet():
 
			log.info("ignored setFrame event, wrong type")
 
			return
 
		self._frameKey=key
 
		self.mainWindow.setFrame(frame)
 
		self.root.event_generate("<<redrawImgView>>")
 

	
 
	def _stateHandler(self,gameState,moves):
 
		labels={(row,col):(i+1) for (i,(c,row,col)) in enumerate(moves)}
 
		self.mainWindow.boardView.redrawState(gameState,labels)
 

	
 
	def _paramsHandler(self,params):
 
		if not self.settings:
 
			log.warning("received 'setParams' message while settings is '%s'",str(self.settings))
 
			return
 
		self.detector.setParams(params)
 
		self.settings.setParams(params)
 

	
 

	
 
gui=GUI()
 

	
 
"""
 
# setup #
 
* we can click around the ImgView
 
* we can walk through the frames back and forth
 
* BoardView is showing what the reading of ImgView _would_ be
 
* core is reading and analyzing frames, pushing results to StateBag, but not showing them
 

	
 
# recording #
 
* ImgView is showing the current picture, is not clickable
 
* BoardView is showing last detected position
 
* on switch to recording (if parameters have changed):
 
	* feed analyzer new parameters and start using them
 
	* in the background reanalyze cached frames with the new parameters and merge them into StateBag
 
"""
src/util.py
Show inline comments
 
import random
 
import multiprocessing
 
import logging
 

	
 
log=logging.getLogger(__name__)
 

	
 
EMPTY=0
 
BLACK=1
 
WHITE=-1
 

	
 
colorNames={BLACK:"B",WHITE:"W"}
 

	
 

	
 
class MsgQueue:
 
	def __init__(self,handler=None):
 
	def __init__(self,name,handler=None):
 
		self.name=name
 
		self._queue=multiprocessing.Queue()
 
		self._event=multiprocessing.Event()
 
		self._handleEvent=handler
 

	
 
	def send(self,actionName,args=tuple(),kwargs=None):
 
		if kwargs is None: kwargs=dict()
 
		self._queue.put((actionName,args,kwargs))
 
		self._event.set()
 

	
 
	def listen(self,handleEvent=None):
 
		if handleEvent is not None: self._handleEvent=handleEvent
 

	
 
		while True:
 
			self._event.wait()
 
			msg=self._queue.get()
 
			if self._queue.empty():
 
				self._event.clear()
 
			log.info(msg)
 
			if msg[0]=="!kill":
 
				self._queue.cancel_join_thread()
 
				break
 
			log.info(msg if msg[0]!="putFrame" else "('putFrame', ..., {})")
 
			if msg[0]=="!kill": break
 
			self._handleEvent(msg)
 
		log.info("%s MsgQueue exiting.",self.name)
 

	
 
	def setHandler(self,handler):
 
		self._handleEvent=handler
 

	
 

	
 
rand=random.Random()
 
rand.seed(361)
 
zobristNums=tuple(
 
	tuple(
 
		tuple(rand.getrandbits(32) for i in range(3)) for c in range(19)
 
	) for r in range(19)
 
)
 

	
 
def hashBoard(board):
 
	res=0
 
	for (r,row) in enumerate(board):
 
		for (c,item) in enumerate(row):
 
			res^=zobristNums[r][c][item+1]
 
	return res
 

	
 

	
 
def diffHash(r,c,oldItem,newItem):
 
	h=zobristNums[r][c]
 
	return h[oldItem+1] ^ h[newItem+1]
 

	
 

	
 
def exportBoard(board):
 
	substitutions={BLACK:"X", WHITE:"O"}
 
	template=[["."]*19 for r in range(19)]
 
	for r in range(3,19,6):
 
		for c in range(3,19,6):
 
			template[r][c]=","
 

	
 
	for (row,templateRow) in zip(board,template):
 
		for (c,x) in enumerate(row):
 
			if x!=EMPTY: templateRow[c]=substitutions[x]
 

	
 
	return "\n".join("".join(row) for row in template)
src/video.py
Show inline comments
 
import time
 
import threading
 
import logging
 

	
 
import cv2 as cv
 

	
 
log=logging.getLogger(__name__)
 

	
 

	
 
class VideoCapture:
 
	def __init__(self, video_source=0):
 
		self._ownMessages=None
 
		self._coreMessages=None
 
		self._shutdown=False
 
		
 
		self._vid = cv.VideoCapture(video_source)
 
		if not self._vid.isOpened():
 
			raise ValueError("Unable to open video source", video_source)
 

	
 
		self.width = self._vid.get(cv.CAP_PROP_FRAME_WIDTH)
 
		self.height = self._vid.get(cv.CAP_PROP_FRAME_HEIGHT)
 

	
 
	def getFrame(self):
 
		if self._vid.isOpened():
 
			(res,frame) = self._vid.read()
 
			if res:
 
				return (res, cv.cvtColor(frame, cv.COLOR_BGR2RGB))
 
			else:
 
				return (res,None)
 
		else:
 
			return (False,None)
 

	
 
	def shutDown(self):
 
	def _shutDown(self):
 
		log.info("Video proc exiting.")
 
		self._ownMessages.send("!kill",("video",))
 
		self._shutdown=True
 

	
 
	def __call__(self,ownMessages,coreMessages):
 
		self._ownMessages=ownMessages
 
		self._coreMessages=coreMessages
 

	
 
		self.listenerThread=threading.Thread(target=lambda: ownMessages.listen(self._handleEvent))
 
		self.listenerThread.start()
 
		self._listenerThread=threading.Thread(target=lambda: ownMessages.listen(self._handleEvent))
 
		self._listenerThread.start()
 

	
 
		t=0
 
		while not self._shutdown:
 
			self._vid.set(cv.CAP_PROP_POS_MSEC,t*1000)
 
			(res,frame)=self.getFrame()
 
			if res: self._coreMessages.send("putFrame",(frame,))
 
			time.sleep(1)
 
			t+=1
 
		self._listenerThread.join()
 

	
 
	def __del__(self):
 
		if self._vid.isOpened():
 
			self._vid.release()
 
			
 
	def _handleEvent(self,e):
 
		actions={
 
			"shutDown": self.shutDown
 
			"shutDown": self._shutDown
 
		}
 
		(actionName,args,kwargs)=e
 

	
 
		return actions[actionName](*args,**kwargs)
 

	
 

	
 
def capVideo(stream,ownMessages,coreMessages):
 
	v=VideoCapture(stream)
 
	v(ownMessages,coreMessages)
0 comments (0 inline, 0 general)