relay talkback audio

This commit is contained in:
Brett Jia
2023-01-15 21:24:04 -05:00
parent d8b14d4a09
commit 9b5c2da713
4 changed files with 204 additions and 140 deletions

View File

@@ -115,7 +115,7 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery,
scrypted_setup = {
"type": "offer",
"audio": {
"direction": "recvonly",
"direction": "sendrecv" if self._can_push_to_talk() else "recvonly",
},
"video": {
"direction": "recvonly",
@@ -124,103 +124,14 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery,
plugin_setup = {}
scrypted_offer = await scrypted_session.createLocalDescription("offer", scrypted_setup, sendIceCandidate=plugin_session.addIceCandidate)
self.logger.info(f"Scrypted offer sdp:\n{scrypted_offer['sdp']}")
await plugin_session.setRemoteDescription(scrypted_offer, plugin_setup)
plugin_answer = await plugin_session.createLocalDescription("answer", plugin_setup, scrypted_session.sendIceCandidate)
self.logger.info(f"Scrypted answer sdp:\n{plugin_answer['sdp']}")
await scrypted_session.setRemoteDescription(plugin_answer, scrypted_setup)
return ArloCameraRTCSessionControl(plugin_session)
async def startIntercom(self, media):
self.logger.info("Starting intercom")
ffmpeg_params = json.loads(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput.value))
self.logger.debug(f"Received ffmpeg params: {ffmpeg_params}")
# Reserve a port for us to give to ffmpeg
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
# Start ffmpeg to convert input into something we know PyAV understands
ffmpeg_path = await scrypted_sdk.mediaManager.getFFmpegPath()
ffmpeg_args = [
"-y",
"-hide_banner",
"-loglevel", "error",
"-analyzeduration", "0",
"-fflags", "-nobuffer",
"-probesize", "32",
*ffmpeg_params["inputArguments"],
"-vn", "-dn", "-sn",
"-f", "mpegts",
"-flush_packets", "1",
f"udp://localhost:{port}"
]
ffmpeg = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args)
def cleanup():
ffmpeg.kill()
self.cleanup = cleanup
session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device)
self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}")
ice_servers = [
RTCIceServer(urls=ice["url"], credential=ice.get("credential"), username=ice.get("username"))
for ice in ice_servers
]
ice_gatherer = RTCIceGatherer(ice_servers)
await ice_gatherer.gather()
local_candidates = [
f"candidate:{Candidate.to_sdp(candidate_to_aioice(candidate))}"
for candidate in ice_gatherer.getLocalCandidates()
]
log_candidates = '\n'.join(local_candidates)
self.logger.info(f"Local candidates:\n{log_candidates}")
# MediaPlayer/PyAV will block until the intercom stream starts, and it seems that scrypted waits
# for startIntercom to exit before sending data. So, let's do the remaining setup in a coroutine
# so this function can return early.
# This is required even if we use BackgroundRTCPeerConnection, since setting up MediaPlayer may
# block the background thread's event loop and prevent other async functions from running.
async def async_setup():
pc = self.pc = BackgroundRTCPeerConnection()
self.sdp_answered = False
sock.close()
await pc.add_audio(
f"udp://localhost:{port}",
format="mpegts",
)
offer = await pc.createOffer()
self.logger.info(f"Arlo offer sdp:\n{offer.sdp}")
await pc.setLocalDescription(offer)
self.provider.arlo.NotifyPushToTalkSDP(
self.arlo_basestation, self.arlo_device,
session_id, offer.sdp
)
for candidate in local_candidates:
self.provider.arlo.NotifyPushToTalkCandidate(
self.arlo_basestation, self.arlo_device,
session_id, candidate
)
self.create_task(async_setup())
async def stopIntercom(self):
self.logger.info("Stopping intercom")
if self.pc:
await self.pc.close()
self.cleanup()
self.pc = None
self.cleanup = None
self.sdp_answered = False
def _update_device_details(self, arlo_device):
"""For updating device details from the Arlo dictionary retrieved from Arlo's REST API.
"""
@@ -246,6 +157,8 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
self.local_candidates = None
self.arlo_pc = None
self.arlo_sdp_answered = False
self.arlo_relay = None
self.arlo_relay_track = None
self.stop_subscriptions = False
self.start_sdp_answer_subscription()
@@ -294,6 +207,7 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
)
async def initialize(self):
self.logger.info("Initializing video stream for RTC")
rtsp_url = await self.camera._getVideoStreamURL()
# Reserve a port for us to give to ffmpeg
@@ -314,20 +228,21 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
"-acodec", "aac",
"-i", rtsp_url,
"-vcodec", "copy",
"-acodec", "copy",
"-acodec", "aac",
"-f", "mpegts",
"-flush_packets", "1",
f"udp://localhost:{port}"
]
self.logger.debug(f"Starting ffmpeg at {ffmpeg_path} with {ffmpeg_args}")
self.ffmpeg_subprocess = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args)
self.pc = BackgroundRTCPeerConnection()
self.pc.add_media(
self.pc = BackgroundRTCPeerConnection(self.logger)
await self.pc.add_media(
f"udp://localhost:{port}",
format="mpegts",
options={
"vcodec": "h264",
"acodec": "pcm_s16le",
"acodec": "aac",
"analyzeduration": "0",
"probesize": "32"
}
@@ -337,6 +252,67 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
await ice_gatherer.gather()
self.local_candidates = ice_gatherer.getLocalCandidates()
#return
try:
if self.camera._can_push_to_talk():
await self.initialize_push_to_talk()
except Exception as e:
self.logger.error(e)
async def initialize_push_to_talk(self):
self.logger.info("Initializing push to talk for RTC")
session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device)
self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}")
ice_servers = [
RTCIceServer(urls=ice["url"], credential=ice.get("credential"), username=ice.get("username"))
for ice in ice_servers
]
ice_gatherer = RTCIceGatherer(ice_servers)
await ice_gatherer.gather()
local_candidates = [
f"candidate:{Candidate.to_sdp(candidate_to_aioice(candidate))}"
for candidate in ice_gatherer.getLocalCandidates()
]
log_candidates = '\n'.join(local_candidates)
self.logger.debug(f"Local candidates:\n{log_candidates}")
self.arlo_pc = BackgroundRTCPeerConnection(self.logger, background=self.pc.background)
received_audio_track = asyncio.get_event_loop().create_future()
async def on_track(track):
self.logger.debug(f"Received track from scrypted: {track.kind}")
if track.kind == "audio" and self.arlo_relay is None:
self.arlo_relay, self.arlo_relay_track = await self.arlo_pc.subscribe_track(track)
received_audio_track.set_result(True)
self.pc.on_track(on_track)
# Perform the remaining setup asynchronously later, since we need to finish initializing
# before RTC session exchange can happen.
async def async_setup():
await received_audio_track
self.sdp_answered = False
offer = await self.arlo_pc.createOffer()
self.logger.info(f"Arlo offer sdp:\n{offer.sdp}")
await self.arlo_pc.setLocalDescription(offer)
self.provider.arlo.NotifyPushToTalkSDP(
self.arlo_basestation, self.arlo_device,
session_id, offer.sdp
)
for candidate in local_candidates:
self.provider.arlo.NotifyPushToTalkCandidate(
self.arlo_basestation, self.arlo_device,
session_id, candidate
)
self.create_task(async_setup())
async def createLocalDescription(self, type, setup, sendIceCandidate=None):
if type == "offer":
raise Exception("can only create answers in ArloCameraRTCSignalingSession.createLocalDescription")
@@ -372,9 +348,15 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
async def getOptions(self):
pass
async def unmute_relay(self):
await self.arlo_pc.unmute_relay(self.arlo_relay, self.arlo_relay_track)
async def mute_relay(self):
await self.arlo_pc.mute_relay(self.arlo_relay, self.arlo_relay_track)
async def shutdown(self):
if self.ffmpeg_subprocess is not None:
self.ffmpeg_subprocess.kill()
self.ffmpeg_subprocess.terminate()
if self.pc is not None:
await self.pc.close()
if self.arlo_pc is not None:
@@ -384,6 +366,17 @@ class ArloCameraRTCSignalingSession(BackgroundTaskMixin):
class ArloCameraRTCSessionControl:
def __init__(self, arlo_session):
self.arlo_session = arlo_session
self.logger = arlo_session.logger
async def setPlayback(self, options):
self.logger.debug(f"setPlayback options {options}")
audio = options.get("audio")
if audio is None:
return
if audio:
await self.arlo_session.unmute_relay()
else:
await self.arlo_session.mute_relay()
async def endSession(self):
await self.arlo_session.shutdown()

View File

@@ -77,6 +77,10 @@ def init_aiortc_logger(logger_name):
# rtcrtpsender is extremely noisy for DEBUG, so filter out all
# the packet and bitrate logs
logger.addFilter(lambda record: 0 if ") > " in record.getMessage() or ") - receiver" in record.getMessage() else 1)
if logger_name == "aiortc.rtcrtpreceiver":
# rtcrtpreceiver is extremely noisy for DEBUG, so filter out all
# the packet logs
logger.addFilter(lambda record: 0 if ") < " in record.getMessage() or ") > " in record.getMessage() else 1)
for log in aiortc_loggers:
init_aiortc_logger(log)

View File

@@ -14,7 +14,6 @@ from .camera import ArloCamera
from .doorbell import ArloDoorbell
from .logging import ScryptedDeviceLoggerMixin, propagate_aiortc_logging_level
from .util import BackgroundTaskMixin
from .rtcpeerconnection import logger as background_rtc_logger
class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery, ScryptedDeviceLoggerMixin, BackgroundTaskMixin):
@@ -160,7 +159,6 @@ class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery
for _, device in self.scrypted_devices.items():
device.logger.setLevel(log_level)
arlo_lib_logger.setLevel(log_level)
background_rtc_logger.setLevel(log_level)
propagate_aiortc_logging_level(log_level)
logging.getLogger('libav').setLevel(log_level)

View File

@@ -1,41 +1,15 @@
from aiortc import RTCPeerConnection
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaPlayer, MediaRelay
import asyncio
import threading
import logging
import queue
import sys
# construct logger instance to be used by BackgroundRTCPeerConnection
logger = logging.getLogger("rtc")
logger.setLevel(logging.INFO)
class BackgroundThreadedLoop:
"""Class that manages a background thread and its asyncio loop."""
# output logger to stdout
ch = logging.StreamHandler(sys.stdout)
# log formatting
fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S")
ch.setFormatter(fmt)
# configure handler to logger
logger.addHandler(ch)
class BackgroundRTCPeerConnection:
"""Proxy class to use RTCPeerConnection in a background thread.
The purpose of this proxy is to ensure that RTCPeerConnection operations
do not block the main asyncio thread. From testing, it seems that the
close() function blocks until the source RTSP server exits, which we
have no control over. Additionally, since asyncio coroutines are tied
to the event loop they were constructed from, it is not possible to only
run close() in a separate thread. Therefore, each instance of RTCPeerConnection
is launched within its own ephemeral thread, which cleans itself up once
close() completes.
"""
def __init__(self):
def __init__(self, logger):
self.logger = logger
self.main_loop = asyncio.get_event_loop()
self.background_loop = asyncio.new_event_loop()
@@ -45,19 +19,18 @@ class BackgroundRTCPeerConnection:
self.thread_started.get()
self.pending_tasks = set()
self.stopped = False
self.handles = 1
def __background_main(self):
logger.debug(f"Background RTC loop {self.thread.name} starting")
self.pc = RTCPeerConnection()
self.logger.debug(f"Background RTC loop {self.thread.name} starting")
asyncio.set_event_loop(self.background_loop)
self.thread_started.put(True)
self.background_loop.run_forever()
logger.debug(f"Background RTC loop {self.thread.name} exiting")
self.logger.debug(f"Background RTC loop {self.thread.name} exiting")
async def __run_background(self, coroutine, await_result=True, stop_loop=False):
async def run_background(self, coroutine, await_result=True):
fut = self.main_loop.create_future()
def background_callback():
@@ -86,7 +59,7 @@ class BackgroundRTCPeerConnection:
task.add_done_callback(self.pending_tasks.discard)
task.add_done_callback(
lambda _:
self.background_loop.stop() if self.stopped and len(self.pending_tasks)
self.background_loop.stop() if self.handles == 0 and len(self.pending_tasks) == 0
else None
)
@@ -97,6 +70,57 @@ class BackgroundRTCPeerConnection:
return None
return await fut
async def close_with(self, coroutine, await_result=False):
if self.handles == 0:
return
self.handles -= 1
await self.run_background(coroutine, await_result=await_result)
class BackgroundRTCPeerConnection:
"""Proxy class to use RTCPeerConnection in a background thread.
The purpose of this proxy is to ensure that RTCPeerConnection operations
do not block the main asyncio thread. From testing, it seems that the
close() function blocks until the source RTSP server exits, which we
have no control over. Additionally, since asyncio coroutines are tied
to the event loop they were constructed from, it is not possible to only
run close() in a separate thread. Therefore, each instance of RTCPeerConnection
is launched within its own ephemeral thread, which cleans itself up once
close() completes.
"""
def __init__(self, logger, background=None):
self.background = background
if background:
self.background.handles += 1
else:
self.background = BackgroundThreadedLoop(logger)
self.logger = logger
self.pc = None
self.track_queue = asyncio.Queue()
self.stopped = False
self.initialized = queue.Queue(1)
self.background.background_loop.call_soon_threadsafe(self.__background_init)
self.initialized.get()
def __background_init(self):
pc = self.pc = RTCPeerConnection()
@pc.on("track")
def on_track(track):
self.background.main_loop.call_soon_threadsafe(
self.background.main_loop.create_task,
self.track_queue.put(track),
)
self.initialized.put(True)
async def __run_background(self, coroutine, await_result=True):
return await self.background.run_background(coroutine, await_result=await_result)
async def createOffer(self):
return await self.__run_background(self.pc.createOffer())
@@ -116,9 +140,10 @@ class BackgroundRTCPeerConnection:
if self.stopped:
return
self.stopped = True
await self.__run_background(self.pc.close(), await_result=False, stop_loop=True)
await self.track_queue.put(None)
await self.background.close_with(self.pc.close())
def add_media(self, endpoint, format=None, options={}):
async def add_media(self, endpoint, format=None, options={}):
"""Adds media track(s) to the RTCPeerConnection, using provided arguments.
This constructs a MediaPlayer in the background thread's asyncio loop,
@@ -127,17 +152,21 @@ class BackgroundRTCPeerConnection:
Note that this may block the background thread's event loop if the
endpoint server is not yet ready.
"""
main_loop = self.background.main_loop
background_loop = self.background.background_loop
def add_media_background():
self.logger.debug(f"Adding endpoint {endpoint} to MediaPlayer")
media_player = MediaPlayer(endpoint, format=format, options=options)
media_player._throttle_playback = False
self.logger.debug(f"Added endpoint {endpoint} to MediaPlayer")
# patch the player's stop function to close RTC if
# the media ends before RTC is closed
old_stop = media_player._stop
def new_stop(*args, **kwargs):
old_stop(*args, **kwargs)
self.main_loop.call_soon_threadsafe(self.main_loop.create_task, self.close())
main_loop.call_soon_threadsafe(main_loop.create_task, self.close())
media_player._stop = new_stop
if media_player.audio is not None:
@@ -145,4 +174,44 @@ class BackgroundRTCPeerConnection:
if media_player.video is not None:
self.pc.addTrack(media_player.video)
self.background_loop.call_soon_threadsafe(add_media_background)
background_loop.call_soon_threadsafe(add_media_background)
async def subscribe_track(self, track):
relay_fut = self.background.main_loop.create_future()
def relay_background():
self.logger.debug("Starting track relay")
relay = MediaRelay()
relay_track = relay.subscribe(track, buffered=False)
self.pc.addTrack(relay_track)
self.background.main_loop.call_soon_threadsafe(relay_fut.set_result, (relay, relay_track))
self.logger.debug("Started track relay")
self.background.background_loop.call_soon_threadsafe(relay_background)
return await relay_fut
async def mute_relay(self, relay, relay_track):
def mute_background():
self.logger.debug("Muting track relay")
relay._stop(relay_track)
self.logger.debug("Muted track relay")
self.background.background_loop.call_soon_threadsafe(mute_background)
async def unmute_relay(self, relay, relay_track):
def unmute_background():
self.logger.debug("Unmuting track relay")
relay._start(relay_track)
self.logger.debug("Unmuted track relay")
self.background.background_loop.call_soon_threadsafe(unmute_background)
def on_track(self, callback):
async def loop():
while not self.stopped:
try:
track = await asyncio.wait_for(self.track_queue.get(), 5)
except asyncio.TimeoutError:
continue
if not track:
break
await callback(track)
self.background.main_loop.create_task(loop())