server: exit hooks for python fork

This commit is contained in:
Koushik Dutta
2024-11-20 12:18:25 -08:00
parent aed6e0c446
commit 69f4de66e9
9 changed files with 45 additions and 18 deletions

View File

@@ -562,6 +562,12 @@ class PeerLiveness:
async def waitKilled(self):
await self.killed
def safe_set_result(fut: Future, result: Any):
try:
fut.set_result(result)
except:
pass
class PluginRemote:
def __init__(
self, clusterSetup: ClusterSetup, api, pluginId: str, hostInfo, loop: AbstractEventLoop
@@ -834,13 +840,13 @@ class PluginRemote:
pass
asyncio.ensure_future(waitPeerLiveness(), loop=self.loop)
async def waitClusterForkResult():
async def waitClusterForkKilled():
try:
await clusterForkResult.waitKilled()
except:
pass
peerLiveness.killed.set_result(None)
asyncio.ensure_future(waitClusterForkResult(), loop=self.loop)
safe_set_result(peerLiveness.killed, None)
asyncio.ensure_future(waitClusterForkKilled(), loop=self.loop)
clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult())
remoteDict = await clusterGetRemote()
@@ -858,9 +864,20 @@ class PluginRemote:
pluginFork = PluginFork()
pluginFork.result = asyncio.create_task(getClusterFork())
pluginFork.terminate = lambda: peerLiveness.killed.set_result(None)
async def waitKilled():
await peerLiveness.killed
pluginFork.exit = asyncio.create_task(waitKilled())
def terminate():
safe_set_result(peerLiveness.killed, None)
pluginFork.worker.terminate()
pluginFork.terminate = terminate
pluginFork.worker = None
return pluginFork
t: asyncio.Task
t.cancel()
if options:
runtime = options.get("runtime", None)
if runtime and runtime != "python":
@@ -869,17 +886,26 @@ class PluginRemote:
raise Exception("python fork to filename not supported")
parent_conn, child_conn = multiprocessing.Pipe()
pluginFork = PluginFork()
print("new fork")
killed = Future(loop=self.loop)
async def waitKilled():
await killed
pluginFork.exit = asyncio.create_task(waitKilled())
def terminate():
safe_set_result(killed, None)
pluginFork.worker.kill()
pluginFork.terminate = terminate
pluginFork.worker = multiprocessing.Process(
target=plugin_fork, args=(child_conn,), daemon=True
)
pluginFork.worker.start()
pluginFork.terminate = lambda: pluginFork.worker.kill()
def schedule_exit_check():
def exit_check():
if pluginFork.worker.exitcode != None:
safe_set_result(killed, None)
pluginFork.worker.join()
else:
schedule_exit_check()