Revert "rebroadcast: parser perf refactor"

This reverts commit f677cf7393.
This commit is contained in:
Koushik Dutta
2024-05-30 09:27:27 -07:00
parent 1e2fd46cd3
commit 7071808514
5 changed files with 69 additions and 106 deletions

View File

@@ -92,13 +92,7 @@ export const H264_NAL_TYPE_MTAP32 = 27;
export function findH264NaluType(streamChunk: StreamChunk, naluType: number) {
if (streamChunk.type !== 'h264')
return;
const { chunks } = streamChunk;
for (let i = 1; i < chunks.length; i += 2) {
const chunk = chunks[i];
const r = findH264NaluTypeInNalu(chunk.subarray(12), naluType);
if (r)
return r;
}
return findH264NaluTypeInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12), naluType);
}
export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) {
@@ -130,15 +124,7 @@ export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) {
export function getNaluTypes(streamChunk: StreamChunk) {
if (streamChunk.type !== 'h264')
return new Set<number>();
const sets: Set<number>[] = [];
const { chunks } = streamChunk;
for (let i = 1; i < chunks.length; i += 2) {
const chunk = chunks[i];
const r = getNaluTypesInNalu(chunk.subarray(12));
sets.push(r);
}
return new Set(sets.map(s => [...s]).flat());
return getNaluTypesInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12))
}
export function getNaluFragmentInformation(nalu: Buffer) {
@@ -315,7 +301,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`;
export interface RtspClientSetupOptions {
type: 'tcp' | 'udp';
path?: string;
onRtp: (...headerBuffers: [Buffer, Buffer][]) => void;
onRtp: (rtspHeader: Buffer, rtp: Buffer) => void;
}
export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions {
@@ -426,7 +412,7 @@ export class RtspClient extends RtspBase {
const data = await readLength(this.client, length);
const options = this.setupOptions.get(channel);
options?.onRtp?.([header, data]);
options?.onRtp?.(header, data);
}
async readDataPayload() {
@@ -438,26 +424,32 @@ export class RtspClient extends RtspBase {
return new Error('RTSP Client received invalid frame magic. This may be a bug in your camera firmware. If this error persists, switch your RTSP Parser to FFmpeg or Scrypted (UDP): ' + header.toString());
}
async readLoopLegacy() {
try {
while (true) {
if (this.needKeepAlive) {
this.needKeepAlive = false;
if (this.hasGetParameter)
await this.getParameter();
else
await this.options();
}
await this.readDataPayload();
}
}
catch (e) {
this.client.destroy(e);
throw e;
}
}
async readLoop() {
const deferred = new Deferred<void>();
let headerBuffers: [Buffer, Buffer][] = [];
let header: Buffer;
let channel: number;
let length: number;
const flush = (newChannel?: number) => {
const c = channel;
channel = newChannel;
const channelChange = newChannel !== c;
if (!channelChange || !headerBuffers.length)
return;
const hb = headerBuffers;
headerBuffers = [];
const options = this.setupOptions.get(c);
options?.onRtp?.(...hb);
}
const read = async () => {
if (this.needKeepAlive) {
this.needKeepAlive = false;
@@ -473,19 +465,14 @@ export class RtspClient extends RtspBase {
if (!header) {
header = this.client.read(4);
if (!header) {
// flush if waiting for a header.
flush();
if (!header)
return;
}
// validate header once.
if (header[0] !== RTSP_FRAME_MAGIC) {
if (header.toString() !== 'RTSP')
throw this.createBadHeader(header);
flush();
this.client.unshift(header);
header = undefined;
@@ -502,23 +489,18 @@ export class RtspClient extends RtspBase {
continue;
}
const newChannel = header.readUInt8(1);
flush(newChannel);
channel = header.readUInt8(1);
length = header.readUInt16BE(2);
}
const currentChannel = channel;
const data = this.client.read(length);
if (!data) {
// flush if waiting for data, but restore the channel.
flush();
channel = currentChannel;
if (!data)
return;
}
const h = header;
header = undefined;
headerBuffers.push([h, data]);
const options = this.setupOptions.get(channel);
options?.onRtp?.(h, data);
}
}
catch (e) {
@@ -692,7 +674,7 @@ export class RtspClient extends RtspBase {
this.client.on('close', () => closeQuiet(udp.server));
}
port = options.dgram.address().port;
options.dgram.on('message', data => options.onRtp([undefined, data]));
options.dgram.on('message', data => options.onRtp(undefined, data));
}
headers = Object.assign({
Transport: `RTP/AVP${protocol};unicast;${client}=${port}-${port + 1}`,

View File

@@ -108,9 +108,8 @@ export function createMpegTsParser(options?: StreamParserOptions): StreamParser
for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) {
const streamChunk = streamChunks[prebufferIndex];
const { chunks } = streamChunk;
for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
const chunk = chunks[chunkIndex];
for (let chunkIndex = 0; chunkIndex < streamChunk.chunks.length; chunkIndex++) {
const chunk = streamChunk.chunks[chunkIndex];
let offset = 0;
while (offset + 188 < chunk.length) {

View File

@@ -676,26 +676,26 @@ class PrebufferSession {
session.killed.finally(() => clearTimeout(refreshTimeout));
}
let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;
let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;
session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();
session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();
chunk.time = now;
prebufferContainer.push(chunk);
chunk.time = now;
prebufferContainer.push(chunk);
while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}
while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}
if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});
if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});
session.start();
return session;
@@ -946,22 +946,15 @@ class PrebufferSession {
if (!interleavePassthrough) {
if (channel == undefined) {
const udp = serverPortMap.get(chunk.type);
if (udp) {
const { chunks } = chunk;
for (let i = 1; i < chunks.length; i += 2) {
const c = chunks[i];
server.sendTrack(udp.control, c, chunk.type.startsWith('rtcp-'));
}
}
if (udp)
server.sendTrack(udp.control, chunk.chunks[1], chunk.type.startsWith('rtcp-'));
return;
}
const chunks = chunk.chunks.slice();
for (let i = 0; i < chunks.length; i += 2) {
const header = Buffer.from(chunks[0]);
header.writeUInt8(channel, 1);
chunks[i] = header;
}
const header = Buffer.from(chunks[0]);
header.writeUInt8(channel, 1);
chunks[0] = header;
chunk = {
startStream: chunk.startStream,
chunks,
@@ -972,12 +965,7 @@ class PrebufferSession {
}
if (server.writeStream) {
const { chunks } = chunk;
for (let i = 0; i < chunks.length; i += 2) {
const header = chunks[i];
const rtp = chunks[i + 1];
server.writeRtpPayload(header, rtp);
}
server.writeRtpPayload(chunk.chunks[0], chunk.chunks[1]);
return;
}
@@ -1184,13 +1172,8 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
requestedPrebuffer,
filter: (chunk, prebuffer) => {
const track = map.get(chunk.type);
if (track) {
const { chunks } = chunk;
for (let i = 1; i < chunks.length; i += 2) {
const c = chunks[i];
server.sendTrack(track, c, false);
}
}
if (track)
server.sendTrack(track, chunk.chunks[1], false);
return undefined;
}
});

View File

@@ -1,13 +1,14 @@
import { cloneDeep } from "@scrypted/common/src/clone-deep";
import { ParserOptions, ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast";
import { read16BELengthLoop } from "@scrypted/common/src/read-stream";
import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, findH264NaluType } from "@scrypted/common/src/rtsp-server";
import { findH264NaluType, H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { sleep } from "@scrypted/common/src/sleep";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { MediaStreamOptions, ResponseMediaStreamOptions } from "@scrypted/sdk";
import { parse as spsParse } from "h264-sps-parser";
import net from 'net';
import { EventEmitter, Readable } from "stream";
import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast";
import { getSpsResolution } from "./sps-resolution";
export function negotiateMediaStream(sdp: string, mediaStreamOptions: MediaStreamOptions, inputVideoCodec: string, inputAudioCodec: string, requestMediaStream: MediaStreamOptions) {

View File

@@ -1,12 +1,12 @@
import { closeQuiet } from "@scrypted/common/src/listen-cluster";
import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, RtspClient, RtspClientUdpSetupOptions, findH264NaluType, parseSemicolonDelimited } from "@scrypted/common/src/rtsp-server";
import { ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast";
import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster";
import { findH264NaluType, H264_NAL_TYPE_SPS, parseSemicolonDelimited, RtspClient, RtspClientUdpSetupOptions, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { ResponseMediaStreamOptions } from "@scrypted/sdk";
import dgram from 'dgram';
import { parse as spsParse } from "h264-sps-parser";
import { EventEmitter } from "stream";
import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast";
import { negotiateMediaStream } from "./rfc4571";
import { getSpsResolution } from "./sps-resolution";
@@ -95,15 +95,13 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
const setup: RtspClientUdpSetupOptions = {
path: control,
type: 'udp',
onRtp: (...headerBuffers) => {
onRtp: (header, data) => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
const chunk: StreamChunk = {
chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
return [prefix, data];
}).flat(),
chunks: [prefix, data],
type: codec,
};
events.emit('rtsp', chunk);
@@ -131,9 +129,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
path: control,
type: 'tcp',
port: channel,
onRtp: (...headerBuffers) => {
onRtp: (header, data) => {
const chunk: StreamChunk = {
chunks: headerBuffers.flat(),
chunks: [header, data],
type: codec,
};
events.emit('rtsp', chunk);