From 8ff99eef73fa071d7c5d3900f987afdb7754c41c Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Thu, 16 Jun 2022 16:40:58 -0700 Subject: [PATCH] common: decouple werift from rtp forwarders --- plugins/webrtc/package-lock.json | 6 +-- plugins/webrtc/src/rtp-forwarders.ts | 63 +++++++++++++++++++--------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/plugins/webrtc/package-lock.json b/plugins/webrtc/package-lock.json index dfe03d594..94e63fe32 100644 --- a/plugins/webrtc/package-lock.json +++ b/plugins/webrtc/package-lock.json @@ -79,7 +79,7 @@ "ts-jest": "^27.1.1", "ts-node": "^10.4.0", "typedoc": "^0.22.10", - "typescript": "^4.6.4" + "typescript": "^4.5.4" }, "engines": { "node": ">=15" @@ -87,7 +87,7 @@ }, "../../sdk": { "name": "@scrypted/sdk", - "version": "0.0.196", + "version": "0.0.198", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.16.7", @@ -195,7 +195,7 @@ "turbo-crc32": "^1.0.1", "tweetnacl": "^1.0.3", "typedoc": "^0.22.10", - "typescript": "^4.6.4", + "typescript": "^4.5.4", "uuid": "^8.3.2" } }, diff --git a/plugins/webrtc/src/rtp-forwarders.ts b/plugins/webrtc/src/rtp-forwarders.ts index 30da73c39..90466f4e6 100644 --- a/plugins/webrtc/src/rtp-forwarders.ts +++ b/plugins/webrtc/src/rtp-forwarders.ts @@ -1,8 +1,7 @@ -import { RtpPacket } from "@koush/werift"; import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster"; import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "@scrypted/common/src/media-helpers"; import { parseHeaders, RtspClient } from "@scrypted/common/src/rtsp-server"; -import { addTrackControls, getSpsPps, parseSdp, replacePorts, replaceSectionPort } from "@scrypted/common/src/sdp-utils"; +import { addTrackControls, getSpsPps, MSection, parseSdp, replacePorts, replaceSectionPort } from "@scrypted/common/src/sdp-utils"; import sdk, { FFmpegInput } from "@scrypted/sdk"; import child_process, { ChildProcess } from 'child_process'; import dgram from 'dgram'; @@ -40,7 +39,7 @@ export interface RtpTrack { ffmpegDestination?: string; packetSize?: number; outputArguments: string[]; - onRtp(rtp: RtpPacket): void; + onRtp(rtp: Buffer): void; firstPacket?: () => void; payloadType?: number; rtcpPort?: number; @@ -93,7 +92,7 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac url = `${url}?${params}`; outputArguments.push(url); - server.on('message', data => track.onRtp(RtpPacket.deSerialize(data))); + server.on('message', data => track.onRtp(data)); } return { @@ -107,23 +106,33 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac } } +function isCodecCopy(desiredCodec: string, checkCodec: string) { + return desiredCodec === 'copy' + || (desiredCodec && desiredCodec === checkCodec); +} + +export type RtpForwarderProcess = Awaited>; + export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks) { let { inputArguments, videoDecoderArguments } = ffmpegInput; let rtspClient: RtspClient; let sockets: dgram.Socket[] = []; let pipeSdp: string; + const { video, audio } = rtpTracks; + const videoCodec = video.codecCopy; + const audioCodec = audio?.codecCopy; + + let videoSection: MSection; + let audioSection: MSection; + if (ffmpegInput.url && ffmpegInput.container?.startsWith('rtsp') - && rtpTracks.video.codecCopy - && rtpTracks.video.codecCopy === ffmpegInput.mediaStreamOptions?.video?.codec) { + && isCodecCopy(videoCodec, ffmpegInput.mediaStreamOptions?.video?.codec)) { console.log('video codec matched:', rtpTracks.video.codecCopy); - const { video, audio } = rtpTracks; delete rtpTracks.video; - const videoCodec = video.codecCopy; - const audioCodec = audio?.codecCopy; rtspClient = new RtspClient(ffmpegInput.url, console); rtspClient.requestTimeout = 10000; @@ -136,7 +145,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF rtpTracks = Object.assign({}, rtpTracks); - const videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && msection.codec === videoCodec); + videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy')); // maybe fallback to udp forwarding/transcoding? if (!videoSection) throw new Error(`advertised video codec ${videoCodec} not found in sdp.`); @@ -150,20 +159,20 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF port: channel, path: videoSection.control, onRtp: (rtspHeader, rtp) => { - const repacketized = h264Repacketizer.repacketize(RtpPacket.deSerialize(rtp)); - for (const packet of repacketized) { - video.onRtp(packet); - } + video.onRtp(rtp); + // const repacketized = h264Repacketizer.repacketize(RtpPacket.deSerialize(rtp)); + // for (const packet of repacketized) { + // video.onRtp(packet); + // } }, }) channel += 2; - let audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec); + audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy')); if (audio) { if (audioSection - && audioCodec - && audio.codecCopy === ffmpegInput.mediaStreamOptions?.audio?.codec) { + && isCodecCopy(audioCodec, ffmpegInput.mediaStreamOptions?.audio?.codec)) { console.log('audio codec matched:', audio.codecCopy); @@ -174,7 +183,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF port: channel, path: audioSection.control, onRtp: (rtspHeader, rtp) => { - audio.onRtp(RtpPacket.deSerialize(rtp)); + audio.onRtp(rtp); }, }); } @@ -246,14 +255,14 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF ...(videoDecoderArguments || []), ...inputArguments, - ...outputArguments, + '-sdp_file', 'pipe:4', ]; safePrintFFmpegArguments(console, args); cp = child_process.spawn(await mediaManager.getFFmpegPath(), args, { - stdio: ['pipe', 'pipe', 'pipe', 'pipe'], + stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'], }); if (pipeSdp) { const pipe = cp.stdio[3] as Writable; @@ -290,7 +299,21 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF rtspClient?.readLoop().catch(() => { }).finally(kill); }); + if (Object.keys(rtpTracks).length) { + const transcodeSdp = await new Promise((resolve, reject) => { + cp.on('exit', () => reject(new Error('ffmpeg exited before sdp was received'))); + cp.stdio[4].on('data', data => { + resolve(data.toString()); + }); + }); + const parsedSdp = parseSdp(transcodeSdp); + videoSection = parsedSdp.msections.find(msection => msection.type === 'video') || videoSection; + audioSection = parsedSdp.msections.find(msection => msection.type === 'audio') || audioSection; + } + return { + videoSection, + audioSection, kill, killPromise, get killed() {