diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index 958f3a0c2..cfd7fed84 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -92,13 +92,7 @@ export const H264_NAL_TYPE_MTAP32 = 27; export function findH264NaluType(streamChunk: StreamChunk, naluType: number) { if (streamChunk.type !== 'h264') return; - const { chunks } = streamChunk; - for (let i = 1; i < chunks.length; i += 2) { - const chunk = chunks[i]; - const r = findH264NaluTypeInNalu(chunk.subarray(12), naluType); - if (r) - return r; - } + return findH264NaluTypeInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12), naluType); } export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) { @@ -130,15 +124,7 @@ export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) { export function getNaluTypes(streamChunk: StreamChunk) { if (streamChunk.type !== 'h264') return new Set(); - const sets: Set[] = []; - const { chunks } = streamChunk; - for (let i = 1; i < chunks.length; i += 2) { - const chunk = chunks[i]; - const r = getNaluTypesInNalu(chunk.subarray(12)); - sets.push(r); - } - - return new Set(sets.map(s => [...s]).flat()); + return getNaluTypesInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12)) } export function getNaluFragmentInformation(nalu: Buffer) { @@ -315,7 +301,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`; export interface RtspClientSetupOptions { type: 'tcp' | 'udp'; path?: string; - onRtp: (...headerBuffers: [Buffer, Buffer][]) => void; + onRtp: (rtspHeader: Buffer, rtp: Buffer) => void; } export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions { @@ -426,7 +412,7 @@ export class RtspClient extends RtspBase { const data = await readLength(this.client, length); const options = this.setupOptions.get(channel); - options?.onRtp?.([header, data]); + options?.onRtp?.(header, data); } async readDataPayload() { @@ -438,26 +424,32 @@ export class RtspClient extends RtspBase { return new Error('RTSP Client received invalid frame magic. This may be a bug in your camera firmware. If this error persists, switch your RTSP Parser to FFmpeg or Scrypted (UDP): ' + header.toString()); } + async readLoopLegacy() { + try { + while (true) { + if (this.needKeepAlive) { + this.needKeepAlive = false; + if (this.hasGetParameter) + await this.getParameter(); + else + await this.options(); + } + await this.readDataPayload(); + } + } + catch (e) { + this.client.destroy(e); + throw e; + } + } + async readLoop() { const deferred = new Deferred(); - let headerBuffers: [Buffer, Buffer][] = []; let header: Buffer; let channel: number; let length: number; - const flush = (newChannel?: number) => { - const c = channel; - channel = newChannel; - const channelChange = newChannel !== c; - if (!channelChange || !headerBuffers.length) - return; - const hb = headerBuffers; - headerBuffers = []; - const options = this.setupOptions.get(c); - options?.onRtp?.(...hb); - } - const read = async () => { if (this.needKeepAlive) { this.needKeepAlive = false; @@ -473,19 +465,14 @@ export class RtspClient extends RtspBase { if (!header) { header = this.client.read(4); - if (!header) { - // flush if waiting for a header. - flush(); + if (!header) return; - } // validate header once. if (header[0] !== RTSP_FRAME_MAGIC) { if (header.toString() !== 'RTSP') throw this.createBadHeader(header); - flush(); - this.client.unshift(header); header = undefined; @@ -502,23 +489,18 @@ export class RtspClient extends RtspBase { continue; } - const newChannel = header.readUInt8(1); - flush(newChannel); + channel = header.readUInt8(1); length = header.readUInt16BE(2); } - const currentChannel = channel; const data = this.client.read(length); - if (!data) { - // flush if waiting for data, but restore the channel. - flush(); - channel = currentChannel; + if (!data) return; - } const h = header; header = undefined; - headerBuffers.push([h, data]); + const options = this.setupOptions.get(channel); + options?.onRtp?.(h, data); } } catch (e) { @@ -692,7 +674,7 @@ export class RtspClient extends RtspBase { this.client.on('close', () => closeQuiet(udp.server)); } port = options.dgram.address().port; - options.dgram.on('message', data => options.onRtp([undefined, data])); + options.dgram.on('message', data => options.onRtp(undefined, data)); } headers = Object.assign({ Transport: `RTP/AVP${protocol};unicast;${client}=${port}-${port + 1}`, diff --git a/common/src/stream-parser.ts b/common/src/stream-parser.ts index 5e48d6ab0..24c7bd2f3 100644 --- a/common/src/stream-parser.ts +++ b/common/src/stream-parser.ts @@ -108,9 +108,8 @@ export function createMpegTsParser(options?: StreamParserOptions): StreamParser for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) { const streamChunk = streamChunks[prebufferIndex]; - const { chunks } = streamChunk; - for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) { - const chunk = chunks[chunkIndex]; + for (let chunkIndex = 0; chunkIndex < streamChunk.chunks.length; chunkIndex++) { + const chunk = streamChunk.chunks[chunkIndex]; let offset = 0; while (offset + 188 < chunk.length) { diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 56f5b0cd6..986d17897 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -676,26 +676,26 @@ class PrebufferSession { session.killed.finally(() => clearTimeout(refreshTimeout)); } - let shifts = 0; - let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer; + let shifts = 0; + let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer; - session.on('rtsp', (chunk: PrebufferStreamChunk) => { - const now = Date.now(); + session.on('rtsp', (chunk: PrebufferStreamChunk) => { + const now = Date.now(); - chunk.time = now; - prebufferContainer.push(chunk); + chunk.time = now; + prebufferContainer.push(chunk); - while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) { - prebufferContainer.shift(); - shifts++; - } + while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) { + prebufferContainer.shift(); + shifts++; + } - if (shifts > 100000) { - prebufferContainer = prebufferContainer.slice(); - this.rtspPrebuffer = prebufferContainer; - shifts = 0; - } - }); + if (shifts > 100000) { + prebufferContainer = prebufferContainer.slice(); + this.rtspPrebuffer = prebufferContainer; + shifts = 0; + } + }); session.start(); return session; @@ -946,22 +946,15 @@ class PrebufferSession { if (!interleavePassthrough) { if (channel == undefined) { const udp = serverPortMap.get(chunk.type); - if (udp) { - const { chunks } = chunk; - for (let i = 1; i < chunks.length; i += 2) { - const c = chunks[i]; - server.sendTrack(udp.control, c, chunk.type.startsWith('rtcp-')); - } - } + if (udp) + server.sendTrack(udp.control, chunk.chunks[1], chunk.type.startsWith('rtcp-')); return; } const chunks = chunk.chunks.slice(); - for (let i = 0; i < chunks.length; i += 2) { - const header = Buffer.from(chunks[0]); - header.writeUInt8(channel, 1); - chunks[i] = header; - } + const header = Buffer.from(chunks[0]); + header.writeUInt8(channel, 1); + chunks[0] = header; chunk = { startStream: chunk.startStream, chunks, @@ -972,12 +965,7 @@ class PrebufferSession { } if (server.writeStream) { - const { chunks } = chunk; - for (let i = 0; i < chunks.length; i += 2) { - const header = chunks[i]; - const rtp = chunks[i + 1]; - server.writeRtpPayload(header, rtp); - } + server.writeRtpPayload(chunk.chunks[0], chunk.chunks[1]); return; } @@ -1184,13 +1172,8 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid requestedPrebuffer, filter: (chunk, prebuffer) => { const track = map.get(chunk.type); - if (track) { - const { chunks } = chunk; - for (let i = 1; i < chunks.length; i += 2) { - const c = chunks[i]; - server.sendTrack(track, c, false); - } - } + if (track) + server.sendTrack(track, chunk.chunks[1], false); return undefined; } }); diff --git a/plugins/prebuffer-mixin/src/rfc4571.ts b/plugins/prebuffer-mixin/src/rfc4571.ts index 92d7aece3..c082d49c3 100644 --- a/plugins/prebuffer-mixin/src/rfc4571.ts +++ b/plugins/prebuffer-mixin/src/rfc4571.ts @@ -1,13 +1,14 @@ import { cloneDeep } from "@scrypted/common/src/clone-deep"; +import { ParserOptions, ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast"; import { read16BELengthLoop } from "@scrypted/common/src/read-stream"; -import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, findH264NaluType } from "@scrypted/common/src/rtsp-server"; +import { findH264NaluType, H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server"; import { parseSdp } from "@scrypted/common/src/sdp-utils"; +import { sleep } from "@scrypted/common/src/sleep"; import { StreamChunk } from "@scrypted/common/src/stream-parser"; import { MediaStreamOptions, ResponseMediaStreamOptions } from "@scrypted/sdk"; import { parse as spsParse } from "h264-sps-parser"; import net from 'net'; import { EventEmitter, Readable } from "stream"; -import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast"; import { getSpsResolution } from "./sps-resolution"; export function negotiateMediaStream(sdp: string, mediaStreamOptions: MediaStreamOptions, inputVideoCodec: string, inputAudioCodec: string, requestMediaStream: MediaStreamOptions) { diff --git a/plugins/prebuffer-mixin/src/rtsp-session.ts b/plugins/prebuffer-mixin/src/rtsp-session.ts index 649086055..2fba4d1eb 100644 --- a/plugins/prebuffer-mixin/src/rtsp-session.ts +++ b/plugins/prebuffer-mixin/src/rtsp-session.ts @@ -1,12 +1,12 @@ -import { closeQuiet } from "@scrypted/common/src/listen-cluster"; -import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, RtspClient, RtspClientUdpSetupOptions, findH264NaluType, parseSemicolonDelimited } from "@scrypted/common/src/rtsp-server"; +import { ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast"; +import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster"; +import { findH264NaluType, H264_NAL_TYPE_SPS, parseSemicolonDelimited, RtspClient, RtspClientUdpSetupOptions, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server"; import { parseSdp } from "@scrypted/common/src/sdp-utils"; import { StreamChunk } from "@scrypted/common/src/stream-parser"; import { ResponseMediaStreamOptions } from "@scrypted/sdk"; import dgram from 'dgram'; import { parse as spsParse } from "h264-sps-parser"; import { EventEmitter } from "stream"; -import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast"; import { negotiateMediaStream } from "./rfc4571"; import { getSpsResolution } from "./sps-resolution"; @@ -95,15 +95,13 @@ export async function startRtspSession(console: Console, url: string, mediaStrea const setup: RtspClientUdpSetupOptions = { path: control, type: 'udp', - onRtp: (...headerBuffers) => { + onRtp: (header, data) => { + const prefix = Buffer.alloc(4); + prefix.writeUInt8(RTSP_FRAME_MAGIC, 0); + prefix.writeUInt8(rtspChannel, 1); + prefix.writeUInt16BE(data.length, 2); const chunk: StreamChunk = { - chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => { - const prefix = Buffer.alloc(4); - prefix.writeUInt8(RTSP_FRAME_MAGIC, 0); - prefix.writeUInt8(rtspChannel, 1); - prefix.writeUInt16BE(data.length, 2); - return [prefix, data]; - }).flat(), + chunks: [prefix, data], type: codec, }; events.emit('rtsp', chunk); @@ -131,9 +129,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea path: control, type: 'tcp', port: channel, - onRtp: (...headerBuffers) => { + onRtp: (header, data) => { const chunk: StreamChunk = { - chunks: headerBuffers.flat(), + chunks: [header, data], type: codec, }; events.emit('rtsp', chunk);