run ffmpeg to convert inputs to known format

This commit is contained in:
Brett Jia
2023-01-13 21:23:55 -05:00
parent 09de710bde
commit c3506be5ab
2 changed files with 44 additions and 69 deletions

View File

@@ -3,6 +3,7 @@ from aiortc import RTCSessionDescription, RTCIceGatherer, RTCIceServer
from aiortc.rtcicetransport import candidate_to_aioice, candidate_from_aioice
import asyncio
import json
import socket
import scrypted_sdk
from scrypted_sdk import ScryptedDeviceBase
@@ -154,19 +155,31 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor
ffmpeg_params = json.loads(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput.value))
self.logger.debug(f"Received ffmpeg params: {ffmpeg_params}")
options = {}
current_key = None
for arg in ffmpeg_params["inputArguments"]:
if current_key is None and not arg.startswith("-"):
self.logger.warning(f"Ignoring unknown ffmpeg argument {arg}")
continue
if arg.startswith("-"):
current_key = arg.lstrip("-")
options[current_key] = ""
continue
options[current_key] = (options[current_key] + " " + arg).strip()
# Reserve a port for us to give to ffmpeg
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
self.logger.debug(f"Parsed ffmpeg params: {options}")
# Start ffmpeg to convert input into something we know PyAV understands
ffmpeg_path = await scrypted_sdk.mediaManager.getFFmpegPath()
ffmpeg_args = [
"-y",
"-hide_banner",
"-loglevel", "error",
"-analyzeduration", "0",
"-fflags", "-nobuffer",
"-probesize", "32",
*ffmpeg_params["inputArguments"],
"-vn", "-dn", "-sn",
"-f", "mpegts",
"-flush_packets", "1",
f"udp://localhost:{port}"
]
ffmpeg = await asyncio.create_subprocess_exec(ffmpeg_path, *ffmpeg_args)
def cleanup():
ffmpeg.kill()
self.cleanup = cleanup
session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device)
self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}")
@@ -195,7 +208,12 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor
pc = self.pc = BackgroundRTCPeerConnection()
self.sdp_answered = False
await pc.add_audio(options)
sock.close()
await pc.add_audio(
f"udp://localhost:{port}",
format="mpegts",
options={"analyzeduration": "0", "probesize": "32", "flush_packets": "1"}
)
offer = await pc.createOffer()
self.logger.info(f"Arlo offer sdp:\n{offer.sdp}")
@@ -218,7 +236,9 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor
self.logger.info("Stopping intercom")
if self.pc:
await self.pc.close()
self.cleanup()
self.pc = None
self.cleanup = None
self.sdp_answered = False
def _update_device_details(self, arlo_device):

View File

@@ -3,11 +3,11 @@ from aiortc.contrib.media import MediaPlayer
import asyncio
import threading
import logging
import os
import queue
import socket
import sys
import tempfile
from urllib.parse import urlparse
import scrypted_sdk
# construct logger instance to be used by BackgroundRTCPeerConnection
@@ -49,7 +49,7 @@ class BackgroundRTCPeerConnection:
self.pending_tasks = set()
self.stopped = False
self.cleanup_background = None
self.cleanup = None
def __background_main(self):
logger.debug(f"Background RTC loop {self.thread.name} starting")
@@ -59,9 +59,6 @@ class BackgroundRTCPeerConnection:
self.thread_started.put(True)
self.background_loop.run_forever()
if self.cleanup_background is not None:
self.cleanup_background()
logger.debug(f"Background RTC loop {self.thread.name} exiting")
async def __run_background(self, coroutine, await_result=True, stop_loop=False):
@@ -120,10 +117,12 @@ class BackgroundRTCPeerConnection:
if self.stopped:
return
self.stopped = True
if self.cleanup:
await self.cleanup()
await self.__run_background(self.pc.close(), await_result=False, stop_loop=True)
async def add_audio(self, options):
"""Adds an audio track to the RTCPeerConnection given FFmpeg options.
async def add_audio(self, endpoint, format, options={}):
"""Adds an audio track to the RTCPeerConnection, using provided FFmpeg args.
This constructs a MediaPlayer in the background thread's asyncio loop,
since MediaPlayer also utilizes coroutines and asyncio.
@@ -131,22 +130,10 @@ class BackgroundRTCPeerConnection:
Note that this may block the background thread's event loop if the
server is not yet ready.
"""
try:
input = options["i"]
format = options.get("f")
if format is None and input.startswith("rtsp"):
format = "rtsp"
except:
logger.error("error detecting what input file and format to use")
raise
logger.info(f"Intercom sourced from {input} with format {format}")
if format == "sdp" and input.startswith("tcp"):
input = await self.__sdp_to_file(input)
def add_audio_background():
media_player = MediaPlayer(input, format=format, options=options)
media_player = MediaPlayer(endpoint, format=format, options=options)
media_player._throttle_playback = False
# patch the player's stop function to close RTC if
# the media ends before RTC is closed
@@ -158,36 +145,4 @@ class BackgroundRTCPeerConnection:
self.pc.addTrack(media_player.audio)
self.background_loop.call_soon_threadsafe(add_audio_background)
async def __sdp_to_file(self, endpoint):
url = urlparse(endpoint)
logger.debug(f"Reading sdp file from {url.hostname}:{url.port}")
reader, writer = await asyncio.open_connection(url.hostname, url.port)
sdp_contents = bytes()
while True:
line = await reader.readline()
if not line:
break
sdp_contents += line
logger.debug("Finished reading sdp")
writer.close()
await writer.wait_closed()
logger.info(f"Received intercom input sdp:\n{sdp_contents.decode('utf-8')}")
fd, filename = tempfile.mkstemp(".sdp")
os.write(fd, sdp_contents)
os.close(fd)
logger.info(f"Wrote sdp to file {filename}")
def cleanup_background():
os.remove(filename)
logger.info(f"Deleted sdp file {filename}")
self.cleanup_background = cleanup_background
return filename
self.background_loop.call_soon_threadsafe(add_audio_background)