webrtc: update werift upstream.

This commit is contained in:
Koushik Dutta
2022-07-22 11:14:53 -07:00
parent a0bcc0bbb3
commit e8fbc1cdfa
8 changed files with 149 additions and 69 deletions

View File

@@ -615,14 +615,15 @@ export class RtspServer {
sdp = sdp.trim();
}
async handleSetup() {
async handleSetup(methods = ['play', 'record', 'teardown']) {
let currentHeaders: string[] = [];
while (true) {
let line = await readLine(this.client);
line = line.trim();
if (!line) {
if (!await this.headers(currentHeaders))
break;
const method = await this.headers(currentHeaders);
if (methods.includes(method))
return method;
currentHeaders = [];
continue;
}
@@ -833,7 +834,7 @@ export class RtspServer {
}
await thisAny[method](url, requestHeaders);
return method !== 'play' && method !== 'record' && method !== 'teardown';
return method;
}
respond(code: number, message: string, requestHeaders: Headers, headers: Headers, buffer?: Buffer) {

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/webrtc",
"version": "0.0.52",
"version": "0.0.53",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/webrtc",
"version": "0.0.52",
"version": "0.0.53",
"hasInstallScript": true,
"dependencies": {
"@koush/werift": "file:../../external/werift/packages/webrtc",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/webrtc",
"version": "0.0.52",
"version": "0.0.53",
"scripts": {
"scrypted-setup-project": "scrypted-setup-project",
"prescrypted-setup-project": "scrypted-package-json",

View File

@@ -183,9 +183,7 @@ export async function createRTCPeerConnectionSink(
const { name: audioCodecName } = getAudioCodec(audioTransceiver.sender.codec);
let audioCodecCopy = maximumCompatibilityMode ? undefined : audioCodecName;
const videoTranscodeArguments: string[] = [
'-an', '-sn', '-dn',
];
const videoTranscodeArguments: string[] = [];
const transcode = transcodeBaseline
|| mediaStreamOptions?.video?.codec !== 'h264'
|| ffmpegInput.h264EncoderArguments?.length
@@ -252,7 +250,7 @@ export async function createRTCPeerConnectionSink(
const audioRtpTrack: RtpTrack = {
codecCopy: audioCodecCopy,
onRtp: buffer => audioTransceiver.sender.sendRtp(buffer),
outputArguments: [
encoderArguments: [
...audioTranscodeArguments,
]
};
@@ -276,7 +274,7 @@ export async function createRTCPeerConnectionSink(
videoTransceiver.sender.sendRtp(packet);
}
},
outputArguments: [
encoderArguments: [
...videoTranscodeArguments,
],
firstPacket: () => console.log('first video packet', Date.now() - timeStart),

View File

@@ -1,6 +1,7 @@
import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster";
import { Deferred } from "@scrypted/common/src/deferred";
import { closeQuiet, createBindZero, listenZeroSingleClient } from "@scrypted/common/src/listen-cluster";
import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "@scrypted/common/src/media-helpers";
import { RtspClient } from "@scrypted/common/src/rtsp-server";
import { RtspClient, RtspServer } from "@scrypted/common/src/rtsp-server";
import { addTrackControls, MSection, parseSdp, replaceSectionPort } from "@scrypted/common/src/sdp-utils";
import sdk, { FFmpegInput } from "@scrypted/sdk";
import child_process, { ChildProcess } from 'child_process';
@@ -13,7 +14,8 @@ export interface RtpTrack {
codecCopy?: string;
ffmpegDestination?: string;
packetSize?: number;
outputArguments: string[];
encoderArguments: string[];
outputArguments?: string[];
onRtp(rtp: Buffer): void;
onMSection?: (msection: MSection) => void;
firstPacket?: () => void;
@@ -46,29 +48,13 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac
const { server, port } = track.bind;
sockets[key] = server;
server.once('message', () => track.firstPacket?.());
const outputArguments = track.outputArguments;
const outputArguments = track.outputArguments = [];
if (track.payloadType)
outputArguments.push('-payload_type', track.payloadType.toString());
if (track.ssrc)
outputArguments.push('-ssrc', track.ssrc.toString());
outputArguments.push('-f', 'rtp');
const ip = track.ffmpegDestination || '127.0.0.1';
const params = new URLSearchParams();
let url = `rtp://${ip}:${port}`;
if (track.rtcpPort)
params.set('rtcpport', track.rtcpPort.toString());
if (track.packetSize)
params.set('pkt_size', track.packetSize.toString());
if (track.srtp) {
url = `s${url}`;
outputArguments.push(
"-srtp_out_suite", track.srtp.crytoSuite,
"-srtp_out_params", track.srtp.key.toString('base64'),
);
}
url = `${url}?${params}`;
outputArguments.push(url);
server.on('message', data => track.onRtp(data));
}
@@ -91,21 +77,24 @@ function isCodecCopy(desiredCodec: string, checkCodec: string) {
export type RtpForwarderProcess = Awaited<ReturnType<typeof startRtpForwarderProcess>>;
export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks) {
export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks, rtspMode?: 'udp' | 'tcp' | 'pull') {
let { inputArguments, videoDecoderArguments } = ffmpegInput;
let rtspClient: RtspClient;
let sockets: dgram.Socket[] = [];
let pipeSdp: string;
const { video, audio } = rtpTracks;
rtpTracks = Object.assign({}, rtpTracks);
const videoCodec = video.codecCopy;
const audioCodec = audio?.codecCopy;
let videoSection: MSection;
let audioSection: MSection;
const isRtsp = ffmpegInput.container?.startsWith('rtsp');
const videoSectionDeferred = new Deferred<MSection>();
const audioSectionDeferred = new Deferred<MSection>();
videoSectionDeferred.promise.then(s => video?.onMSection?.(s));
audioSectionDeferred.promise.then(s => audio?.onMSection?.(s));
if (ffmpegInput.url
&& isRtsp
&& isCodecCopy(videoCodec, ffmpegInput.mediaStreamOptions?.video?.codec)) {
@@ -123,14 +112,12 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
const sdp = describe.body.toString();
const parsedSdp = parseSdp(sdp);
rtpTracks = Object.assign({}, rtpTracks);
videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy'));
const videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy'));
// maybe fallback to udp forwarding/transcoding?
if (!videoSection)
throw new Error(`advertised video codec ${videoCodec} not found in sdp.`);
video.onMSection?.(videoSection);
videoSectionDeferred.resolve(videoSection);
let channel = 0;
await rtspClient.setup({
@@ -143,7 +130,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
})
channel += 2;
audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy'));
const audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy'));
if (audio) {
if (audioSection
@@ -153,7 +140,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
delete rtpTracks.audio;
audio.onMSection?.(audioSection);
audioSectionDeferred.resolve(audioSection);
await rtspClient.setup({
type: 'tcp',
@@ -168,12 +155,13 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
console.log('audio codec transcoding:', audio.codecCopy);
const newSdp = parseSdp(sdp);
audioSection = newSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec)
let audioSection = newSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec)
if (!audioSection)
audioSection = newSdp.msections.find(msection => msection.type === 'audio');
if (!audioSection) {
console.warn(`audio section not found in sdp.`);
audioSectionDeferred.resolve(undefined);
}
else {
newSdp.msections = newSdp.msections.filter(msection => msection === audioSection);
@@ -192,8 +180,6 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
const audioSender = await createBindZero();
sockets.push(audioSender.server);
audio.onMSection?.(audioSection);
await rtspClient.setup({
type: 'tcp',
port: channel,
@@ -205,6 +191,9 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
}
}
}
else {
audioSectionDeferred.resolve(undefined);
}
await rtspClient.play();
}
@@ -217,16 +206,63 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
console.log('video codec/container not matched, transcoding:', rtpTracks.audio?.codecCopy);
}
const reportTranscodedSections = (sdp: string) => {
const parsedSdp = parseSdp(sdp);
const videoSection = parsedSdp.msections.find(msection => msection.type === 'video');
const audioSection = parsedSdp.msections.find(msection => msection.type === 'audio');
videoSectionDeferred.resolve(videoSection);
audioSectionDeferred.resolve(audioSection);
return { videoSection, audioSection };
}
const forwarders = await createTrackForwarders(console, rtpTracks);
const useRtp = !rtspMode;
const rtspServerDeferred = new Deferred<RtspServer>();
let cp: ChildProcess;
// will no op if there's no tracks
if (Object.keys(rtpTracks).length) {
if (useRtp) {
rtspServerDeferred.resolve(undefined);
for (const key of Object.keys(rtpTracks)) {
const track: RtpTrack = rtpTracks[key];
const ip = track.ffmpegDestination || '127.0.0.1';
const params = new URLSearchParams();
const { port } = track.bind;
let url = `rtp://${ip}:${port}`;
if (track.rtcpPort)
params.set('rtcpport', track.rtcpPort.toString());
if (track.packetSize)
params.set('pkt_size', track.packetSize.toString());
if (track.srtp) {
url = `s${url}`;
track.outputArguments.push(
"-srtp_out_suite", track.srtp.crytoSuite,
"-srtp_out_params", track.srtp.key.toString('base64'),
);
}
url = `${url}?${params}`;
track.outputArguments.push('-dn', '-sn');
if (key !== 'video')
track.outputArguments.push('-vn');
if (key !== 'audio')
track.outputArguments.push('-an');
track.outputArguments.push('-f', 'rtp');
track.outputArguments.push(url);
}
}
const outputArguments: string[] = [];
for (const key of Object.keys(rtpTracks)) {
const track: RtpTrack = rtpTracks[key];
outputArguments.push(...track.outputArguments);
outputArguments.push(...track.encoderArguments, ...track.outputArguments);
}
const args = [
@@ -235,9 +271,60 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
...(videoDecoderArguments || []),
...inputArguments,
...outputArguments,
'-sdp_file', 'pipe:4',
];
if (useRtp) {
args.push(
'-sdp_file', 'pipe:4',
);
}
else {
// seems better to use udp for audio timing/chop.
const useUdp = rtspMode === 'udp';
const serverPort = await listenZeroSingleClient();
args.push(
'-rtsp_transport',
useUdp ? 'udp' : 'tcp',
'-f', 'rtsp',
`rtsp://127.0.0.1:${serverPort.port}`
);
serverPort.clientPromise.then(async (client) => {
const rtspServer = new RtspServer(client, undefined, useUdp);
rtspServer.console = console;
await rtspServer.handleSetup(['announce']);
const { videoSection, audioSection } = reportTranscodedSections(rtspServer.sdp);
await rtspServer.handleSetup();
rtspServer.setupTracks[videoSection?.control]?.rtp?.on('message', rtp => {
rtpTracks.video.onRtp(rtp);;
});
rtspServer.setupTracks[audioSection?.control]?.rtp?.on('message', rtp => {
rtpTracks.audio.onRtp(rtp);;
});
rtspServerDeferred.resolve(rtspServer);
if (rtspMode !== 'pull') {
for await (const rtspSample of rtspServer.handleRecord()) {
if (rtspSample.type === videoSection.codec) {
rtpTracks.video.onRtp(rtspSample.packet);
}
else if (rtspSample.type === audioSection?.codec) {
rtpTracks.audio.onRtp(rtspSample.packet);
}
else {
console.warn('unexpected rtsp sample', rtspSample.type);
}
}
}
});
}
safePrintFFmpegArguments(console, args);
cp = child_process.spawn(await mediaManager.getFFmpegPath(), args, {
@@ -250,6 +337,13 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
}
ffmpegLogInitialOutput(console, cp);
cp.on('exit', () => forwarders.close());
if (useRtp) {
cp.stdio[4].on('data', data => {
const transcodeSdp = data.toString();
reportTranscodedSections(transcodeSdp);
});
}
}
else {
console.log('bypassing ffmpeg, perfect codecs');
@@ -281,21 +375,10 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
rtspClient?.readLoop().catch(() => { }).finally(kill);
});
if (Object.keys(rtpTracks).length) {
const transcodeSdp = await new Promise<string>((resolve, reject) => {
cp.on('exit', () => reject(new Error('ffmpeg exited before sdp was received')));
cp.stdio[4].on('data', data => {
resolve(data.toString());
});
});
const parsedSdp = parseSdp(transcodeSdp);
videoSection = parsedSdp.msections.find(msection => msection.type === 'video') || videoSection;
audioSection = parsedSdp.msections.find(msection => msection.type === 'audio') || audioSection;
}
return {
videoSection,
audioSection,
rtspServer: rtspServerDeferred.promise,
videoSection: videoSectionDeferred.promise,
audioSection: audioSectionDeferred.promise,
kill,
killPromise,
get killed() {

View File

@@ -55,9 +55,7 @@ export function getAudioCodec(outputCodecParameters: RTCRtpCodecParameters) {
}
export function getFFmpegRtpAudioOutputArguments(inputCodec: string, outputCodecParameters: RTCRtpCodecParameters, maximumCompatibilityMode: boolean) {
const ret = [
'-vn', '-sn', '-dn',
];
const ret: string[] = [];
const { encoder, name } = getAudioCodec(outputCodecParameters);

View File

@@ -78,7 +78,7 @@ export async function createRTCPeerConnectionSource(options: {
const pc = await peerConnection.promise;
audioTransceiver = pc.addTransceiver("audio", setup.audio as any);
audioTransceiver.mid = '0';
// audioTransceiver.mid = '0';
audioTransceiver.onTrack.subscribe((track) => {
track.onReceiveRtp.subscribe(rtp => {
if (!gotAudio) {
@@ -91,7 +91,7 @@ export async function createRTCPeerConnectionSource(options: {
});
const videoTransceiver = pc.addTransceiver("video", setup.video as any);
videoTransceiver.mid = '1';
// videoTransceiver.mid = '1';
videoTransceiver.onTrack.subscribe((track) => {
track.onReceiveRtp.subscribe(rtp => {
if (!gotVideo) {
@@ -158,7 +158,7 @@ export async function createRTCPeerConnectionSource(options: {
const pc = await peerConnection.promise;
if (setup.datachannel) {
pc.createDataChannel(setup.datachannel.label, setup.datachannel.dict);
pc.sctpTransport.mid = '2';
// pc.sctpTransport.mid = '2';
}
const gatheringPromise = pc.iceGatheringState === 'complete' ? Promise.resolve(undefined) : new Promise(resolve => pc.iceGatheringStateChange.subscribe(state => {