diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index ecc010e70..713c65280 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -8,6 +8,7 @@ import platform import shutil import subprocess import threading +import concurrent.futures import time import traceback import zipfile @@ -35,6 +36,21 @@ class SystemDeviceState(TypedDict): value: any +class StreamPipeReader: + def __init__(self, conn: multiprocessing.connection.Connection) -> None: + self.conn = conn + self.executor = concurrent.futures.ThreadPoolExecutor() + + def readBlocking(self, n): + b = bytes(0) + while len(b) < n: + self.conn.poll() + b += os.read(self.conn.fileno(), n - len(b)) + return b + + async def read(self, n): + return await asyncio.get_event_loop().run_in_executor(self.executor, lambda: self.readBlocking(n)) + class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): def __init__(self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]) -> None: super().__init__() @@ -467,14 +483,17 @@ class PluginRemote: async def getFork(): fd = os.dup(parent_conn.fileno()) - forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd) + reader = StreamPipeReader(parent_conn) + forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writeFd = fd) forkPeer.peerName = 'thread' async def forkReadLoop(): try: await readLoop() except: + traceback.print_exc() print('fork read loop exited') - pass + finally: + reader.executor.shutdown() asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop) getRemote = await forkPeer.getParam('getRemote') remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo) @@ -563,13 +582,15 @@ class PluginRemote: async def getServicePort(self, name): pass -async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int): - peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd) +async def plugin_async_main(loop: AbstractEventLoop, readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None): + peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd=readFd, writeFd=writeFd, reader=reader, writer=writer) peer.params['print'] = print peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop) async def get_update_stats(): update_stats = await peer.getParam('updateStats') + if not update_stats: + return def stats_runner(): ptime = round(time.process_time() * 1000000) @@ -601,10 +622,14 @@ async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int): asyncio.run_coroutine_threadsafe(get_update_stats(), loop) - await readLoop() + try: + await readLoop() + finally: + if reader and hasattr(reader, 'executor'): + r: StreamPipeReader = reader + r.executor.shutdown() - -def main(readFd: int, writeFd: int): +def main(readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None): loop = asyncio.new_event_loop() def gc_runner(): @@ -612,10 +637,10 @@ def main(readFd: int, writeFd: int): loop.call_later(10, gc_runner) gc_runner() - loop.run_until_complete(plugin_async_main(loop, readFd, writeFd)) + loop.run_until_complete(plugin_async_main(loop, readFd=readFd, writeFd=writeFd, reader=reader, writer=writer)) loop.close() -def plugin_main(readFd: int, writeFd: int): +def plugin_main(readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None): try: import gi gi.require_version('Gst', '1.0') @@ -624,17 +649,18 @@ def plugin_main(readFd: int, writeFd: int): loop = GLib.MainLoop() - worker = threading.Thread(target=main, args=(readFd, writeFd), name="asyncio-main") + worker = threading.Thread(target=main, args=(readFd, writeFd, reader, writer), name="asyncio-main") worker.start() loop.run() except: - main(readFd, writeFd) + main(readFd=readFd, writeFd=writeFd, reader=reader, writer=writer) def plugin_fork(conn: multiprocessing.connection.Connection): fd = os.dup(conn.fileno()) - plugin_main(fd, fd) + reader = StreamPipeReader(conn) + plugin_main(reader=reader, writeFd=fd) if __name__ == "__main__": plugin_main(3, 4) diff --git a/server/src/plugin/media.ts b/server/src/plugin/media.ts index 8e36fe884..8ecc66955 100644 --- a/server/src/plugin/media.ts +++ b/server/src/plugin/media.ts @@ -385,7 +385,7 @@ export abstract class MediaManagerBase implements MediaManager { node[candidateId] = inputWeight + outputWeight; } catch (e) { - console.warn(converter.name, 'skipping converter due to error', e) + console.warn(candidate.name, 'skipping converter due to error', e) } }