common: decouple werift from rtp forwarders

This commit is contained in:
Koushik Dutta
2022-06-16 16:40:58 -07:00
parent 6a1850edb7
commit 8ff99eef73
2 changed files with 46 additions and 23 deletions

View File

@@ -79,7 +79,7 @@
"ts-jest": "^27.1.1",
"ts-node": "^10.4.0",
"typedoc": "^0.22.10",
"typescript": "^4.6.4"
"typescript": "^4.5.4"
},
"engines": {
"node": ">=15"
@@ -87,7 +87,7 @@
},
"../../sdk": {
"name": "@scrypted/sdk",
"version": "0.0.196",
"version": "0.0.198",
"license": "ISC",
"dependencies": {
"@babel/preset-typescript": "^7.16.7",
@@ -195,7 +195,7 @@
"turbo-crc32": "^1.0.1",
"tweetnacl": "^1.0.3",
"typedoc": "^0.22.10",
"typescript": "^4.6.4",
"typescript": "^4.5.4",
"uuid": "^8.3.2"
}
},

View File

@@ -1,8 +1,7 @@
import { RtpPacket } from "@koush/werift";
import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster";
import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "@scrypted/common/src/media-helpers";
import { parseHeaders, RtspClient } from "@scrypted/common/src/rtsp-server";
import { addTrackControls, getSpsPps, parseSdp, replacePorts, replaceSectionPort } from "@scrypted/common/src/sdp-utils";
import { addTrackControls, getSpsPps, MSection, parseSdp, replacePorts, replaceSectionPort } from "@scrypted/common/src/sdp-utils";
import sdk, { FFmpegInput } from "@scrypted/sdk";
import child_process, { ChildProcess } from 'child_process';
import dgram from 'dgram';
@@ -40,7 +39,7 @@ export interface RtpTrack {
ffmpegDestination?: string;
packetSize?: number;
outputArguments: string[];
onRtp(rtp: RtpPacket): void;
onRtp(rtp: Buffer): void;
firstPacket?: () => void;
payloadType?: number;
rtcpPort?: number;
@@ -93,7 +92,7 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac
url = `${url}?${params}`;
outputArguments.push(url);
server.on('message', data => track.onRtp(RtpPacket.deSerialize(data)));
server.on('message', data => track.onRtp(data));
}
return {
@@ -107,23 +106,33 @@ export async function createTrackForwarders(console: Console, rtpTracks: RtpTrac
}
}
function isCodecCopy(desiredCodec: string, checkCodec: string) {
return desiredCodec === 'copy'
|| (desiredCodec && desiredCodec === checkCodec);
}
export type RtpForwarderProcess = Awaited<ReturnType<typeof startRtpForwarderProcess>>;
export async function startRtpForwarderProcess(console: Console, ffmpegInput: FFmpegInput, rtpTracks: RtpTracks) {
let { inputArguments, videoDecoderArguments } = ffmpegInput;
let rtspClient: RtspClient;
let sockets: dgram.Socket[] = [];
let pipeSdp: string;
const { video, audio } = rtpTracks;
const videoCodec = video.codecCopy;
const audioCodec = audio?.codecCopy;
let videoSection: MSection;
let audioSection: MSection;
if (ffmpegInput.url &&
ffmpegInput.container?.startsWith('rtsp')
&& rtpTracks.video.codecCopy
&& rtpTracks.video.codecCopy === ffmpegInput.mediaStreamOptions?.video?.codec) {
&& isCodecCopy(videoCodec, ffmpegInput.mediaStreamOptions?.video?.codec)) {
console.log('video codec matched:', rtpTracks.video.codecCopy);
const { video, audio } = rtpTracks;
delete rtpTracks.video;
const videoCodec = video.codecCopy;
const audioCodec = audio?.codecCopy;
rtspClient = new RtspClient(ffmpegInput.url, console);
rtspClient.requestTimeout = 10000;
@@ -136,7 +145,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
rtpTracks = Object.assign({}, rtpTracks);
const videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && msection.codec === videoCodec);
videoSection = parsedSdp.msections.find(msection => msection.type === 'video' && (msection.codec === videoCodec || videoCodec === 'copy'));
// maybe fallback to udp forwarding/transcoding?
if (!videoSection)
throw new Error(`advertised video codec ${videoCodec} not found in sdp.`);
@@ -150,20 +159,20 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
port: channel,
path: videoSection.control,
onRtp: (rtspHeader, rtp) => {
const repacketized = h264Repacketizer.repacketize(RtpPacket.deSerialize(rtp));
for (const packet of repacketized) {
video.onRtp(packet);
}
video.onRtp(rtp);
// const repacketized = h264Repacketizer.repacketize(RtpPacket.deSerialize(rtp));
// for (const packet of repacketized) {
// video.onRtp(packet);
// }
},
})
channel += 2;
let audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && msection.codec === audioCodec);
audioSection = parsedSdp.msections.find(msection => msection.type === 'audio' && (msection.codec === audioCodec || audioCodec === 'copy'));
if (audio) {
if (audioSection
&& audioCodec
&& audio.codecCopy === ffmpegInput.mediaStreamOptions?.audio?.codec) {
&& isCodecCopy(audioCodec, ffmpegInput.mediaStreamOptions?.audio?.codec)) {
console.log('audio codec matched:', audio.codecCopy);
@@ -174,7 +183,7 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
port: channel,
path: audioSection.control,
onRtp: (rtspHeader, rtp) => {
audio.onRtp(RtpPacket.deSerialize(rtp));
audio.onRtp(rtp);
},
});
}
@@ -246,14 +255,14 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
...(videoDecoderArguments || []),
...inputArguments,
...outputArguments,
'-sdp_file', 'pipe:4',
];
safePrintFFmpegArguments(console, args);
cp = child_process.spawn(await mediaManager.getFFmpegPath(), args, {
stdio: ['pipe', 'pipe', 'pipe', 'pipe'],
stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
});
if (pipeSdp) {
const pipe = cp.stdio[3] as Writable;
@@ -290,7 +299,21 @@ export async function startRtpForwarderProcess(console: Console, ffmpegInput: FF
rtspClient?.readLoop().catch(() => { }).finally(kill);
});
if (Object.keys(rtpTracks).length) {
const transcodeSdp = await new Promise<string>((resolve, reject) => {
cp.on('exit', () => reject(new Error('ffmpeg exited before sdp was received')));
cp.stdio[4].on('data', data => {
resolve(data.toString());
});
});
const parsedSdp = parseSdp(transcodeSdp);
videoSection = parsedSdp.msections.find(msection => msection.type === 'video') || videoSection;
audioSection = parsedSdp.msections.find(msection => msection.type === 'audio') || audioSection;
}
return {
videoSection,
audioSection,
kill,
killPromise,
get killed() {