rebroadcast: more parser refactor

This commit is contained in:
Koushik Dutta
2024-05-16 22:33:23 -07:00
parent f677cf7393
commit 5432b5b917
3 changed files with 30 additions and 21 deletions

View File

@@ -315,7 +315,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`;
export interface RtspClientSetupOptions {
type: 'tcp' | 'udp';
path?: string;
onRtp: (...headerBuffers: [Buffer, Buffer][]) => void;
onRtp: (headerBuffers: Buffer[]) => void;
}
export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions {
@@ -441,7 +441,7 @@ export class RtspClient extends RtspBase {
async readLoop() {
const deferred = new Deferred<void>();
let headerBuffers: [Buffer, Buffer][] = [];
let headerBuffers: Buffer[] = [];
let header: Buffer;
let channel: number;
let length: number;
@@ -449,13 +449,12 @@ export class RtspClient extends RtspBase {
const flush = (newChannel?: number) => {
const c = channel;
channel = newChannel;
const channelChange = newChannel !== c;
if (!channelChange || !headerBuffers.length)
if (!headerBuffers.length || newChannel === c)
return;
const hb = headerBuffers;
headerBuffers = [];
const options = this.setupOptions.get(c);
options?.onRtp?.(...hb);
options?.onRtp?.(hb);
}
const read = async () => {
@@ -481,11 +480,11 @@ export class RtspClient extends RtspBase {
// validate header once.
if (header[0] !== RTSP_FRAME_MAGIC) {
flush();
if (header.toString() !== 'RTSP')
throw this.createBadHeader(header);
flush();
this.client.unshift(header);
header = undefined;
@@ -507,10 +506,10 @@ export class RtspClient extends RtspBase {
length = header.readUInt16BE(2);
}
const currentChannel = channel;
const data = this.client.read(length);
if (!data) {
// flush if waiting for data, but restore the channel.
const currentChannel = channel;
flush();
channel = currentChannel;
return;
@@ -518,7 +517,7 @@ export class RtspClient extends RtspBase {
const h = header;
header = undefined;
headerBuffers.push([h, data]);
headerBuffers.push(h, data);
}
}
catch (e) {

View File

@@ -95,15 +95,17 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
const setup: RtspClientUdpSetupOptions = {
path: control,
type: 'udp',
onRtp: (...headerBuffers) => {
onRtp: (headerBuffers) => {
for (let i = 0; i < headerBuffers.length; i += 2) {
const data = headerBuffers[i + 1];
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
headerBuffers[i] = prefix;
}
const chunk: StreamChunk = {
chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
return [prefix, data];
}).flat(),
chunks: headerBuffers,
type: codec,
};
events.emit('rtsp', chunk);
@@ -131,9 +133,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
path: control,
type: 'tcp',
port: channel,
onRtp: (...headerBuffers) => {
onRtp: (headerBuffers) => {
const chunk: StreamChunk = {
chunks: headerBuffers.flat(),
chunks: headerBuffers,
type: codec,
};
events.emit('rtsp', chunk);

View File

@@ -66,7 +66,11 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel
const result = await rtspClient.setup({
type: 'udp',
path: section.control,
onRtp: (rtspHeader, rtp) => deliver(rtp),
onRtp: (headerBuffers) => {
for (let i = 1; i < headerBuffers.length; i += 2) {
deliver(headerBuffers[i]);
}
},
});
console.log('rtsp/udp', section.codec, result);
return false;
@@ -80,7 +84,11 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel
type: 'tcp',
port: channel,
path: section.control,
onRtp: (rtspHeader, rtp) => deliver(rtp),
onRtp: (headerBuffers) => {
for (let i = 1; i < headerBuffers.length; i += 2) {
deliver(headerBuffers[i]);
}
},
});
console.log('rtsp/tcp', section.codec);
return true;