From 5432b5b917ca1d5ea35a5b074e97802a22ad32cc Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Thu, 16 May 2024 22:33:23 -0700 Subject: [PATCH] rebroadcast: more parser refactor --- common/src/rtsp-server.ts | 17 ++++++++-------- plugins/prebuffer-mixin/src/rtsp-session.ts | 22 +++++++++++---------- plugins/webrtc/src/rtp-forwarders.ts | 12 +++++++++-- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index 958f3a0c2..ffa8879be 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -315,7 +315,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`; export interface RtspClientSetupOptions { type: 'tcp' | 'udp'; path?: string; - onRtp: (...headerBuffers: [Buffer, Buffer][]) => void; + onRtp: (headerBuffers: Buffer[]) => void; } export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions { @@ -441,7 +441,7 @@ export class RtspClient extends RtspBase { async readLoop() { const deferred = new Deferred(); - let headerBuffers: [Buffer, Buffer][] = []; + let headerBuffers: Buffer[] = []; let header: Buffer; let channel: number; let length: number; @@ -449,13 +449,12 @@ export class RtspClient extends RtspBase { const flush = (newChannel?: number) => { const c = channel; channel = newChannel; - const channelChange = newChannel !== c; - if (!channelChange || !headerBuffers.length) + if (!headerBuffers.length || newChannel === c) return; const hb = headerBuffers; headerBuffers = []; const options = this.setupOptions.get(c); - options?.onRtp?.(...hb); + options?.onRtp?.(hb); } const read = async () => { @@ -481,11 +480,11 @@ export class RtspClient extends RtspBase { // validate header once. if (header[0] !== RTSP_FRAME_MAGIC) { + flush(); + if (header.toString() !== 'RTSP') throw this.createBadHeader(header); - flush(); - this.client.unshift(header); header = undefined; @@ -507,10 +506,10 @@ export class RtspClient extends RtspBase { length = header.readUInt16BE(2); } - const currentChannel = channel; const data = this.client.read(length); if (!data) { // flush if waiting for data, but restore the channel. + const currentChannel = channel; flush(); channel = currentChannel; return; @@ -518,7 +517,7 @@ export class RtspClient extends RtspBase { const h = header; header = undefined; - headerBuffers.push([h, data]); + headerBuffers.push(h, data); } } catch (e) { diff --git a/plugins/prebuffer-mixin/src/rtsp-session.ts b/plugins/prebuffer-mixin/src/rtsp-session.ts index 649086055..8519d2ac6 100644 --- a/plugins/prebuffer-mixin/src/rtsp-session.ts +++ b/plugins/prebuffer-mixin/src/rtsp-session.ts @@ -95,15 +95,17 @@ export async function startRtspSession(console: Console, url: string, mediaStrea const setup: RtspClientUdpSetupOptions = { path: control, type: 'udp', - onRtp: (...headerBuffers) => { + onRtp: (headerBuffers) => { + for (let i = 0; i < headerBuffers.length; i += 2) { + const data = headerBuffers[i + 1]; + const prefix = Buffer.alloc(4); + prefix.writeUInt8(RTSP_FRAME_MAGIC, 0); + prefix.writeUInt8(rtspChannel, 1); + prefix.writeUInt16BE(data.length, 2); + headerBuffers[i] = prefix; + } const chunk: StreamChunk = { - chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => { - const prefix = Buffer.alloc(4); - prefix.writeUInt8(RTSP_FRAME_MAGIC, 0); - prefix.writeUInt8(rtspChannel, 1); - prefix.writeUInt16BE(data.length, 2); - return [prefix, data]; - }).flat(), + chunks: headerBuffers, type: codec, }; events.emit('rtsp', chunk); @@ -131,9 +133,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea path: control, type: 'tcp', port: channel, - onRtp: (...headerBuffers) => { + onRtp: (headerBuffers) => { const chunk: StreamChunk = { - chunks: headerBuffers.flat(), + chunks: headerBuffers, type: codec, }; events.emit('rtsp', chunk); diff --git a/plugins/webrtc/src/rtp-forwarders.ts b/plugins/webrtc/src/rtp-forwarders.ts index 643127e88..6828974e0 100644 --- a/plugins/webrtc/src/rtp-forwarders.ts +++ b/plugins/webrtc/src/rtp-forwarders.ts @@ -66,7 +66,11 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel const result = await rtspClient.setup({ type: 'udp', path: section.control, - onRtp: (rtspHeader, rtp) => deliver(rtp), + onRtp: (headerBuffers) => { + for (let i = 1; i < headerBuffers.length; i += 2) { + deliver(headerBuffers[i]); + } + }, }); console.log('rtsp/udp', section.codec, result); return false; @@ -80,7 +84,11 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel type: 'tcp', port: channel, path: section.control, - onRtp: (rtspHeader, rtp) => deliver(rtp), + onRtp: (headerBuffers) => { + for (let i = 1; i < headerBuffers.length; i += 2) { + deliver(headerBuffers[i]); + } + }, }); console.log('rtsp/tcp', section.codec); return true;