From e8fbc1cdfa183f06bca52b404e7acd279108bfe9 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 22 Jul 2022 11:14:53 -0700 Subject: [PATCH] webrtc: update werift upstream. --- common/src/rtsp-server.ts | 9 +- external/werift | 2 +- plugins/webrtc/package-lock.json | 4 +- plugins/webrtc/package.json | 2 +- plugins/webrtc/src/ffmpeg-to-wrtc.ts | 8 +- plugins/webrtc/src/rtp-forwarders.ts | 183 ++++++++++++++----- plugins/webrtc/src/webrtc-required-codecs.ts | 4 +- plugins/webrtc/src/wrtc-to-rtsp.ts | 6 +- 8 files changed, 149 insertions(+), 69 deletions(-) diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index fca752fb0..5a95d36bf 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -615,14 +615,15 @@ export class RtspServer { sdp = sdp.trim(); } - async handleSetup() { + async handleSetup(methods = ['play', 'record', 'teardown']) { let currentHeaders: string[] = []; while (true) { let line = await readLine(this.client); line = line.trim(); if (!line) { - if (!await this.headers(currentHeaders)) - break; + const method = await this.headers(currentHeaders); + if (methods.includes(method)) + return method; currentHeaders = []; continue; } @@ -833,7 +834,7 @@ export class RtspServer { } await thisAny[method](url, requestHeaders); - return method !== 'play' && method !== 'record' && method !== 'teardown'; + return method; } respond(code: number, message: string, requestHeaders: Headers, headers: Headers, buffer?: Buffer) { diff --git a/external/werift b/external/werift index e311e8006..470d17c0f 160000 --- a/external/werift +++ b/external/werift @@ -1 +1 @@ -Subproject commit e311e800666e5f21bbf5b704b55ea43b328a09f3 +Subproject commit 470d17c0f096fe61a00f131b76d0f4162211e57c diff --git a/plugins/webrtc/package-lock.json b/plugins/webrtc/package-lock.json index 104c0321b..4fda7fb02 100644 --- a/plugins/webrtc/package-lock.json +++ b/plugins/webrtc/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/webrtc", - "version": "0.0.52", + "version": "0.0.53", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/webrtc", - "version": "0.0.52", + "version": "0.0.53", "hasInstallScript": true, "dependencies": { "@koush/werift": "file:../../external/werift/packages/webrtc", diff --git a/plugins/webrtc/package.json b/plugins/webrtc/package.json index 75afe099a..bdc22d022 100644 --- a/plugins/webrtc/package.json +++ b/plugins/webrtc/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/webrtc", - "version": "0.0.52", + "version": "0.0.53", "scripts": { "scrypted-setup-project": "scrypted-setup-project", "prescrypted-setup-project": "scrypted-package-json", diff --git a/plugins/webrtc/src/ffmpeg-to-wrtc.ts b/plugins/webrtc/src/ffmpeg-to-wrtc.ts index fe5416450..b28641e7a 100644 --- a/plugins/webrtc/src/ffmpeg-to-wrtc.ts +++ b/plugins/webrtc/src/ffmpeg-to-wrtc.ts @@ -183,9 +183,7 @@ export async function createRTCPeerConnectionSink( const { name: audioCodecName } = getAudioCodec(audioTransceiver.sender.codec); let audioCodecCopy = maximumCompatibilityMode ? undefined : audioCodecName; - const videoTranscodeArguments: string[] = [ - '-an', '-sn', '-dn', - ]; + const videoTranscodeArguments: string[] = []; const transcode = transcodeBaseline || mediaStreamOptions?.video?.codec !== 'h264' || ffmpegInput.h264EncoderArguments?.length @@ -252,7 +250,7 @@ export async function createRTCPeerConnectionSink( const audioRtpTrack: RtpTrack = { codecCopy: audioCodecCopy, onRtp: buffer => audioTransceiver.sender.sendRtp(buffer), - outputArguments: [ + encoderArguments: [ ...audioTranscodeArguments, ] }; @@ -276,7 +274,7 @@ export async function createRTCPeerConnectionSink( videoTransceiver.sender.sendRtp(packet); } }, - outputArguments: [ + encoderArguments: [ ...videoTranscodeArguments, ], firstPacket: () => console.log('first video packet', Date.now() - timeStart), diff --git a/plugins/webrtc/src/rtp-forwarders.ts b/plugins/webrtc/src/rtp-forwarders.ts index d9cc998ab..22eae694a 100644 --- a/plugins/webrtc/src/rtp-forwarders.ts +++ b/plugins/webrtc/src/rtp-forwarders.ts @@ -1,6 +1,7 @@ -import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster"; +import { Deferred } from "@scrypted/common/src/deferred"; +import { closeQuiet, createBindZero, listenZeroSingleClient } from "@scrypted/common/src/listen-cluster"; import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "@scrypted/common/src/media-helpers"; -import { RtspClient } from "@scrypted/common/src/rtsp-server"; +import { RtspClient, RtspServer } from "@scrypted/common/src/rtsp-server"; import { addTrackControls, MSection, parseSdp, replaceSectionPort } from "@scrypted/common/src/sdp-utils"; import sdk, { FFmpegInput } from "@scrypted/sdk"; import child_process, { ChildProcess } from 'child_process'; @@ -13,7 +14,8 @@ export interface RtpTrack { codecCopy?: string; ffmpegDestination?: string; packetSize?: number; - outputArguments: string[]; + encoderArguments: string[]; + outputArguments?: string[]; onRtp(rtp: Buffer): void; onMSection?: (msection: MSection) => void; firstPacket?: () => void; @@ -46,29 +48,13 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac const { server, port } = track.bind; sockets[key] = server; server.once('message', () => track.firstPacket?.()); - const outputArguments = track.outputArguments; + const outputArguments = track.outputArguments = []; if (track.payloadType) outputArguments.push('-payload_type', track.payloadType.toString()); if (track.ssrc) outputArguments.push('-ssrc', track.ssrc.toString()); - outputArguments.push('-f', 'rtp'); - const ip = track.ffmpegDestination || '127.0.0.1'; - const params = new URLSearchParams(); - let url = `rtp://${ip}:${port}`; - if (track.rtcpPort) - params.set('rtcpport', track.rtcpPort.toString()); - if (track.packetSize) - params.set('pkt_size', track.packetSize.toString()); - if (track.srtp) { - url = `s${url}`; - outputArguments.push( - "-srtp_out_suite", track.srtp.crytoSuite, - "-srtp_out_params", track.srtp.key.toString('base64'), - ); - } - url = `${url}?${params}`; - outputArguments.push(url); + server.on('message', data => track.onRtp(data)); } @@ -91,21 +77,24 @@ function isCodecCopy(desiredCodec: string, checkCodec: string) { export type RtpForwarderProcess = Awaited>; -export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks) { +export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks, rtspMode?: 'udp' | 'tcp' | 'pull') { let { inputArguments, videoDecoderArguments } = ffmpegInput; let rtspClient: RtspClient; let sockets: dgram.Socket[] = []; let pipeSdp: string; const { video, audio } = rtpTracks; + rtpTracks = Object.assign({}, rtpTracks); const videoCodec = video.codecCopy; const audioCodec = audio?.codecCopy; - let videoSection: MSection; - let audioSection: MSection; - const isRtsp = ffmpegInput.container?.startsWith('rtsp'); + const videoSectionDeferred = new Deferred(); + const audioSectionDeferred = new Deferred(); + videoSectionDeferred.promise.then(s => video?.onMSection?.(s)); + audioSectionDeferred.promise.then(s => audio?.onMSection?.(s)); + if (ffmpegInput.url && isRtsp && isCodecCopy(videoCodec, ffmpegInput.mediaStreamOptions?.video?.codec)) { @@ -123,14 +112,12 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF const sdp = describe.body.toString(); const parsedSdp = parseSdp(sdp); - rtpTracks = Object.assign({}, rtpTracks); - - videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy')); + const videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy')); // maybe fallback to udp forwarding/transcoding? if (!videoSection) throw new Error(`advertised video codec ${videoCodec} not found in sdp.`); - video.onMSection?.(videoSection); + videoSectionDeferred.resolve(videoSection); let channel = 0; await rtspClient.setup({ @@ -143,7 +130,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF }) channel += 2; - audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy')); + const audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy')); if (audio) { if (audioSection @@ -153,7 +140,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF delete rtpTracks.audio; - audio.onMSection?.(audioSection); + audioSectionDeferred.resolve(audioSection); await rtspClient.setup({ type: 'tcp', @@ -168,12 +155,13 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF console.log('audio codec transcoding:', audio.codecCopy); const newSdp = parseSdp(sdp); - audioSection = newSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec) + let audioSection = newSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec) if (!audioSection) audioSection = newSdp.msections.find(msection => msection.type === 'audio'); if (!audioSection) { console.warn(`audio section not found in sdp.`); + audioSectionDeferred.resolve(undefined); } else { newSdp.msections = newSdp.msections.filter(msection => msection === audioSection); @@ -192,8 +180,6 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF const audioSender = await createBindZero(); sockets.push(audioSender.server); - audio.onMSection?.(audioSection); - await rtspClient.setup({ type: 'tcp', port: channel, @@ -205,6 +191,9 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF } } } + else { + audioSectionDeferred.resolve(undefined); + } await rtspClient.play(); } @@ -217,16 +206,63 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF console.log('video codec/container not matched, transcoding:', rtpTracks.audio?.codecCopy); } + const reportTranscodedSections = (sdp: string) => { + const parsedSdp = parseSdp(sdp); + const videoSection = parsedSdp.msections.find(msection => msection.type === 'video'); + const audioSection = parsedSdp.msections.find(msection => msection.type === 'audio'); + + videoSectionDeferred.resolve(videoSection); + audioSectionDeferred.resolve(audioSection); + + return { videoSection, audioSection }; + } + const forwarders = await createTrackForwarders(console, rtpTracks); + const useRtp = !rtspMode; + const rtspServerDeferred = new Deferred(); + let cp: ChildProcess; // will no op if there's no tracks if (Object.keys(rtpTracks).length) { + if (useRtp) { + rtspServerDeferred.resolve(undefined); + + for (const key of Object.keys(rtpTracks)) { + const track: RtpTrack = rtpTracks[key]; + + const ip = track.ffmpegDestination || '127.0.0.1'; + const params = new URLSearchParams(); + const { port } = track.bind; + let url = `rtp://${ip}:${port}`; + if (track.rtcpPort) + params.set('rtcpport', track.rtcpPort.toString()); + if (track.packetSize) + params.set('pkt_size', track.packetSize.toString()); + if (track.srtp) { + url = `s${url}`; + track.outputArguments.push( + "-srtp_out_suite", track.srtp.crytoSuite, + "-srtp_out_params", track.srtp.key.toString('base64'), + ); + } + url = `${url}?${params}`; + + track.outputArguments.push('-dn', '-sn'); + if (key !== 'video') + track.outputArguments.push('-vn'); + if (key !== 'audio') + track.outputArguments.push('-an'); + track.outputArguments.push('-f', 'rtp'); + track.outputArguments.push(url); + } + } + const outputArguments: string[] = []; for (const key of Object.keys(rtpTracks)) { const track: RtpTrack = rtpTracks[key]; - outputArguments.push(...track.outputArguments); + outputArguments.push(...track.encoderArguments, ...track.outputArguments); } const args = [ @@ -235,9 +271,60 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF ...(videoDecoderArguments || []), ...inputArguments, ...outputArguments, - '-sdp_file', 'pipe:4', ]; + if (useRtp) { + args.push( + '-sdp_file', 'pipe:4', + ); + } + else { + // seems better to use udp for audio timing/chop. + const useUdp = rtspMode === 'udp'; + + const serverPort = await listenZeroSingleClient(); + + args.push( + '-rtsp_transport', + useUdp ? 'udp' : 'tcp', + '-f', 'rtsp', + `rtsp://127.0.0.1:${serverPort.port}` + ); + + serverPort.clientPromise.then(async (client) => { + const rtspServer = new RtspServer(client, undefined, useUdp); + rtspServer.console = console; + + await rtspServer.handleSetup(['announce']); + const { videoSection, audioSection } = reportTranscodedSections(rtspServer.sdp); + await rtspServer.handleSetup(); + + rtspServer.setupTracks[videoSection?.control]?.rtp?.on('message', rtp => { + rtpTracks.video.onRtp(rtp);; + }); + + rtspServer.setupTracks[audioSection?.control]?.rtp?.on('message', rtp => { + rtpTracks.audio.onRtp(rtp);; + }); + + rtspServerDeferred.resolve(rtspServer); + + if (rtspMode !== 'pull') { + for await (const rtspSample of rtspServer.handleRecord()) { + if (rtspSample.type === videoSection.codec) { + rtpTracks.video.onRtp(rtspSample.packet); + } + else if (rtspSample.type === audioSection?.codec) { + rtpTracks.audio.onRtp(rtspSample.packet); + } + else { + console.warn('unexpected rtsp sample', rtspSample.type); + } + } + } + }); + } + safePrintFFmpegArguments(console, args); cp = child_process.spawn(await mediaManager.getFFmpegPath(), args, { @@ -250,6 +337,13 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF } ffmpegLogInitialOutput(console, cp); cp.on('exit', () => forwarders.close()); + + if (useRtp) { + cp.stdio[4].on('data', data => { + const transcodeSdp = data.toString(); + reportTranscodedSections(transcodeSdp); + }); + } } else { console.log('bypassing ffmpeg, perfect codecs'); @@ -281,21 +375,10 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF rtspClient?.readLoop().catch(() => { }).finally(kill); }); - if (Object.keys(rtpTracks).length) { - const transcodeSdp = await new Promise((resolve, reject) => { - cp.on('exit', () => reject(new Error('ffmpeg exited before sdp was received'))); - cp.stdio[4].on('data', data => { - resolve(data.toString()); - }); - }); - const parsedSdp = parseSdp(transcodeSdp); - videoSection = parsedSdp.msections.find(msection => msection.type === 'video') || videoSection; - audioSection = parsedSdp.msections.find(msection => msection.type === 'audio') || audioSection; - } - return { - videoSection, - audioSection, + rtspServer: rtspServerDeferred.promise, + videoSection: videoSectionDeferred.promise, + audioSection: audioSectionDeferred.promise, kill, killPromise, get killed() { diff --git a/plugins/webrtc/src/webrtc-required-codecs.ts b/plugins/webrtc/src/webrtc-required-codecs.ts index eb8c539db..994d3e473 100644 --- a/plugins/webrtc/src/webrtc-required-codecs.ts +++ b/plugins/webrtc/src/webrtc-required-codecs.ts @@ -55,9 +55,7 @@ export function getAudioCodec(outputCodecParameters: RTCRtpCodecParameters) { } export function getFFmpegRtpAudioOutputArguments(inputCodec: string, outputCodecParameters: RTCRtpCodecParameters, maximumCompatibilityMode: boolean) { - const ret = [ - '-vn', '-sn', '-dn', - ]; + const ret: string[] = []; const { encoder, name } = getAudioCodec(outputCodecParameters); diff --git a/plugins/webrtc/src/wrtc-to-rtsp.ts b/plugins/webrtc/src/wrtc-to-rtsp.ts index 55b800c8f..9fe4121c5 100644 --- a/plugins/webrtc/src/wrtc-to-rtsp.ts +++ b/plugins/webrtc/src/wrtc-to-rtsp.ts @@ -78,7 +78,7 @@ export async function createRTCPeerConnectionSource(options: { const pc = await peerConnection.promise; audioTransceiver = pc.addTransceiver("audio", setup.audio as any); - audioTransceiver.mid = '0'; + // audioTransceiver.mid = '0'; audioTransceiver.onTrack.subscribe((track) => { track.onReceiveRtp.subscribe(rtp => { if (!gotAudio) { @@ -91,7 +91,7 @@ export async function createRTCPeerConnectionSource(options: { }); const videoTransceiver = pc.addTransceiver("video", setup.video as any); - videoTransceiver.mid = '1'; + // videoTransceiver.mid = '1'; videoTransceiver.onTrack.subscribe((track) => { track.onReceiveRtp.subscribe(rtp => { if (!gotVideo) { @@ -158,7 +158,7 @@ export async function createRTCPeerConnectionSource(options: { const pc = await peerConnection.promise; if (setup.datachannel) { pc.createDataChannel(setup.datachannel.label, setup.datachannel.dict); - pc.sctpTransport.mid = '2'; + // pc.sctpTransport.mid = '2'; } const gatheringPromise = pc.iceGatheringState === 'complete' ? Promise.resolve(undefined) : new Promise(resolve => pc.iceGatheringStateChange.subscribe(state => {