rebroadcast: watch rtsp parser data timeout

This commit is contained in:
Koushik Dutta
2022-02-18 18:13:16 -08:00
parent 706f6150a5
commit 035f72d4a4
2 changed files with 45 additions and 28 deletions

View File

@@ -2,12 +2,13 @@
import { MixinProvider, ScryptedDeviceType, ScryptedInterface, MediaObject, VideoCamera, MediaStreamOptions, Settings, Setting, ScryptedMimeTypes, FFMpegInput, RequestMediaStreamOptions, BufferConverter, ResponseMediaStreamOptions } from '@scrypted/sdk';
import sdk from '@scrypted/sdk';
import { once } from 'events';
import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin";
import { SettingsMixinDeviceBase } from "@scrypted/common/src/settings-mixin";
import { handleRebroadcasterClient, ParserOptions, ParserSession, startParserSession } from '@scrypted/common/src/ffmpeg-rebroadcast';
import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser';
import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider';
import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
import { createRtspParser, RtspClient, RtspServer } from '../../../common/src/rtsp-server';
import { parsePayloadTypes } from '@scrypted/common/src/sdp-utils';
import { createRtspParser, RtspClient, RtspServer } from '@scrypted/common/src/rtsp-server';
import { Duplex } from 'stream';
import net from 'net';
import { readLength } from '@scrypted/common/src/read-stream';
@@ -428,7 +429,7 @@ class PrebufferSession {
const json = await mediaManager.convertMediaObjectToJSON<any>(mo, 'x-scrypted/x-rfc4571');
const { url, sdp, mediaStreamOptions } = json;
session = await startRFC4571Parser(connectRFC4571Parser(url), sdp, mediaStreamOptions);
session = await startRFC4571Parser(connectRFC4571Parser(url), sdp, mediaStreamOptions, false, rbo);
this.sdp = session.sdp.then(buffers => Buffer.concat(buffers).toString());
}
else {
@@ -447,7 +448,7 @@ class PrebufferSession {
await rtspClient.setup(0, '/audio');
await rtspClient.setup(2, '/video');
const socket = await rtspClient.play();
session = await startRFC4571Parser(socket, sdp, ffmpegInput.mediaStreamOptions, true);
session = await startRFC4571Parser(socket, sdp, ffmpegInput.mediaStreamOptions, true, rbo);
}
else {
// create missing pts from dts so mpegts and mp4 muxing does not fail
@@ -696,16 +697,16 @@ class PrebufferSession {
}
if (codecCopy) {
// reported codecs may be wrong/cached/etc, so before blindly copying the audio codec info,
// verify what was found.
if (session?.mediaStreamOptions?.audio?.codec === session?.inputAudioCodec) {
mediaStreamOptions.audio = session?.mediaStreamOptions?.audio;
}
else {
mediaStreamOptions.audio = {
codec: session?.inputAudioCodec,
}
// reported codecs may be wrong/cached/etc, so before blindly copying the audio codec info,
// verify what was found.
if (session?.mediaStreamOptions?.audio?.codec === session?.inputAudioCodec) {
mediaStreamOptions.audio = session?.mediaStreamOptions?.audio;
}
else {
mediaStreamOptions.audio = {
codec: session?.inputAudioCodec,
}
}
}
if (mediaStreamOptions.video && session.inputVideoResolution?.[2] && session.inputVideoResolution?.[3]) {
@@ -1006,18 +1007,7 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider
const json = JSON.parse(data.toString());
const { url, sdp } = json;
const sdpString = sdp as string;
const audioPt = new Set<number>();
const videoPt = new Set<number>();
const addPts = (set: Set<number>, pts: string[]) => {
for (const pt of pts || []) {
set.add(parseInt(pt));
}
};
const audioPts = sdpString.match(/m=audio.*/)?.[0];
addPts(audioPt, audioPts?.split(' ').slice(3));
const videoPts = (sdp as string).match(/m=video.*/)?.[0];
addPts(videoPt, videoPts?.split(' ').slice(3));
const { audioPayloadTypes, videoPayloadTypes } = parsePayloadTypes(sdp);
const u = new URL(url);
if (!u.protocol.startsWith('tcp'))
@@ -1050,10 +1040,10 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider
const length = header.readInt16BE(0);
const data = await readLength(socket, length);
const pt = data[1] & 0x7f;
if (audioPt.has(pt)) {
if (audioPayloadTypes.has(pt)) {
rtsp.sendAudio(data, false);
}
else if (videoPt.has(pt)) {
else if (videoPayloadTypes.has(pt)) {
rtsp.sendVideo(data, false);
}
else {

View File

@@ -19,7 +19,7 @@ export function connectRFC4571Parser(url: string) {
}
export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaStreamOptions: MediaStreamOptions, hasRstpPrefix?: boolean): Promise<ParserSession<"rtsp">> {
export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaStreamOptions: MediaStreamOptions, hasRstpPrefix?: boolean, options?: ParserOptions<"rtsp">): Promise<ParserSession<"rtsp">> {
let isActive = true;
const events = new EventEmitter();
@@ -38,7 +38,33 @@ export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaS
socket.on('close', kill);
socket.on('error', kill);
const setupActivityTimer = (container: string) => {
let dataTimeout: NodeJS.Timeout;
function dataKill() {
console.error('timeout waiting for data, killing parser session', container);
kill();
}
function resetActivityTimer() {
if (!options.timeout)
return;
clearTimeout(dataTimeout);
dataTimeout = setTimeout(dataKill, options.timeout);
}
events.once('killed', () => clearTimeout(dataTimeout));
resetActivityTimer();
return {
resetActivityTimer,
}
}
(async () => {
const { resetActivityTimer } = setupActivityTimer('rtsp');
while (true) {
let header: Buffer;
let length: number;
@@ -69,6 +95,7 @@ export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaS
chunks: [header, data],
}
events.emit('rtsp', chunk);
resetActivityTimer();
}
})()
.finally(kill);