From a19d916ef09fb6200a70e3965ccde620aef91e55 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 14 Mar 2023 09:16:08 -0700 Subject: [PATCH] python-codecs: improve memory management --- plugins/python-codecs/src/gstreamer.py | 9 ++--- plugins/python-codecs/src/main.py | 55 +++++++++++++++----------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/plugins/python-codecs/src/gstreamer.py b/plugins/python-codecs/src/gstreamer.py index 35bfceb5f..63089ff2a 100644 --- a/plugins/python-codecs/src/gstreamer.py +++ b/plugins/python-codecs/src/gstreamer.py @@ -16,7 +16,7 @@ except: class Callback: def __init__(self, callback) -> None: - self.loop = asyncio.get_event_loop() + self.loop = asyncio.get_running_loop() self.callback = callback def createPipelineIterator(pipeline: str): @@ -27,7 +27,7 @@ def createPipelineIterator(pipeline: str): def on_bus_message(bus, message): t = str(message.type) - print(t) + # print(t) if t == str(Gst.MessageType.EOS): finish() elif t == str(Gst.MessageType.WARNING): @@ -57,7 +57,7 @@ def createPipelineIterator(pipeline: str): bus.add_signal_watch() finished = concurrent.futures.Future() - finished.add_done_callback(lambda _: threading.Thread(target=stopGst).start()) + finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start()) hasFinished = False appsink = gst.get_by_name('appsink') @@ -84,7 +84,7 @@ def createPipelineIterator(pipeline: str): yieldFuture.set_result(None) finally: finish() - print('finished') + print('gstreamer finished') def on_new_sample(sink, preroll): @@ -96,7 +96,6 @@ def createPipelineIterator(pipeline: str): if not callback.callback or hasFinished: hasFinished = True if callback.callback: - print('erpasd') asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop) return Gst.FlowReturn.OK diff --git a/plugins/python-codecs/src/main.py b/plugins/python-codecs/src/main.py index 1495c1917..caf26265d 100644 --- a/plugins/python-codecs/src/main.py +++ b/plugins/python-codecs/src/main.py @@ -7,6 +7,7 @@ from urllib.parse import urlparse import pyvips import threading import traceback +import concurrent.futures try: import gi @@ -17,8 +18,11 @@ try: except: pass +vipsExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="vips") + async def to_thread(f): - return await asyncio.to_thread(f) + loop = asyncio.get_running_loop() + return await loop.run_in_executor(vipsExecutor, f) class VipsImage(scrypted_sdk.VideoFrame): def __init__(self, vipsImage: pyvips.Image) -> None: @@ -47,15 +51,18 @@ class VipsImage(scrypted_sdk.VideoFrame): return await to_thread(lambda: vipsImage.vipsImage.write_to_buffer('.' + options['format'])) async def toVipsImage(self, options: scrypted_sdk.ImageOptions = None): - return await to_thread(lambda: toVipsImage(self.vipsImage, options)) + return await to_thread(lambda: toVipsImage(self, options)) async def toImage(self, options: scrypted_sdk.ImageOptions = None) -> Any: - if options and options['format']: + if options and options.get('format', None): raise Exception('format can only be used with toBuffer') newVipsImage = await self.toVipsImage(options) return await createVipsMediaObject(newVipsImage) -def toVipsImage(vipsImage: pyvips.Image, options: scrypted_sdk.ImageOptions = None) -> VipsImage: +def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage: + vipsImage = vipsImageWrapper.vipsImage + if not vipsImage: + raise Exception('Video Frame has been invalidated') options = options or {} crop = options.get('crop') if crop: @@ -117,28 +124,28 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGener videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! video/x-raw,format=RGB' - try: - gst, gen = createPipelineIterator(videosrc) - async for gstsample in gen(): - caps = gstsample.get_caps() - height = caps.get_structure(0).get_value('height') - width = caps.get_structure(0).get_value('width') - gst_buffer = gstsample.get_buffer() - result, info = gst_buffer.map(Gst.MapFlags.READ) - if not result: - continue - + gst, gen = createPipelineIterator(videosrc) + async for gstsample in gen(): + caps = gstsample.get_caps() + height = caps.get_structure(0).get_value('height') + width = caps.get_structure(0).get_value('width') + gst_buffer = gstsample.get_buffer() + result, info = gst_buffer.map(Gst.MapFlags.READ) + if not result: + continue + + try: + # pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) + vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) + vipsImage = VipsImage(vips) try: - # pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) - vips = pyvips.Image.new_from_memory(info.data, width, height, 3, pyvips.BandFormat.UCHAR) - vipsImage = await createVipsMediaObject(VipsImage(vips)) - yield vipsImage + mo = await createVipsMediaObject(VipsImage(vips)) + yield mo finally: - gst_buffer.unmap(info) - except: - traceback.print_exc() - finally: - print('done!') + vipsImage.vipsImage.invalidate() + vipsImage.vipsImage = None + finally: + gst_buffer.unmap(info) def create_scrypted_plugin(): return PythonCodecs()