From 9b5c2da71305e21bb047a0ea7415ce2299999bb9 Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Sun, 15 Jan 2023 21:24:04 -0500 Subject: [PATCH] relay talkback audio --- plugins/arlo/src/arlo_plugin/camera.py | 187 +++++++++--------- plugins/arlo/src/arlo_plugin/logging.py | 4 + plugins/arlo/src/arlo_plugin/provider.py | 2 - .../arlo/src/arlo_plugin/rtcpeerconnection.py | 151 ++++++++++---- 4 files changed, 204 insertions(+), 140 deletions(-) diff --git a/plugins/arlo/src/arlo_plugin/camera.py b/plugins/arlo/src/arlo_plugin/camera.py index d5acfc58a..5c4379cdf 100644 --- a/plugins/arlo/src/arlo_plugin/camera.py +++ b/plugins/arlo/src/arlo_plugin/camera.py @@ -115,7 +115,7 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery, scrypted_setup = { "type": "offer", "audio": { - "direction": "recvonly", + "direction": "sendrecv" if self._can_push_to_talk() else "recvonly", }, "video": { "direction": "recvonly", @@ -124,103 +124,14 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery, plugin_setup = {} scrypted_offer = await scrypted_session.createLocalDescription("offer", scrypted_setup, sendIceCandidate=plugin_session.addIceCandidate) + self.logger.info(f"Scrypted offer sdp:\n{scrypted_offer['sdp']}") await plugin_session.setRemoteDescription(scrypted_offer, plugin_setup) plugin_answer = await plugin_session.createLocalDescription("answer", plugin_setup, scrypted_session.sendIceCandidate) + self.logger.info(f"Scrypted answer sdp:\n{plugin_answer['sdp']}") await scrypted_session.setRemoteDescription(plugin_answer, scrypted_setup) return ArloCameraRTCSessionControl(plugin_session) - async def startIntercom(self, media): - self.logger.info("Starting intercom") - - ffmpeg_params = json.loads(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput.value)) - self.logger.debug(f"Received ffmpeg params: {ffmpeg_params}") - - # Reserve a port for us to give to ffmpeg - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind(('localhost', 0)) - port = sock.getsockname()[1] - - # Start ffmpeg to convert input into something we know PyAV understands - ffmpeg_path = await scrypted_sdk.mediaManager.getFFmpegPath() - ffmpeg_args = [ - "-y", - "-hide_banner", - "-loglevel", "error", - "-analyzeduration", "0", - "-fflags", "-nobuffer", - "-probesize", "32", - *ffmpeg_params["inputArguments"], - "-vn", "-dn", "-sn", - "-f", "mpegts", - "-flush_packets", "1", - f"udp://localhost:{port}" - ] - ffmpeg = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args) - - def cleanup(): - ffmpeg.kill() - self.cleanup = cleanup - - session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device) - self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}") - - ice_servers = [ - RTCIceServer(urls=ice["url"], credential=ice.get("credential"), username=ice.get("username")) - for ice in ice_servers - ] - ice_gatherer = RTCIceGatherer(ice_servers) - await ice_gatherer.gather() - - local_candidates = [ - f"candidate:{Candidate.to_sdp(candidate_to_aioice(candidate))}" - for candidate in ice_gatherer.getLocalCandidates() - ] - - log_candidates = '\n'.join(local_candidates) - self.logger.info(f"Local candidates:\n{log_candidates}") - - # MediaPlayer/PyAV will block until the intercom stream starts, and it seems that scrypted waits - # for startIntercom to exit before sending data. So, let's do the remaining setup in a coroutine - # so this function can return early. - # This is required even if we use BackgroundRTCPeerConnection, since setting up MediaPlayer may - # block the background thread's event loop and prevent other async functions from running. - async def async_setup(): - pc = self.pc = BackgroundRTCPeerConnection() - self.sdp_answered = False - - sock.close() - await pc.add_audio( - f"udp://localhost:{port}", - format="mpegts", - ) - - offer = await pc.createOffer() - self.logger.info(f"Arlo offer sdp:\n{offer.sdp}") - - await pc.setLocalDescription(offer) - - self.provider.arlo.NotifyPushToTalkSDP( - self.arlo_basestation, self.arlo_device, - session_id, offer.sdp - ) - for candidate in local_candidates: - self.provider.arlo.NotifyPushToTalkCandidate( - self.arlo_basestation, self.arlo_device, - session_id, candidate - ) - - self.create_task(async_setup()) - - async def stopIntercom(self): - self.logger.info("Stopping intercom") - if self.pc: - await self.pc.close() - self.cleanup() - self.pc = None - self.cleanup = None - self.sdp_answered = False - def _update_device_details(self, arlo_device): """For updating device details from the Arlo dictionary retrieved from Arlo's REST API. """ @@ -246,6 +157,8 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): self.local_candidates = None self.arlo_pc = None self.arlo_sdp_answered = False + self.arlo_relay = None + self.arlo_relay_track = None self.stop_subscriptions = False self.start_sdp_answer_subscription() @@ -294,6 +207,7 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): ) async def initialize(self): + self.logger.info("Initializing video stream for RTC") rtsp_url = await self.camera._getVideoStreamURL() # Reserve a port for us to give to ffmpeg @@ -314,20 +228,21 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): "-acodec", "aac", "-i", rtsp_url, "-vcodec", "copy", - "-acodec", "copy", + "-acodec", "aac", "-f", "mpegts", "-flush_packets", "1", f"udp://localhost:{port}" ] + self.logger.debug(f"Starting ffmpeg at {ffmpeg_path} with {ffmpeg_args}") self.ffmpeg_subprocess = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args) - self.pc = BackgroundRTCPeerConnection() - self.pc.add_media( + self.pc = BackgroundRTCPeerConnection(self.logger) + await self.pc.add_media( f"udp://localhost:{port}", format="mpegts", options={ "vcodec": "h264", - "acodec": "pcm_s16le", + "acodec": "aac", "analyzeduration": "0", "probesize": "32" } @@ -337,6 +252,67 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): await ice_gatherer.gather() self.local_candidates = ice_gatherer.getLocalCandidates() + #return + try: + if self.camera._can_push_to_talk(): + await self.initialize_push_to_talk() + except Exception as e: + self.logger.error(e) + + async def initialize_push_to_talk(self): + self.logger.info("Initializing push to talk for RTC") + + session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device) + self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}") + + ice_servers = [ + RTCIceServer(urls=ice["url"], credential=ice.get("credential"), username=ice.get("username")) + for ice in ice_servers + ] + ice_gatherer = RTCIceGatherer(ice_servers) + await ice_gatherer.gather() + + local_candidates = [ + f"candidate:{Candidate.to_sdp(candidate_to_aioice(candidate))}" + for candidate in ice_gatherer.getLocalCandidates() + ] + + log_candidates = '\n'.join(local_candidates) + self.logger.debug(f"Local candidates:\n{log_candidates}") + + self.arlo_pc = BackgroundRTCPeerConnection(self.logger, background=self.pc.background) + + received_audio_track = asyncio.get_event_loop().create_future() + async def on_track(track): + self.logger.debug(f"Received track from scrypted: {track.kind}") + if track.kind == "audio" and self.arlo_relay is None: + self.arlo_relay, self.arlo_relay_track = await self.arlo_pc.subscribe_track(track) + received_audio_track.set_result(True) + self.pc.on_track(on_track) + + # Perform the remaining setup asynchronously later, since we need to finish initializing + # before RTC session exchange can happen. + async def async_setup(): + await received_audio_track + self.sdp_answered = False + + offer = await self.arlo_pc.createOffer() + self.logger.info(f"Arlo offer sdp:\n{offer.sdp}") + + await self.arlo_pc.setLocalDescription(offer) + + self.provider.arlo.NotifyPushToTalkSDP( + self.arlo_basestation, self.arlo_device, + session_id, offer.sdp + ) + for candidate in local_candidates: + self.provider.arlo.NotifyPushToTalkCandidate( + self.arlo_basestation, self.arlo_device, + session_id, candidate + ) + + self.create_task(async_setup()) + async def createLocalDescription(self, type, setup, sendIceCandidate=None): if type == "offer": raise Exception("can only create answers in ArloCameraRTCSignalingSession.createLocalDescription") @@ -372,9 +348,15 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): async def getOptions(self): pass + async def unmute_relay(self): + await self.arlo_pc.unmute_relay(self.arlo_relay, self.arlo_relay_track) + + async def mute_relay(self): + await self.arlo_pc.mute_relay(self.arlo_relay, self.arlo_relay_track) + async def shutdown(self): if self.ffmpeg_subprocess is not None: - self.ffmpeg_subprocess.kill() + self.ffmpeg_subprocess.terminate() if self.pc is not None: await self.pc.close() if self.arlo_pc is not None: @@ -384,6 +366,17 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin): class ArloCameraRTCSessionControl: def __init__(self, arlo_session): self.arlo_session = arlo_session + self.logger = arlo_session.logger + + async def setPlayback(self, options): + self.logger.debug(f"setPlayback options {options}") + audio = options.get("audio") + if audio is None: + return + if audio: + await self.arlo_session.unmute_relay() + else: + await self.arlo_session.mute_relay() async def endSession(self): await self.arlo_session.shutdown() \ No newline at end of file diff --git a/plugins/arlo/src/arlo_plugin/logging.py b/plugins/arlo/src/arlo_plugin/logging.py index c5b7874d2..3088ab263 100644 --- a/plugins/arlo/src/arlo_plugin/logging.py +++ b/plugins/arlo/src/arlo_plugin/logging.py @@ -77,6 +77,10 @@ def init_aiortc_logger(logger_name): # rtcrtpsender is extremely noisy for DEBUG, so filter out all # the packet and bitrate logs logger.addFilter(lambda record: 0 if ") > " in record.getMessage() or ") - receiver" in record.getMessage() else 1) + if logger_name == "aiortc.rtcrtpreceiver": + # rtcrtpreceiver is extremely noisy for DEBUG, so filter out all + # the packet logs + logger.addFilter(lambda record: 0 if ") < " in record.getMessage() or ") > " in record.getMessage() else 1) for log in aiortc_loggers: init_aiortc_logger(log) diff --git a/plugins/arlo/src/arlo_plugin/provider.py b/plugins/arlo/src/arlo_plugin/provider.py index 1d3df2303..bafe50921 100644 --- a/plugins/arlo/src/arlo_plugin/provider.py +++ b/plugins/arlo/src/arlo_plugin/provider.py @@ -14,7 +14,6 @@ from .camera import ArloCamera from .doorbell import ArloDoorbell from .logging import ScryptedDeviceLoggerMixin, propagate_aiortc_logging_level from .util import BackgroundTaskMixin -from .rtcpeerconnection import logger as background_rtc_logger class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery, ScryptedDeviceLoggerMixin, BackgroundTaskMixin): @@ -160,7 +159,6 @@ class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery for _, device in self.scrypted_devices.items(): device.logger.setLevel(log_level) arlo_lib_logger.setLevel(log_level) - background_rtc_logger.setLevel(log_level) propagate_aiortc_logging_level(log_level) logging.getLogger('libav').setLevel(log_level) diff --git a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py index e4d0e30cd..655f3cf6a 100644 --- a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py +++ b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py @@ -1,41 +1,15 @@ from aiortc import RTCPeerConnection -from aiortc.contrib.media import MediaPlayer +from aiortc.contrib.media import MediaPlayer, MediaRelay import asyncio import threading -import logging import queue -import sys -# construct logger instance to be used by BackgroundRTCPeerConnection -logger = logging.getLogger("rtc") -logger.setLevel(logging.INFO) +class BackgroundThreadedLoop: + """Class that manages a background thread and its asyncio loop.""" -# output logger to stdout -ch = logging.StreamHandler(sys.stdout) - -# log formatting -fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S") -ch.setFormatter(fmt) - -# configure handler to logger -logger.addHandler(ch) - - -class BackgroundRTCPeerConnection: - """Proxy class to use RTCPeerConnection in a background thread. - - The purpose of this proxy is to ensure that RTCPeerConnection operations - do not block the main asyncio thread. From testing, it seems that the - close() function blocks until the source RTSP server exits, which we - have no control over. Additionally, since asyncio coroutines are tied - to the event loop they were constructed from, it is not possible to only - run close() in a separate thread. Therefore, each instance of RTCPeerConnection - is launched within its own ephemeral thread, which cleans itself up once - close() completes. - """ - - def __init__(self): + def __init__(self, logger): + self.logger = logger self.main_loop = asyncio.get_event_loop() self.background_loop = asyncio.new_event_loop() @@ -45,19 +19,18 @@ class BackgroundRTCPeerConnection: self.thread_started.get() self.pending_tasks = set() - self.stopped = False + self.handles = 1 def __background_main(self): - logger.debug(f"Background RTC loop {self.thread.name} starting") - self.pc = RTCPeerConnection() + self.logger.debug(f"Background RTC loop {self.thread.name} starting") asyncio.set_event_loop(self.background_loop) self.thread_started.put(True) self.background_loop.run_forever() - logger.debug(f"Background RTC loop {self.thread.name} exiting") + self.logger.debug(f"Background RTC loop {self.thread.name} exiting") - async def __run_background(self, coroutine, await_result=True, stop_loop=False): + async def run_background(self, coroutine, await_result=True): fut = self.main_loop.create_future() def background_callback(): @@ -86,7 +59,7 @@ class BackgroundRTCPeerConnection: task.add_done_callback(self.pending_tasks.discard) task.add_done_callback( lambda _: - self.background_loop.stop() if self.stopped and len(self.pending_tasks) + self.background_loop.stop() if self.handles == 0 and len(self.pending_tasks) == 0 else None ) @@ -97,6 +70,57 @@ class BackgroundRTCPeerConnection: return None return await fut + async def close_with(self, coroutine, await_result=False): + if self.handles == 0: + return + self.handles -= 1 + await self.run_background(coroutine, await_result=await_result) + + +class BackgroundRTCPeerConnection: + """Proxy class to use RTCPeerConnection in a background thread. + + The purpose of this proxy is to ensure that RTCPeerConnection operations + do not block the main asyncio thread. From testing, it seems that the + close() function blocks until the source RTSP server exits, which we + have no control over. Additionally, since asyncio coroutines are tied + to the event loop they were constructed from, it is not possible to only + run close() in a separate thread. Therefore, each instance of RTCPeerConnection + is launched within its own ephemeral thread, which cleans itself up once + close() completes. + """ + + def __init__(self, logger, background=None): + self.background = background + if background: + self.background.handles += 1 + else: + self.background = BackgroundThreadedLoop(logger) + self.logger = logger + + self.pc = None + self.track_queue = asyncio.Queue() + self.stopped = False + + self.initialized = queue.Queue(1) + self.background.background_loop.call_soon_threadsafe(self.__background_init) + self.initialized.get() + + def __background_init(self): + pc = self.pc = RTCPeerConnection() + + @pc.on("track") + def on_track(track): + self.background.main_loop.call_soon_threadsafe( + self.background.main_loop.create_task, + self.track_queue.put(track), + ) + + self.initialized.put(True) + + async def __run_background(self, coroutine, await_result=True): + return await self.background.run_background(coroutine, await_result=await_result) + async def createOffer(self): return await self.__run_background(self.pc.createOffer()) @@ -116,9 +140,10 @@ class BackgroundRTCPeerConnection: if self.stopped: return self.stopped = True - await self.__run_background(self.pc.close(), await_result=False, stop_loop=True) + await self.track_queue.put(None) + await self.background.close_with(self.pc.close()) - def add_media(self, endpoint, format=None, options={}): + async def add_media(self, endpoint, format=None, options={}): """Adds media track(s) to the RTCPeerConnection, using provided arguments. This constructs a MediaPlayer in the background thread's asyncio loop, @@ -127,17 +152,21 @@ class BackgroundRTCPeerConnection: Note that this may block the background thread's event loop if the endpoint server is not yet ready. """ + main_loop = self.background.main_loop + background_loop = self.background.background_loop def add_media_background(): + self.logger.debug(f"Adding endpoint {endpoint} to MediaPlayer") media_player = MediaPlayer(endpoint, format=format, options=options) media_player._throttle_playback = False + self.logger.debug(f"Added endpoint {endpoint} to MediaPlayer") # patch the player's stop function to close RTC if # the media ends before RTC is closed old_stop = media_player._stop def new_stop(*args, **kwargs): old_stop(*args, **kwargs) - self.main_loop.call_soon_threadsafe(self.main_loop.create_task, self.close()) + main_loop.call_soon_threadsafe(main_loop.create_task, self.close()) media_player._stop = new_stop if media_player.audio is not None: @@ -145,4 +174,44 @@ class BackgroundRTCPeerConnection: if media_player.video is not None: self.pc.addTrack(media_player.video) - self.background_loop.call_soon_threadsafe(add_media_background) \ No newline at end of file + background_loop.call_soon_threadsafe(add_media_background) + + async def subscribe_track(self, track): + relay_fut = self.background.main_loop.create_future() + + def relay_background(): + self.logger.debug("Starting track relay") + relay = MediaRelay() + relay_track = relay.subscribe(track, buffered=False) + self.pc.addTrack(relay_track) + self.background.main_loop.call_soon_threadsafe(relay_fut.set_result, (relay, relay_track)) + self.logger.debug("Started track relay") + + self.background.background_loop.call_soon_threadsafe(relay_background) + return await relay_fut + + async def mute_relay(self, relay, relay_track): + def mute_background(): + self.logger.debug("Muting track relay") + relay._stop(relay_track) + self.logger.debug("Muted track relay") + self.background.background_loop.call_soon_threadsafe(mute_background) + + async def unmute_relay(self, relay, relay_track): + def unmute_background(): + self.logger.debug("Unmuting track relay") + relay._start(relay_track) + self.logger.debug("Unmuted track relay") + self.background.background_loop.call_soon_threadsafe(unmute_background) + + def on_track(self, callback): + async def loop(): + while not self.stopped: + try: + track = await asyncio.wait_for(self.track_queue.get(), 5) + except asyncio.TimeoutError: + continue + if not track: + break + await callback(track) + self.background.main_loop.create_task(loop())