From 3df57681cfea4893e6cd16e4d17c0fc91c4a0261 Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Sun, 15 Jan 2023 23:33:00 -0500 Subject: [PATCH] cleanly exit orphaned subprocesses --- plugins/arlo/src/arlo_plugin/camera.py | 20 ++++--- plugins/arlo/src/arlo_plugin/child_process.py | 55 +++++++++++++++++++ 2 files changed, 66 insertions(+), 9 deletions(-) create mode 100644 plugins/arlo/src/arlo_plugin/child_process.py diff --git a/plugins/arlo/src/arlo_plugin/camera.py b/plugins/arlo/src/arlo_plugin/camera.py index 6962439a0..328727f19 100644 --- a/plugins/arlo/src/arlo_plugin/camera.py +++ b/plugins/arlo/src/arlo_plugin/camera.py @@ -2,13 +2,13 @@ from aioice import Candidate from aiortc import RTCSessionDescription, RTCIceGatherer, RTCIceServer from aiortc.rtcicetransport import candidate_to_aioice, candidate_from_aioice import asyncio -import json import socket import scrypted_sdk from scrypted_sdk import ScryptedDeviceBase from scrypted_sdk.types import Camera, VideoCamera, MotionSensor, Battery, ScryptedMimeTypes +from .child_process import ChildProcess from .logging import ScryptedDeviceLoggerMixin from .util import BackgroundTaskMixin from .rtcpeerconnection import BackgroundRTCPeerConnection @@ -233,7 +233,9 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): f"udp://localhost:{port}" ] self.logger.debug(f"Starting ffmpeg at {ffmpeg_path} with {ffmpeg_args}") - self.ffmpeg_subprocess = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args) + + self.ffmpeg_subprocess = ChildProcess(ffmpeg_path, *ffmpeg_args) + self.ffmpeg_subprocess.start() self.pc = BackgroundRTCPeerConnection(self.logger) await self.pc.add_media( @@ -251,12 +253,8 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): await ice_gatherer.gather() self.local_candidates = ice_gatherer.getLocalCandidates() - #return - try: - if self.camera._can_push_to_talk(): - await self.initialize_push_to_talk() - except Exception as e: - self.logger.error(e) + if self.camera._can_push_to_talk(): + await self.initialize_push_to_talk() async def initialize_push_to_talk(self): self.logger.info("Initializing push to talk for RTC") @@ -355,11 +353,15 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): async def shutdown(self): if self.ffmpeg_subprocess is not None: - self.ffmpeg_subprocess.terminate() + self.ffmpeg_subprocess.stop() + self.ffmpeg_subprocess = None if self.pc is not None: await self.pc.close() + self.pc = None if self.arlo_pc is not None: await self.arlo_pc.close() + self.arlo_pc = None + self.arlo_relay_track = None class ArloCameraRTCSessionControl: diff --git a/plugins/arlo/src/arlo_plugin/child_process.py b/plugins/arlo/src/arlo_plugin/child_process.py new file mode 100644 index 000000000..bd6af15c8 --- /dev/null +++ b/plugins/arlo/src/arlo_plugin/child_process.py @@ -0,0 +1,55 @@ +import multiprocessing +import subprocess +import time +import threading + +HEARTBEAT_INTERVAL = 1 + + +def multiprocess_main(child_conn, exe, args): + sp = subprocess.Popen([exe, *args]) + + while True: + has_data = child_conn.poll(HEARTBEAT_INTERVAL * 5) + if not has_data: + break + keep_alive = child_conn.recv() + if not keep_alive: + break + + sp.terminate() + sp.wait() + + +class ChildProcess: + """Class to manage running a child process that gets cleaned up if the parent exits. + + When spawining subprocesses in Python, if the parent is forcibly killed (as is the case + when Scrypted restarts plugins), subprocesses get orphaned. This approach uses parent-child + heartbeats for the child to ensure that the parent process is still alive, and to cleanly + exit the child if the parent has terminated. + """ + + def __init__(self, exe, *args): + self.exe = exe + self.args = args + + self.parent_conn, self.child_conn = multiprocessing.Pipe() + self.process = multiprocessing.Process(target=multiprocess_main, args=(self.child_conn, exe, args)) + self.process.daemon = True + self._stop = False + + self.thread = threading.Thread(target=self.heartbeat) + + def start(self): + self.process.start() + self.thread.start() + + def stop(self): + self._stop = True + self.parent_conn.send(False) + + def heartbeat(self): + while not self._stop: + time.sleep(HEARTBEAT_INTERVAL) + self.parent_conn.send(True)