Files
@ d954258c9613
Branch filter:
Location: OneEye/src/video.py - annotation
d954258c9613
1.6 KiB
text/x-python
color sampler extended to mark corners
d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c dc1ed8ff56ef d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c | import time
import threading
import logging
import cv2 as cv
# Module-level logger, named after this module via __name__.
log=logging.getLogger(__name__)
class VideoCapture:
    """Periodic frame grabber built on ``cv.VideoCapture``.

    The instance is callable. Calling it starts a thread that listens on
    the instance's own message channel, then repeatedly seeks the source,
    reads one frame and sends it to the core channel as a ``"putFrame"``
    message, sleeping one second between iterations, until a
    ``"shutDown"`` event has been handled.
    """

    def __init__(self, video_source=0):
        """Open the video source and record its frame dimensions.

        :param video_source: passed unchanged to ``cv.VideoCapture``.
        :raises ValueError: if the capture reports that it is not opened.
        """
        self._ownMessages = None
        self._coreMessages = None
        self._listenerThread = None
        self._shutdown = False

        self._vid = cv.VideoCapture(video_source)
        if not self._vid.isOpened():
            raise ValueError("Unable to open video source", video_source)

        self.width = self._vid.get(cv.CAP_PROP_FRAME_WIDTH)
        self.height = self._vid.get(cv.CAP_PROP_FRAME_HEIGHT)

    def getFrame(self):
        """Read one frame and convert it from BGR to RGB.

        :return: ``(res, frame)``. ``frame`` is ``None`` when the read
            did not succeed; ``(False, None)`` is returned when the
            capture is not opened.
        """
        if not self._vid.isOpened():
            return (False, None)

        (res, frame) = self._vid.read()
        if not res:
            return (res, None)
        return (res, cv.cvtColor(frame, cv.COLOR_BGR2RGB))

    def _shutDown(self):
        """Handle the ``"shutDown"`` event: flag the capture loop to stop."""
        log.info("Video proc exiting.")
        # NOTE(review): "!kill" is sent to our own channel, presumably to
        # make listen() return so the listener thread can be joined --
        # confirm against the messaging implementation.
        self._ownMessages.send("!kill", ("video",))
        self._shutdown = True

    def __call__(self, ownMessages, coreMessages):
        """Run the capture loop until a shutdown has been requested.

        :param ownMessages: channel this object listens on; must provide
            ``listen(callback)`` and ``send(name, args)``.
        :param coreMessages: channel that receives ``"putFrame"``
            messages; must provide ``send(name, args)``.
        """
        self._ownMessages = ownMessages
        self._coreMessages = coreMessages

        # Events are handled on a separate thread so that the capture
        # loop below is free to sleep between frames.
        self._listenerThread = threading.Thread(
            target=ownMessages.listen, args=(self._handleEvent,)
        )
        self._listenerThread.start()

        t = 0
        while not self._shutdown:
            # Seek to t seconds before reading; t advances by one per pass.
            self._vid.set(cv.CAP_PROP_POS_MSEC, t * 1000)
            (res, frame) = self.getFrame()
            if res:
                self._coreMessages.send("putFrame", (frame,))
            time.sleep(1)
            t += 1

        self._listenerThread.join()

    def __del__(self):
        """Release the capture if it exists and is still opened."""
        # _vid is missing when cv.VideoCapture() itself raised inside
        # __init__ or when the object was created without __init__; the
        # finalizer must not fail with AttributeError in that case.
        vid = getattr(self, "_vid", None)
        if vid is not None and vid.isOpened():
            vid.release()

    def _handleEvent(self, e):
        """Dispatch one event to its handler and return the handler's result.

        :param e: an ``(actionName, args, kwargs)`` triple.
        :raises KeyError: if ``actionName`` has no registered handler.
        """
        actions = {
            "shutDown": self._shutDown,
        }
        (actionName, args, kwargs) = e
        return actions[actionName](*args, **kwargs)
def capVideo(stream, ownMessages, coreMessages):
    """Open ``stream`` with a VideoCapture and run it on the given channels.

    The call returns only when the VideoCapture's capture loop returns.

    :param stream: video source handed to the VideoCapture constructor.
    :param ownMessages: message channel the capture listens on.
    :param coreMessages: message channel the capture sends to.
    """
    capture = VideoCapture(stream)
    capture(ownMessages, coreMessages)
|