From 035f72d4a496da09e97d652a678545c9e534bb5c Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 18 Feb 2022 18:13:16 -0800 Subject: [PATCH] rebroadcast: watch rtsp parser data timeout --- plugins/prebuffer-mixin/src/main.ts | 44 ++++++++++---------------- plugins/prebuffer-mixin/src/rfc4571.ts | 29 ++++++++++++++++- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index fdfd78044..638fe589c 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -2,12 +2,13 @@ import { MixinProvider, ScryptedDeviceType, ScryptedInterface, MediaObject, VideoCamera, MediaStreamOptions, Settings, Setting, ScryptedMimeTypes, FFMpegInput, RequestMediaStreamOptions, BufferConverter, ResponseMediaStreamOptions } from '@scrypted/sdk'; import sdk from '@scrypted/sdk'; import { once } from 'events'; -import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin"; +import { SettingsMixinDeviceBase } from "@scrypted/common/src/settings-mixin"; import { handleRebroadcasterClient, ParserOptions, ParserSession, startParserSession } from '@scrypted/common/src/ffmpeg-rebroadcast'; import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser'; import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider'; import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; -import { createRtspParser, RtspClient, RtspServer } from '../../../common/src/rtsp-server'; +import { parsePayloadTypes } from '@scrypted/common/src/sdp-utils'; +import { createRtspParser, RtspClient, RtspServer } from '@scrypted/common/src/rtsp-server'; import { Duplex } from 'stream'; import net from 'net'; import { readLength } from '@scrypted/common/src/read-stream'; @@ -428,7 +429,7 @@ class PrebufferSession { const json = await mediaManager.convertMediaObjectToJSON(mo, 'x-scrypted/x-rfc4571'); const { url, sdp, mediaStreamOptions } = json; - session = await startRFC4571Parser(connectRFC4571Parser(url), sdp, mediaStreamOptions); + session = await startRFC4571Parser(connectRFC4571Parser(url), sdp, mediaStreamOptions, false, rbo); this.sdp = session.sdp.then(buffers => Buffer.concat(buffers).toString()); } else { @@ -447,7 +448,7 @@ class PrebufferSession { await rtspClient.setup(0, '/audio'); await rtspClient.setup(2, '/video'); const socket = await rtspClient.play(); - session = await startRFC4571Parser(socket, sdp, ffmpegInput.mediaStreamOptions, true); + session = await startRFC4571Parser(socket, sdp, ffmpegInput.mediaStreamOptions, true, rbo); } else { // create missing pts from dts so mpegts and mp4 muxing does not fail @@ -696,16 +697,16 @@ class PrebufferSession { } if (codecCopy) { - // reported codecs may be wrong/cached/etc, so before blindly copying the audio codec info, - // verify what was found. - if (session?.mediaStreamOptions?.audio?.codec === session?.inputAudioCodec) { - mediaStreamOptions.audio = session?.mediaStreamOptions?.audio; - } - else { - mediaStreamOptions.audio = { - codec: session?.inputAudioCodec, - } + // reported codecs may be wrong/cached/etc, so before blindly copying the audio codec info, + // verify what was found. + if (session?.mediaStreamOptions?.audio?.codec === session?.inputAudioCodec) { + mediaStreamOptions.audio = session?.mediaStreamOptions?.audio; + } + else { + mediaStreamOptions.audio = { + codec: session?.inputAudioCodec, } + } } if (mediaStreamOptions.video && session.inputVideoResolution?.[2] && session.inputVideoResolution?.[3]) { @@ -1006,18 +1007,7 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider const json = JSON.parse(data.toString()); const { url, sdp } = json; - const sdpString = sdp as string; - const audioPt = new Set(); - const videoPt = new Set(); - const addPts = (set: Set, pts: string[]) => { - for (const pt of pts || []) { - set.add(parseInt(pt)); - } - }; - const audioPts = sdpString.match(/m=audio.*/)?.[0]; - addPts(audioPt, audioPts?.split(' ').slice(3)); - const videoPts = (sdp as string).match(/m=video.*/)?.[0]; - addPts(videoPt, videoPts?.split(' ').slice(3)); + const { audioPayloadTypes, videoPayloadTypes } = parsePayloadTypes(sdp); const u = new URL(url); if (!u.protocol.startsWith('tcp')) @@ -1050,10 +1040,10 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider const length = header.readInt16BE(0); const data = await readLength(socket, length); const pt = data[1] & 0x7f; - if (audioPt.has(pt)) { + if (audioPayloadTypes.has(pt)) { rtsp.sendAudio(data, false); } - else if (videoPt.has(pt)) { + else if (videoPayloadTypes.has(pt)) { rtsp.sendVideo(data, false); } else { diff --git a/plugins/prebuffer-mixin/src/rfc4571.ts b/plugins/prebuffer-mixin/src/rfc4571.ts index 27b9b8d4c..3f502c0da 100644 --- a/plugins/prebuffer-mixin/src/rfc4571.ts +++ b/plugins/prebuffer-mixin/src/rfc4571.ts @@ -19,7 +19,7 @@ export function connectRFC4571Parser(url: string) { } -export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaStreamOptions: MediaStreamOptions, hasRstpPrefix?: boolean): Promise> { +export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaStreamOptions: MediaStreamOptions, hasRstpPrefix?: boolean, options?: ParserOptions<"rtsp">): Promise> { let isActive = true; const events = new EventEmitter(); @@ -38,7 +38,33 @@ export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaS socket.on('close', kill); socket.on('error', kill); + const setupActivityTimer = (container: string) => { + let dataTimeout: NodeJS.Timeout; + + function dataKill() { + console.error('timeout waiting for data, killing parser session', container); + kill(); + } + + function resetActivityTimer() { + if (!options.timeout) + return; + clearTimeout(dataTimeout); + dataTimeout = setTimeout(dataKill, options.timeout); + } + + events.once('killed', () => clearTimeout(dataTimeout)); + + resetActivityTimer(); + return { + resetActivityTimer, + } + } + + (async () => { + const { resetActivityTimer } = setupActivityTimer('rtsp'); + while (true) { let header: Buffer; let length: number; @@ -69,6 +95,7 @@ export async function startRFC4571Parser(socket: net.Socket, sdp: string, mediaS chunks: [header, data], } events.emit('rtsp', chunk); + resetActivityTimer(); } })() .finally(kill);