From 99995ea88295ffdee63ae8fef816a3554a3121fa Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sat, 25 Mar 2023 09:27:04 -0700 Subject: [PATCH] server: start watchdog/stats after plugin dependency installation completes --- server/package-lock.json | 4 +-- server/python/plugin_remote.py | 30 +++++++++++------------ server/src/plugin/plugin-remote-worker.ts | 12 ++++----- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/server/package-lock.json b/server/package-lock.json index 21f7d8a4f..732a6d31e 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/server", - "version": "0.7.26", + "version": "0.7.28", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.7.26", + "version": "0.7.28", "license": "ISC", "dependencies": { "@mapbox/node-pre-gyp": "^1.0.10", diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 6ab4ae27e..7dbe0f6d0 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -218,6 +218,7 @@ class PluginRemote: consoles: Mapping[str, Future[Tuple[StreamReader, StreamWriter]]] = {} def __init__(self, peer: rpc.RpcPeer, api, pluginId, hostInfo, loop: AbstractEventLoop): + self.allMemoryStats = {} self.peer = peer self.api = api self.pluginId = pluginId @@ -446,6 +447,8 @@ class PluginRemote: self.deviceManager = DeviceManager(self.nativeIds, self.systemManager) self.mediaManager = MediaManager(await self.api.getMediaManager()) + await self.start_stats_runner() + try: from scrypted_sdk import sdk_init2 # type: ignore @@ -479,7 +482,7 @@ class PluginRemote: forkPeer.peerName = 'thread' async def updateStats(stats): - allMemoryStats[forkPeer] = stats + self.allMemoryStats[forkPeer] = stats forkPeer.params['updateStats'] = updateStats async def forkReadLoop(): @@ -489,7 +492,7 @@ class PluginRemote: # traceback.print_exc() print('fork read loop exited') finally: - allMemoryStats.pop(forkPeer) + self.allMemoryStats.pop(forkPeer) parent_conn.close() rpcTransport.executor.shutdown() asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop) @@ -580,16 +583,8 @@ class PluginRemote: async def getServicePort(self, name): pass - -allMemoryStats = {} - -async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport): - peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, rpcTransport) - peer.params['print'] = print - peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop) - - async def get_update_stats(): - update_stats = await peer.getParam('updateStats') + async def start_stats_runner(self): + update_stats = await self.peer.getParam('updateStats') if not update_stats: print('host did not provide update_stats') return @@ -608,7 +603,7 @@ async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.Rp except: heapTotal = 0 - for _, stats in allMemoryStats.items(): + for _, stats in self.allMemoryStats.items(): ptime += stats['cpu']['user'] heapTotal += stats['memoryUsage']['heapTotal'] @@ -621,12 +616,15 @@ async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.Rp 'heapTotal': heapTotal, }, } - asyncio.run_coroutine_threadsafe(update_stats(stats), loop) - loop.call_later(10, stats_runner) + asyncio.run_coroutine_threadsafe(update_stats(stats), self.loop) + self.loop.call_later(10, stats_runner) stats_runner() - asyncio.run_coroutine_threadsafe(get_update_stats(), loop) +async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport): + peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, rpcTransport) + peer.params['print'] = print + peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop) try: await readLoop() diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index f9d8673ad..bd10c34f1 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -39,12 +39,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const { getDeviceConsole, getMixinConsole } = prepareConsoles(() => peer.selfName, () => systemManager, () => deviceManager, getPlugins); - // process.cpuUsage is for the entire process. - // process.memoryUsage is per thread. - const allMemoryStats = new Map(); - - peer.getParam('updateStats').then(updateStats => startStatsUpdater(allMemoryStats, updateStats)); - let replPort: Promise; let _pluginConsole: Console; @@ -240,6 +234,12 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe await installOptionalDependencies(getPluginConsole(), packageJson); + // process.cpuUsage is for the entire process. + // process.memoryUsage is per thread. + const allMemoryStats = new Map(); + // start the stats updater/watchdog after installation has finished, as that may take some time. + peer.getParam('updateStats').then(updateStats => startStatsUpdater(allMemoryStats, updateStats)); + const main = pluginReader('main.nodejs.js'); pluginReader = undefined; const script = main.toString();