From 69f4de66e901c41ff05ee9b2d9e874ceee7f0b30 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 20 Nov 2024 12:18:25 -0800 Subject: [PATCH] server: exit hooks for python fork --- sdk/package-lock.json | 4 +- sdk/package.json | 2 +- sdk/types/package-lock.json | 4 +- sdk/types/package.json | 2 +- .../scrypted_python/scrypted_sdk/__init__.py | 1 + .../scrypted_python/scrypted_sdk/types.py | 2 +- server/package-lock.json | 8 ++-- server/package.json | 2 +- server/python/plugin_remote.py | 38 ++++++++++++++++--- 9 files changed, 45 insertions(+), 18 deletions(-) diff --git a/sdk/package-lock.json b/sdk/package-lock.json index c1b66cc0d..bad8d2083 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.84", + "version": "0.3.86", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.84", + "version": "0.3.86", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.26.0", diff --git a/sdk/package.json b/sdk/package.json index ada0f6caa..794ae5a3e 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.84", + "version": "0.3.86", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index 0185a53e2..8018a78f3 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.77", + "version": "0.3.79", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.77", + "version": "0.3.79", "license": "ISC", "devDependencies": { "@types/node": "^22.1.0", diff --git a/sdk/types/package.json b/sdk/types/package.json index c7e1c4ad4..ea4fdfe42 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.77", + "version": "0.3.79", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/scrypted_python/scrypted_sdk/__init__.py b/sdk/types/scrypted_python/scrypted_sdk/__init__.py index a246ff64e..a8c303b43 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/__init__.py +++ b/sdk/types/scrypted_python/scrypted_sdk/__init__.py @@ -9,6 +9,7 @@ import asyncio class PluginFork: result: asyncio.Task worker: Process + exit: asyncio.Task def terminate(self): pass diff --git a/sdk/types/scrypted_python/scrypted_sdk/types.py b/sdk/types/scrypted_python/scrypted_sdk/types.py index d9f48e3d2..047673e08 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/types.py +++ b/sdk/types/scrypted_python/scrypted_sdk/types.py @@ -950,7 +950,7 @@ class TamperState(TypedDict): pass -TYPES_VERSION = "0.3.77" +TYPES_VERSION = "0.3.79" class AirPurifier: diff --git a/server/package-lock.json b/server/package-lock.json index c700737b8..2f9559e29 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -12,7 +12,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.77", + "@scrypted/types": "^0.3.79", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", @@ -557,9 +557,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.3.77", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.77.tgz", - "integrity": "sha512-wLMD5vYqTbAbNZ4+3n7BCuChxJTROidMiYmx5POzMGJfeOQ8Apmag6HpwZI1AlBEhVHPuhiB+hThOTcIF/xqmg==", + "version": "0.3.79", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.79.tgz", + "integrity": "sha512-o/Rlgd+F+f9Bmlb9oSl6/qsvNwEIbNQyBGdw/O3M2BmlzUZsSjoJv5gV23SyceEHRV6NLS3anSBKu1CPjKbTRw==", "license": "ISC" }, "node_modules/@types/adm-zip": { diff --git a/server/package.json b/server/package.json index 015780c5e..b3e8b2a5b 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.77", + "@scrypted/types": "^0.3.79", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index ec09b4213..27f082984 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -562,6 +562,12 @@ class PeerLiveness: async def waitKilled(self): await self.killed +def safe_set_result(fut: Future, result: Any): + try: + fut.set_result(result) + except: + pass + class PluginRemote: def __init__( self, clusterSetup: ClusterSetup, api, pluginId: str, hostInfo, loop: AbstractEventLoop @@ -834,13 +840,13 @@ class PluginRemote: pass asyncio.ensure_future(waitPeerLiveness(), loop=self.loop) - async def waitClusterForkResult(): + async def waitClusterForkKilled(): try: await clusterForkResult.waitKilled() except: pass - peerLiveness.killed.set_result(None) - asyncio.ensure_future(waitClusterForkResult(), loop=self.loop) + safe_set_result(peerLiveness.killed, None) + asyncio.ensure_future(waitClusterForkKilled(), loop=self.loop) clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult()) remoteDict = await clusterGetRemote() @@ -858,9 +864,20 @@ class PluginRemote: pluginFork = PluginFork() pluginFork.result = asyncio.create_task(getClusterFork()) - pluginFork.terminate = lambda: peerLiveness.killed.set_result(None) + async def waitKilled(): + await peerLiveness.killed + pluginFork.exit = asyncio.create_task(waitKilled()) + def terminate(): + safe_set_result(peerLiveness.killed, None) + pluginFork.worker.terminate() + pluginFork.terminate = terminate + + pluginFork.worker = None + return pluginFork + t: asyncio.Task + t.cancel() if options: runtime = options.get("runtime", None) if runtime and runtime != "python": @@ -869,17 +886,26 @@ class PluginRemote: raise Exception("python fork to filename not supported") parent_conn, child_conn = multiprocessing.Pipe() + pluginFork = PluginFork() - print("new fork") + killed = Future(loop=self.loop) + async def waitKilled(): + await killed + pluginFork.exit = asyncio.create_task(waitKilled()) + def terminate(): + safe_set_result(killed, None) + pluginFork.worker.kill() + pluginFork.terminate = terminate + pluginFork.worker = multiprocessing.Process( target=plugin_fork, args=(child_conn,), daemon=True ) pluginFork.worker.start() - pluginFork.terminate = lambda: pluginFork.worker.kill() def schedule_exit_check(): def exit_check(): if pluginFork.worker.exitcode != None: + safe_set_result(killed, None) pluginFork.worker.join() else: schedule_exit_check()