diff --git a/plugins/arlo/src/arlo_plugin/camera.py b/plugins/arlo/src/arlo_plugin/camera.py index 3eaaba8b7..b8ab03ca1 100644 --- a/plugins/arlo/src/arlo_plugin/camera.py +++ b/plugins/arlo/src/arlo_plugin/camera.py @@ -195,7 +195,7 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor pc = self.pc = BackgroundRTCPeerConnection() self.sdp_answered = False - pc.add_audio(options) + await pc.add_audio(options) offer = await pc.createOffer() self.logger.info(f"Arlo offer sdp:\n{offer.sdp}") diff --git a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py index f696f17a3..22f5c4799 100644 --- a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py +++ b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py @@ -3,8 +3,11 @@ from aiortc.contrib.media import MediaPlayer import asyncio import threading import logging +import os import queue import sys +import tempfile +from urllib.parse import urlparse # construct logger instance to be used by BackgroundRTCPeerConnection @@ -44,6 +47,10 @@ class BackgroundRTCPeerConnection: self.thread.start() self.thread_started.get() + self.pending_tasks = set() + self.stopped = False + self.cleanup_background = None + def __background_main(self): logger.debug(f"Background RTC loop {self.thread.name} starting") self.pc = RTCPeerConnection() @@ -52,6 +59,9 @@ class BackgroundRTCPeerConnection: self.thread_started.put(True) self.background_loop.run_forever() + if self.cleanup_background is not None: + self.cleanup_background() + logger.debug(f"Background RTC loop {self.thread.name} exiting") async def __run_background(self, coroutine, await_result=True, stop_loop=False): @@ -83,7 +93,14 @@ class BackgroundRTCPeerConnection: self.background_loop.stop() task = self.background_loop.create_task(coroutine) + self.pending_tasks.add(task) task.add_done_callback(callback) + task.add_done_callback(self.pending_tasks.discard) + task.add_done_callback( + lambda _: + self.background_loop.stop() if self.stopped and len(self.pending_tasks) + else None + ) # start the callback in the background loop self.background_loop.call_soon_threadsafe(background_callback) @@ -105,9 +122,12 @@ class BackgroundRTCPeerConnection: return await self.__run_background(self.pc.addIceCandidate(candidate)) async def close(self): + if self.stopped: + return + self.stopped = True await self.__run_background(self.pc.close(), await_result=False, stop_loop=True) - def add_audio(self, options): + async def add_audio(self, options): """Adds an audio track to the RTCPeerConnection given FFmpeg options. This constructs a MediaPlayer in the background thread's asyncio loop, @@ -127,6 +147,9 @@ class BackgroundRTCPeerConnection: logger.info(f"Intercom sourced from {input} with format {format}") + if format == "sdp" and input.startswith("tcp"): + input = await self.__sdp_to_file(input) + def add_audio_background(): media_player = MediaPlayer(input, format=format, options=options) @@ -140,4 +163,36 @@ class BackgroundRTCPeerConnection: self.pc.addTrack(media_player.audio) - self.background_loop.call_soon_threadsafe(add_audio_background) \ No newline at end of file + self.background_loop.call_soon_threadsafe(add_audio_background) + + async def __sdp_to_file(self, endpoint): + url = urlparse(endpoint) + logger.debug(f"Reading sdp file from {url.hostname}:{url.port}") + reader, writer = await asyncio.open_connection(url.hostname, url.port) + + sdp_contents = bytes() + while True: + line = await reader.readline() + if not line: + break + sdp_contents += line + + logger.debug("Finished reading sdp") + + writer.close() + await writer.wait_closed() + + logger.info(f"Received intercom input sdp:\n{sdp_contents.decode('utf-8')}") + + fd, filename = tempfile.mkstemp(".sdp") + os.write(fd, sdp_contents) + os.close(fd) + + logger.info(f"Wrote sdp to file {filename}") + + def cleanup_background(): + os.remove(filename) + logger.info(f"Deleted sdp file {filename}") + self.cleanup_background = cleanup_background + + return filename \ No newline at end of file