rebroadcast: prevent buffering from buggy RTSP clients like frigate from causing memory leaks in scrypted

This commit is contained in:
Koushik Dutta
2025-10-01 09:16:30 -07:00
parent e703efc1aa
commit 8099df4a2a
4 changed files with 15 additions and 267 deletions

View File

@@ -2,7 +2,6 @@ import { Socket as DatagramSocket } from "dgram";
import { once } from "events";
import { Duplex } from "stream";
import { FFMPEG_FRAGMENTED_MP4_OUTPUT_ARGS, MP4Atom, parseFragmentedMP4 } from "./ffmpeg-mp4-parser-session";
import { readLength } from "./read-stream";
export interface StreamParser {
container: string;
@@ -25,59 +24,11 @@ export interface StreamParserOptions {
export interface StreamChunk {
startStream?: Buffer;
chunks: Buffer[];
type?: string;
type: string;
width?: number;
height?: number;
}
// function checkTsPacket(pkt: Buffer) {
// const pid = ((pkt[1] & 0x1F) << 8) | pkt[2];
// if (pid == 256) {
// // found video stream
// if ((pkt[3] & 0x20) && (pkt[4] > 0)) {
// // have AF
// if (pkt[5] & 0x40) {
// // found keyframe
// console.log('keyframe');
// }
// }
// }
// }
function createLengthParser(length: number, verify?: (concat: Buffer) => void) {
async function* parse(socket: Duplex): AsyncGenerator<StreamChunk> {
let pending: Buffer[] = [];
let pendingSize = 0;
while (true) {
const data: Buffer = socket.read();
if (!data) {
await once(socket, 'readable');
continue;
}
pending.push(data);
pendingSize += data.length;
if (pendingSize < length)
continue;
const concat = Buffer.concat(pending);
verify?.(concat);
const remaining = concat.length % length;
const left = concat.slice(0, concat.length - remaining);
const right = concat.slice(concat.length - remaining);
pending = [right];
pendingSize = right.length;
yield {
chunks: [left],
};
}
}
return parse;
}
export function createDgramParser() {
async function* parse(socket: DatagramSocket, width: number, height: number, type: string) {
while (true) {
@@ -91,65 +42,6 @@ export function createDgramParser() {
return parse;
}
export function createMpegTsParser(options?: StreamParserOptions): StreamParser {
return {
container: 'mpegts',
outputArguments: [
...(options?.vcodec || []),
...(options?.acodec || []),
'-f', 'mpegts',
],
parse: createLengthParser(188, concat => {
if (concat[0] != 0x47) {
throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.')
}
}),
findSyncFrame(streamChunks): StreamChunk[] {
for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) {
const streamChunk = streamChunks[prebufferIndex];
for (let chunkIndex = 0; chunkIndex < streamChunk.chunks.length; chunkIndex++) {
const chunk = streamChunk.chunks[chunkIndex];
let offset = 0;
while (offset + 188 < chunk.length) {
const pkt = chunk.subarray(offset, offset + 188);
const pid = ((pkt[1] & 0x1F) << 8) | pkt[2];
if (pid == 256) {
// found video stream
if ((pkt[3] & 0x20) && (pkt[4] > 0)) {
// have AF
if (pkt[5] & 0x40) {
// we found the sync frame, but also need to send the pat and pmt
// which might be at the start of this chunk before the keyframe.
// yolo!
return streamChunks.slice(prebufferIndex);
// const chunks = streamChunk.chunks.slice(chunkIndex + 1);
// const take = chunk.subarray(offset);
// chunks.unshift(take);
// const remainingChunks = streamChunks.slice(prebufferIndex + 1);
// const ret = Object.assign({}, streamChunk);
// ret.chunks = chunks;
// return [
// ret,
// ...remainingChunks
// ];
}
}
}
offset += 188;
}
}
}
return findSyncFrame(streamChunks);
}
}
}
export async function* parseMp4StreamChunks(parser: AsyncGenerator<MP4Atom>) {
let ftyp: MP4Atom;
let moov: MP4Atom;
@@ -213,54 +105,3 @@ export const PIXEL_FORMAT_RGB24: RawVideoPixelFormat = {
name: 'rgb24',
computeLength: (width, height) => width * height * 3,
}
export function createRawVideoParser(options: RawVideoParserOptions): StreamParser {
const pixelFormat = options?.pixelFormat || PIXEL_FORMAT_YUV420P;
let filter: string;
const { size, everyNFrames } = options;
if (size) {
filter = `scale=${size.width}:${size.height}`;
}
if (everyNFrames && everyNFrames > 1) {
if (filter)
filter += ',';
else
filter = '';
filter = filter + `select=not(mod(n\\,${everyNFrames}))`
}
const inputArguments: string[] = [];
if (options.size)
inputArguments.push('-s', `${options.size.width}x${options.size.height}`);
inputArguments.push('-pix_fmt', pixelFormat.name);
return {
inputArguments,
container: 'rawvideo',
outputArguments: [
'-s', `${options.size.width}x${options.size.height}`,
'-an',
'-vcodec', 'rawvideo',
'-pix_fmt', pixelFormat.name,
'-f', 'rawvideo',
],
async *parse(socket: Duplex, width: number, height: number): AsyncGenerator<StreamChunk> {
width = size?.width || width;
height = size?.height || height
if (!width || !height)
throw new Error("error parsing rawvideo, unknown width and height");
const toRead = pixelFormat.computeLength(width, height);
while (true) {
const buffer = await readLength(socket, toRead);
yield {
chunks: [buffer],
width,
height,
}
}
},
findSyncFrame,
}
}

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.10.60",
"version": "0.10.61",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/prebuffer-mixin",
"version": "0.10.60",
"version": "0.10.61",
"license": "Apache-2.0",
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.10.60",
"version": "0.10.61",
"description": "Video Stream Rebroadcast, Prebuffer, and Management Plugin for Scrypted.",
"author": "Scrypted",
"license": "Apache-2.0",

View File

@@ -1,12 +1,12 @@
import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider';
import { ListenZeroSingleClientTimeoutError, closeQuiet, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
import { readLength } from '@scrypted/common/src/read-stream';
import { H264_NAL_TYPE_FU_B, H264_NAL_TYPE_IDR, H264_NAL_TYPE_MTAP16, H264_NAL_TYPE_MTAP32, H264_NAL_TYPE_RESERVED0, H264_NAL_TYPE_RESERVED30, H264_NAL_TYPE_RESERVED31, H264_NAL_TYPE_SEI, H264_NAL_TYPE_SPS, H264_NAL_TYPE_STAP_B, RtspServer, RtspTrack, createRtspParser, findH264NaluType, getStartedH264NaluTypes, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server';
import { H264_NAL_TYPE_IDR, H264_NAL_TYPE_SPS, RtspServer, RtspTrack, createRtspParser, findH264NaluType, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, getSpsPps, parseSdp } from '@scrypted/common/src/sdp-utils';
import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin";
import { sleep } from '@scrypted/common/src/sleep';
import { StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser';
import sdk, { BufferConverter, ChargeState, EventListenerRegister, FFmpegInput, ForkWorker, H264Info, MediaObject, MediaStreamDestination, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDevice, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, SettingValue, Settings, VideoCamera, VideoCameraConfiguration, WritableDeviceState } from '@scrypted/sdk';
import sdk, { BufferConverter, ChargeState, EventListenerRegister, FFmpegInput, ForkWorker, MediaObject, MediaStreamDestination, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDevice, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, SettingValue, Settings, VideoCamera, VideoCameraConfiguration, WritableDeviceState } from '@scrypted/sdk';
import { StorageSettings } from '@scrypted/sdk/storage-settings';
import crypto from 'crypto';
import { once } from 'events';
@@ -38,18 +38,6 @@ interface PrebufferStreamChunk extends StreamChunk {
time?: number;
}
function hasOddities(h264Info: H264Info) {
const h264Oddities = h264Info.fuab
|| h264Info.mtap16
|| h264Info.mtap32
|| h264Info.sei
|| h264Info.stapb
|| h264Info.reserved0
|| h264Info.reserved30
|| h264Info.reserved31;
return h264Oddities;
}
type PrebufferParsers = 'rtsp';
class PrebufferSession {
@@ -72,7 +60,6 @@ class PrebufferSession {
ffmpegInputArgumentsKey: string;
ffmpegOutputArgumentsKey: string;
lastDetectedAudioCodecKey: string;
lastH264ProbeKey: string;
rtspParserKey: string;
rtspServerPath: string;
rtspServerMutedPath: string;
@@ -88,7 +75,6 @@ class PrebufferSession {
this.ffmpegInputArgumentsKey = 'ffmpegInputArguments-' + this.streamId;
this.ffmpegOutputArgumentsKey = 'ffmpegOutputArguments-' + this.streamId;
this.lastDetectedAudioCodecKey = 'lastDetectedAudioCodec-' + this.streamId;
this.lastH264ProbeKey = 'lastH264Probe-' + this.streamId;
this.rtspParserKey = 'rtspParser-' + this.streamId;
const rtspServerPathKey = 'rtspServerPathKey-' + this.streamId;
const rtspServerMutedPathKey = 'rtspServerMutedPathKey-' + this.streamId;
@@ -112,24 +98,6 @@ class PrebufferSession {
return !this.enabled || this.shouldDisableBatteryPrebuffer();
}
getLastH264Probe(): H264Info {
const str = this.storage.getItem(this.lastH264ProbeKey);
if (!str) {
return {};
}
try {
return JSON.parse(str);
}
catch (e) {
return {};
}
}
getLastH264Oddities() {
return hasOddities(this.getLastH264Probe());
}
getDetectedIdrInterval() {
const durations: number[] = [];
if (this.rtspPrebuffer.length) {
@@ -403,20 +371,6 @@ class PrebufferSession {
addFFmpegInputSettings();
}
const addOddities = () => {
settings.push(
{
key: 'detectedOddities',
group,
subgroup,
title: 'Detected H264 Oddities',
readonly: true,
value: JSON.stringify(this.getLastH264Probe()),
description: 'Cameras with oddities in the H264 video stream may not function correctly with Scrypted RTSP Parsers or Senders.',
}
)
};
if (session) {
const codecInfo = await this.parseCodecs();
const resolution = codecInfo.inputVideoResolution?.width && codecInfo.inputVideoResolution?.height
@@ -453,7 +407,6 @@ class PrebufferSession {
value: (idrInterval || 0) / 1000 || 'unknown',
},
);
addOddities();
}
else {
settings.push(
@@ -467,7 +420,6 @@ class PrebufferSession {
readonly: true,
},
);
addOddities();
}
settings.push({
@@ -544,8 +496,6 @@ class PrebufferSession {
this.storage.removeItem(this.lastDetectedAudioCodecKey);
this.usingScryptedParser = false;
const h264Oddities = this.getLastH264Oddities();
if (isRfc4571) {
this.usingScryptedParser = true;
this.console.log('bypassing ffmpeg: using scrypted rfc4571 parser')
@@ -635,47 +585,6 @@ class PrebufferSession {
console.error('rebroadcast error', e)
});
if (this.usingScryptedParser && !isRfc4571) {
// watch the stream for 10 seconds to see if an weird nalu is encountered.
// if one is found and using scrypted parser as default, will need to restart rebroadcast to prevent
// downstream issues.
const h264Probe: H264Info = {};
let reportedOddity = false;
const oddityProbe = (chunk: StreamChunk) => {
if (chunk.type !== 'h264')
return;
const types = getStartedH264NaluTypes(chunk);
h264Probe.fuab ||= types.has(H264_NAL_TYPE_FU_B);
h264Probe.stapb ||= types.has(H264_NAL_TYPE_STAP_B);
h264Probe.mtap16 ||= types.has(H264_NAL_TYPE_MTAP16);
h264Probe.mtap32 ||= types.has(H264_NAL_TYPE_MTAP32);
h264Probe.sei ||= types.has(H264_NAL_TYPE_SEI);
h264Probe.reserved0 ||= types.has(H264_NAL_TYPE_RESERVED0);
h264Probe.reserved30 ||= types.has(H264_NAL_TYPE_RESERVED30);
h264Probe.reserved31 ||= types.has(H264_NAL_TYPE_RESERVED31);
const oddity = hasOddities(h264Probe);
if (oddity && !reportedOddity) {
reportedOddity = true;
let { isDefault } = this.getParser(sessionMso);
this.console.warn('H264 oddity detected.');
if (!isDefault) {
this.console.warn('If there are issues streaming, consider using the Default parser.');
return;
}
// this.console.warn('Oddity in non prebuffered stream. Next restart will use FFmpeg instead.');
}
}
const removeOddityProbe = () => session.removeListener('rtsp', oddityProbe);
session.killed.finally(() => clearTimeout(oddityTimeout));
session.on('rtsp', oddityProbe);
const oddityTimeout = setTimeout(() => {
removeOddityProbe();
this.storage.setItem(this.lastH264ProbeKey, JSON.stringify(h264Probe));
}, h264Oddities ? 60000 : 10000);
}
await session.sdp;
this.parserSession = session;
session.killed.finally(() => {
@@ -927,8 +836,7 @@ class PrebufferSession {
// if starting on a sync frame, ffmpeg will skip the first segment while initializing
// on live sources like rtsp. the buffer before the sync frame stream will be enough
// for ffmpeg to analyze and start up in time for the sync frame.
// If h264 oddities are detected, assume ffmpeg will be used.
if (!options.findSyncFrame || this.getLastH264Oddities()) {
if (!options.findSyncFrame) {
for (const chunk of prebufferContainer) {
if (chunk.time < now - requestedPrebuffer)
continue;
@@ -978,10 +886,6 @@ class PrebufferSession {
const codecInfo = await this.parseCodecs(true);
const mediaStreamOptions: ResponseMediaStreamOptions = session.negotiateMediaStream(options, codecInfo.inputVideoCodec, codecInfo.inputAudioCodec);
let sdp = await this.sdp;
if (!mediaStreamOptions.video?.h264Info && this.usingScryptedParser) {
mediaStreamOptions.video ||= {};
mediaStreamOptions.video.h264Info = this.getLastH264Probe();
}
if (this.mixin.streamSettings.storageSettings.values.noAudio)
mediaStreamOptions.audio = null;
@@ -1025,6 +929,7 @@ class PrebufferSession {
header.writeUInt8(channel, 1);
chunks[0] = header;
chunk = {
type: chunk.type,
startStream: chunk.startStream,
chunks,
}
@@ -1264,8 +1169,14 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
requestedPrebuffer,
filter: (chunk, prebuffer) => {
const track = map.get(chunk.type);
if (track)
if (track) {
server.sendTrack(track, chunk.chunks[1], false);
const buffered = server.client.writableLength;
if (buffered > 100000000) {
this.console.log('more than 100MB has been buffered to RTSP Client, did downstream die? killing connection.');
client.destroy();
}
}
return undefined;
}
});
@@ -1542,10 +1453,6 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
const session = this.sessions.get(mso.id);
if (session?.parserSession || enabledStreams.includes(mso))
mso.prebuffer = prebufferDurationMs;
if (session && !mso.video?.h264Info) {
mso.video ||= {};
mso.video.h264Info = session.getLastH264Probe();
}
if (!mso.destinations) {
mso.destinations = [];
for (const [k, v] of map.entries()) {