From c3506be5abe88facdce6cacbac31ac3e5607ca66 Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Fri, 13 Jan 2023 21:23:55 -0500 Subject: [PATCH] run ffmpeg to convert inputs to known format --- plugins/arlo/src/arlo_plugin/camera.py | 46 +++++++++---- .../arlo/src/arlo_plugin/rtcpeerconnection.py | 67 +++---------------- 2 files changed, 44 insertions(+), 69 deletions(-) diff --git a/plugins/arlo/src/arlo_plugin/camera.py b/plugins/arlo/src/arlo_plugin/camera.py index b8ab03ca1..772f625e1 100644 --- a/plugins/arlo/src/arlo_plugin/camera.py +++ b/plugins/arlo/src/arlo_plugin/camera.py @@ -3,6 +3,7 @@ from aiortc import RTCSessionDescription, RTCIceGatherer, RTCIceServer from aiortc.rtcicetransport import candidate_to_aioice, candidate_from_aioice import asyncio import json +import socket import scrypted_sdk from scrypted_sdk import ScryptedDeviceBase @@ -154,19 +155,31 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor ffmpeg_params = json.loads(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput.value)) self.logger.debug(f"Received ffmpeg params: {ffmpeg_params}") - options = {} - current_key = None - for arg in ffmpeg_params["inputArguments"]: - if current_key is None and not arg.startswith("-"): - self.logger.warning(f"Ignoring unknown ffmpeg argument {arg}") - continue - if arg.startswith("-"): - current_key = arg.lstrip("-") - options[current_key] = "" - continue - options[current_key] = (options[current_key] + " " + arg).strip() + # Reserve a port for us to give to ffmpeg + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(('localhost', 0)) + port = sock.getsockname()[1] - self.logger.debug(f"Parsed ffmpeg params: {options}") + # Start ffmpeg to convert input into something we know PyAV understands + ffmpeg_path = await scrypted_sdk.mediaManager.getFFmpegPath() + ffmpeg_args = [ + "-y", + "-hide_banner", + "-loglevel", "error", + "-analyzeduration", "0", + "-fflags", "-nobuffer", + "-probesize", "32", + *ffmpeg_params["inputArguments"], + "-vn", "-dn", "-sn", + "-f", "mpegts", + "-flush_packets", "1", + f"udp://localhost:{port}" + ] + ffmpeg = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args) + + def cleanup(): + ffmpeg.kill() + self.cleanup = cleanup session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device) self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}") @@ -195,7 +208,12 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor pc = self.pc = BackgroundRTCPeerConnection() self.sdp_answered = False - await pc.add_audio(options) + sock.close() + await pc.add_audio( + f"udp://localhost:{port}", + format="mpegts", + options={"analyzeduration": "0", "probesize": "32", "flush_packets": "1"} + ) offer = await pc.createOffer() self.logger.info(f"Arlo offer sdp:\n{offer.sdp}") @@ -218,7 +236,9 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor self.logger.info("Stopping intercom") if self.pc: await self.pc.close() + self.cleanup() self.pc = None + self.cleanup = None self.sdp_answered = False def _update_device_details(self, arlo_device): diff --git a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py index 4e06ea9a0..3bb21c55e 100644 --- a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py +++ b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py @@ -3,11 +3,11 @@ from aiortc.contrib.media import MediaPlayer import asyncio import threading import logging -import os import queue +import socket import sys -import tempfile -from urllib.parse import urlparse + +import scrypted_sdk # construct logger instance to be used by BackgroundRTCPeerConnection @@ -49,7 +49,7 @@ class BackgroundRTCPeerConnection: self.pending_tasks = set() self.stopped = False - self.cleanup_background = None + self.cleanup = None def __background_main(self): logger.debug(f"Background RTC loop {self.thread.name} starting") @@ -59,9 +59,6 @@ class BackgroundRTCPeerConnection: self.thread_started.put(True) self.background_loop.run_forever() - if self.cleanup_background is not None: - self.cleanup_background() - logger.debug(f"Background RTC loop {self.thread.name} exiting") async def __run_background(self, coroutine, await_result=True, stop_loop=False): @@ -120,10 +117,12 @@ class BackgroundRTCPeerConnection: if self.stopped: return self.stopped = True + if self.cleanup: + await self.cleanup() await self.__run_background(self.pc.close(), await_result=False, stop_loop=True) - async def add_audio(self, options): - """Adds an audio track to the RTCPeerConnection given FFmpeg options. + async def add_audio(self, endpoint, format, options={}): + """Adds an audio track to the RTCPeerConnection, using provided FFmpeg args. This constructs a MediaPlayer in the background thread's asyncio loop, since MediaPlayer also utilizes coroutines and asyncio. @@ -131,22 +130,10 @@ class BackgroundRTCPeerConnection: Note that this may block the background thread's event loop if the server is not yet ready. """ - try: - input = options["i"] - format = options.get("f") - if format is None and input.startswith("rtsp"): - format = "rtsp" - except: - logger.error("error detecting what input file and format to use") - raise - - logger.info(f"Intercom sourced from {input} with format {format}") - - if format == "sdp" and input.startswith("tcp"): - input = await self.__sdp_to_file(input) def add_audio_background(): - media_player = MediaPlayer(input, format=format, options=options) + media_player = MediaPlayer(endpoint, format=format, options=options) + media_player._throttle_playback = False # patch the player's stop function to close RTC if # the media ends before RTC is closed @@ -158,36 +145,4 @@ class BackgroundRTCPeerConnection: self.pc.addTrack(media_player.audio) - self.background_loop.call_soon_threadsafe(add_audio_background) - - async def __sdp_to_file(self, endpoint): - url = urlparse(endpoint) - logger.debug(f"Reading sdp file from {url.hostname}:{url.port}") - reader, writer = await asyncio.open_connection(url.hostname, url.port) - - sdp_contents = bytes() - while True: - line = await reader.readline() - if not line: - break - sdp_contents += line - - logger.debug("Finished reading sdp") - - writer.close() - await writer.wait_closed() - - logger.info(f"Received intercom input sdp:\n{sdp_contents.decode('utf-8')}") - - fd, filename = tempfile.mkstemp(".sdp") - os.write(fd, sdp_contents) - os.close(fd) - - logger.info(f"Wrote sdp to file {filename}") - - def cleanup_background(): - os.remove(filename) - logger.info(f"Deleted sdp file {filename}") - self.cleanup_background = cleanup_background - - return filename \ No newline at end of file + self.background_loop.call_soon_threadsafe(add_audio_background) \ No newline at end of file