Files
@ d5d8fe073c1c
Branch filter:
Location: OneEye/src/video.py - annotation
d5d8fe073c1c
1.5 KiB
text/x-python
capturing video with openCV
d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c d5d8fe073c1c | import time
import threading
import cv2 as cv
class VideoCapture:
    """Grab frames from an OpenCV video source and forward them as messages.

    An instance wraps a ``cv.VideoCapture``.  Calling the instance (see
    ``__call__``) starts a listener thread for incoming events and then
    sends one frame per loop iteration to the core message channel until
    ``shutDown`` is invoked.

    The message channel objects are not defined in this module; from the
    way they are used here they must provide ``send(name, args)`` and
    ``listen(callback)``.
    """

    def __init__(self, video_source=0):
        """Open *video_source* and record the reported frame dimensions.

        Args:
            video_source: Passed unchanged to ``cv.VideoCapture``;
                defaults to ``0``.

        Raises:
            ValueError: If the capture reports that it is not opened.
        """
        # The message channels are attached later, by __call__.
        self._ownMessages = None
        self._coreMessages = None
        self._shutdown = False
        self._vid = cv.VideoCapture(video_source)
        if not self._vid.isOpened():
            raise ValueError("Unable to open video source", video_source)
        self.width = self._vid.get(cv.CAP_PROP_FRAME_WIDTH)
        self.height = self._vid.get(cv.CAP_PROP_FRAME_HEIGHT)

    def getFrame(self):
        """Read the next frame from the capture.

        Returns:
            A ``(success, frame)`` tuple.  On success the frame has been
            converted from BGR to RGB.  When the capture is not opened the
            result is ``(False, None)``; when the read fails the result is
            the read status paired with ``None``.
        """
        if not self._vid.isOpened():
            return (False, None)
        (res, frame) = self._vid.read()
        if not res:
            return (res, None)
        return (res, cv.cvtColor(frame, cv.COLOR_BGR2RGB))

    def shutDown(self):
        """Request termination of the capture loop started by ``__call__``.

        The stop flag is raised before the ``"!kill"`` message is sent so
        that the loop ends even if sending fails.  Calling this before the
        message channels have been attached only sets the flag.
        """
        self._shutdown = True
        if self._ownMessages is not None:
            # NOTE(review): "!kill" presumably stops the listener started in
            # __call__ -- confirm against the messaging implementation.
            self._ownMessages.send("!kill", ("video",))

    def __call__(self, ownMessages, coreMessages):
        """Run the capture loop until ``shutDown`` is called.

        Args:
            ownMessages: Channel this object listens on; incoming events
                are dispatched through ``_handleEvent``.
            coreMessages: Channel that receives ``"putFrame"`` messages,
                each carrying a one-element tuple with the frame.
        """
        self._ownMessages = ownMessages
        self._coreMessages = coreMessages
        # Listen on a separate thread because the loop below blocks.
        self.listenerThread = threading.Thread(
            target=ownMessages.listen, args=(self._handleEvent,)
        )
        self.listenerThread.start()
        t = 0
        while not self._shutdown:
            # Ask the capture to position itself at t seconds (the property
            # is expressed in milliseconds) before reading.
            self._vid.set(cv.CAP_PROP_POS_MSEC, t * 1000)
            (res, frame) = self.getFrame()
            if res:
                self._coreMessages.send("putFrame", (frame,))
            time.sleep(1)
            t += 1

    def __del__(self):
        """Release the capture if it exists and is still open."""
        # __init__ may have failed before _vid was assigned, so do not
        # assume the attribute is present.
        vid = getattr(self, "_vid", None)
        if vid is not None and vid.isOpened():
            vid.release()

    def _handleEvent(self, e):
        """Dispatch an incoming event to the matching method.

        Args:
            e: An ``(actionName, args, kwargs)`` triple.

        Returns:
            Whatever the selected action returns.

        Raises:
            KeyError: If ``actionName`` is not a known action.
        """
        actions = {
            "shutDown": self.shutDown
        }
        (actionName, args, kwargs) = e
        return actions[actionName](*args, **kwargs)
def capVideo(stream, ownMessages, coreMessages):
    """Open *stream* with ``VideoCapture`` and run its capture loop.

    Args:
        stream: Video source handed to the ``VideoCapture`` constructor.
        ownMessages: Channel the capture object listens on.
        coreMessages: Channel the capture object sends frames to.
    """
    # Build the capture object and invoke it right away; the call only
    # returns once the capture loop has ended.
    VideoCapture(stream)(ownMessages, coreMessages)
|