python-codecs: multiprocessing decode

This commit is contained in:
Koushik Dutta
2023-03-14 10:22:01 -07:00
parent 45bd3cbb7c
commit 4e25aedbe7

View File

@@ -102,50 +102,70 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGener
super().__init__(nativeId)
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
container = ffmpegInput.get('container', None)
videosrc = ffmpegInput.get('url')
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
if videosrc.startswith('tcp://'):
parsed_url = urlparse(videosrc)
videosrc = 'tcpclientsrc port=%s host=%s' % (
parsed_url.port, parsed_url.hostname)
if container == 'mpegts':
videosrc += ' ! tsdemux'
elif container == 'sdp':
videosrc += ' ! sdpdemux'
else:
raise Exception('unknown container %s' % container)
elif videosrc.startswith('rtsp'):
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
if videoCodec == 'h264':
videosrc += ' ! rtph264depay ! h264parse'
videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! video/x-raw,format=RGB'
gst, gen = createPipelineIterator(videosrc)
async for gstsample in gen():
caps = gstsample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')
gst_buffer = gstsample.get_buffer()
result, info = gst_buffer.map(Gst.MapFlags.READ)
if not result:
continue
try:
# pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR)
vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR)
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
gst_buffer.unmap(info)
worker = scrypted_sdk.fork()
forked = await worker.result
return await forked.generateVideoFrames(mediaObject, options, filter)
def create_scrypted_plugin():
return PythonCodecs()
async def generateVideoFrames(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
container = ffmpegInput.get('container', None)
videosrc = ffmpegInput.get('url')
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
if videosrc.startswith('tcp://'):
parsed_url = urlparse(videosrc)
videosrc = 'tcpclientsrc port=%s host=%s' % (
parsed_url.port, parsed_url.hostname)
if container == 'mpegts':
videosrc += ' ! tsdemux'
elif container == 'sdp':
videosrc += ' ! sdpdemux'
else:
raise Exception('unknown container %s' % container)
elif videosrc.startswith('rtsp'):
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
if videoCodec == 'h264':
videosrc += ' ! rtph264depay ! h264parse'
videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! video/x-raw,format=RGB'
gst, gen = createPipelineIterator(videosrc)
async for gstsample in gen():
caps = gstsample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')
gst_buffer = gstsample.get_buffer()
result, info = gst_buffer.map(Gst.MapFlags.READ)
if not result:
continue
try:
# pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR)
vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR)
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
gst_buffer.unmap(info)
class CodecFork:
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
try:
async for data in generateVideoFrames(mediaObject, options, filter):
yield data
finally:
import os
os._exit(os.EX_OK)
pass
async def fork():
return CodecFork()