From 4e25aedbe738d05389241df91606d32335a3533a Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 14 Mar 2023 10:22:01 -0700 Subject: [PATCH] python-codecs: multiprocessing decode --- plugins/python-codecs/src/main.py | 108 ++++++++++++++++++------------ 1 file changed, 64 insertions(+), 44 deletions(-) diff --git a/plugins/python-codecs/src/main.py b/plugins/python-codecs/src/main.py index caf26265d..6ff32581c 100644 --- a/plugins/python-codecs/src/main.py +++ b/plugins/python-codecs/src/main.py @@ -102,50 +102,70 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGener super().__init__(nativeId) async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame: - ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value) - container = ffmpegInput.get('container', None) - videosrc = ffmpegInput.get('url') - videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec') - - if videosrc.startswith('tcp://'): - parsed_url = urlparse(videosrc) - videosrc = 'tcpclientsrc port=%s host=%s' % ( - parsed_url.port, parsed_url.hostname) - if container == 'mpegts': - videosrc += ' ! tsdemux' - elif container == 'sdp': - videosrc += ' ! sdpdemux' - else: - raise Exception('unknown container %s' % container) - elif videosrc.startswith('rtsp'): - videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc - if videoCodec == 'h264': - videosrc += ' ! rtph264depay ! h264parse' - - videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! video/x-raw,format=RGB' - - gst, gen = createPipelineIterator(videosrc) - async for gstsample in gen(): - caps = gstsample.get_caps() - height = caps.get_structure(0).get_value('height') - width = caps.get_structure(0).get_value('width') - gst_buffer = gstsample.get_buffer() - result, info = gst_buffer.map(Gst.MapFlags.READ) - if not result: - continue - - try: - # pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) - vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) - vipsImage = VipsImage(vips) - try: - mo = await createVipsMediaObject(VipsImage(vips)) - yield mo - finally: - vipsImage.vipsImage.invalidate() - vipsImage.vipsImage = None - finally: - gst_buffer.unmap(info) + worker = scrypted_sdk.fork() + forked = await worker.result + return await forked.generateVideoFrames(mediaObject, options, filter) def create_scrypted_plugin(): return PythonCodecs() + + +async def generateVideoFrames(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame: + ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value) + container = ffmpegInput.get('container', None) + videosrc = ffmpegInput.get('url') + videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec') + + if videosrc.startswith('tcp://'): + parsed_url = urlparse(videosrc) + videosrc = 'tcpclientsrc port=%s host=%s' % ( + parsed_url.port, parsed_url.hostname) + if container == 'mpegts': + videosrc += ' ! tsdemux' + elif container == 'sdp': + videosrc += ' ! sdpdemux' + else: + raise Exception('unknown container %s' % container) + elif videosrc.startswith('rtsp'): + videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc + if videoCodec == 'h264': + videosrc += ' ! rtph264depay ! h264parse' + + videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! video/x-raw,format=RGB' + + gst, gen = createPipelineIterator(videosrc) + async for gstsample in gen(): + caps = gstsample.get_caps() + height = caps.get_structure(0).get_value('height') + width = caps.get_structure(0).get_value('width') + gst_buffer = gstsample.get_buffer() + result, info = gst_buffer.map(Gst.MapFlags.READ) + if not result: + continue + + try: + # pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) + vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) + vipsImage = VipsImage(vips) + try: + mo = await createVipsMediaObject(VipsImage(vips)) + yield mo + finally: + vipsImage.vipsImage.invalidate() + vipsImage.vipsImage = None + finally: + gst_buffer.unmap(info) + +class CodecFork: + async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame: + try: + async for data in generateVideoFrames(mediaObject, options, filter): + yield data + finally: + import os + os._exit(os.EX_OK) + pass + + +async def fork(): + return CodecFork()