rebroadcast: slop rtmp implementation

This commit is contained in:
Koushik Dutta
2026-01-08 21:29:05 -08:00
parent 85074aaa7a
commit 1349bb7433
6 changed files with 1382 additions and 9 deletions

View File

@@ -37,6 +37,7 @@
"realfs": true
},
"dependencies": {
"@koush/werift-src": "file:../../external/werift",
"@scrypted/common": "file:../../common",
"@scrypted/sdk": "file:../../sdk",
"h264-sps-parser": "^0.2.1",

View File

@@ -0,0 +1,119 @@
/**
 * Creates an AU header for AAC frames in MPEG-4 Generic format (RTP, RFC 3640).
 *
 * The frame size and AU index are packed into a single bit field of
 * sizeLength + indexLength bits, rounded up to whole bytes, serialized
 * big-endian (network byte order).
 *
 * @param frameSize - Size of the AAC frame in bytes
 * @param auIndex - AU index (default 0 for continuous streams)
 * @param sizeLength - Number of bits for frame size field (default 13)
 * @param indexLength - Number of bits for AU index field (default 3)
 * @returns The AU header as a Buffer
 * @throws Error if frameSize or auIndex does not fit its bit field
 */
export function createAUHeader(
    frameSize: number,
    auIndex: number = 0,
    sizeLength: number = 13,
    indexLength: number = 3
): Buffer {
    // Calculate total header bits and bytes
    const totalBits = sizeLength + indexLength;
    const totalBytes = Math.ceil(totalBits / 8);
    // Validate inputs against their bit-field capacity
    if (frameSize < 0 || frameSize > ((1 << sizeLength) - 1)) {
        throw new Error(`Frame size ${frameSize} is too large for sizeLength ${sizeLength} (max ${(1 << sizeLength) - 1})`);
    }
    if (auIndex < 0 || auIndex > ((1 << indexLength) - 1)) {
        throw new Error(`AU index ${auIndex} is too large for indexLength ${indexLength} (max ${(1 << indexLength) - 1})`);
    }
    // Combine size and index into a single value: size in the high bits,
    // index in the low bits.
    const combinedValue = (frameSize << indexLength) | auIndex;
    // Serialize most significant byte first (big-endian). Buffer.alloc
    // replaces the deprecated, potentially uninitialized `new Buffer(size)`.
    const header = Buffer.alloc(totalBytes);
    for (let i = 0; i < totalBytes; i++) {
        header[i] = (combinedValue >> ((totalBytes - 1 - i) * 8)) & 0xFF;
    }
    return header;
}
/**
 * Creates the AU-header-length field (precedes the AU headers in RTP payload).
 *
 * @param totalAUHeadersBytes - Total bytes of all AU headers combined
 * @returns AU-header-length (in BITS) as a 2-byte big-endian Buffer
 * @throws Error if the bit count does not fit in 16 bits
 */
export function createAUHeaderLength(totalAUHeadersBytes: number): Buffer {
    const headerLengthBits = totalAUHeadersBytes * 8;
    if (headerLengthBits > 65535) {
        throw new Error('Total AU header bits exceeds 16-bit limit');
    }
    // AU-header-length is a 16-bit integer in network byte order (big-endian).
    // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
    const lengthHeader = Buffer.alloc(2);
    lengthHeader.writeUInt16BE(headerLengthBits, 0);
    return lengthHeader;
}
/**
 * Given raw AAC frames, creates the complete RTP payload with AU headers
 * (RFC 3640 MPEG-4 Generic packing).
 *
 * Layout: AU-header-length (2 bytes) | AU headers | raw AAC frames.
 *
 * @param frames - Array of raw AAC frames (no ADTS headers)
 * @param sizeLength - Number of bits for frame size field (default 13)
 * @param indexLength - Number of bits for AU index field (default 3)
 * @returns Complete RTP payload
 * @throws Error if frames is empty, or a frame exceeds the size field capacity
 */
export function createAACRTPPayload(
    frames: Buffer[],
    sizeLength: number = 13,
    indexLength: number = 3
): Buffer {
    if (frames.length === 0) {
        throw new Error('No frames provided');
    }
    // One AU header per frame. AU index 0 for every frame of a continuous stream.
    const auHeaders = frames.map(frame => createAUHeader(frame.length, 0, sizeLength, indexLength));
    let totalAUHeaderBytes = 0;
    for (const auHeader of auHeaders) {
        totalAUHeaderBytes += auHeader.length;
    }
    // AU-header-length field, then all AU headers, then the raw frames.
    // Buffer.concat replaces the previous manual copy loop into a deprecated
    // `new Buffer(size)` allocation.
    const headerLengthField = createAUHeaderLength(totalAUHeaderBytes);
    return Buffer.concat([headerLengthField, ...auHeaders, ...frames]);
}

View File

@@ -0,0 +1,408 @@
/**
 * FLV Audio/Video tag payload parser.
 * RTMP messages for audio (type 8) and video (type 9) contain FLV tag payloads.
 */
// ============================================================================
// Video Codec IDs (in FLV header, byte 0, low nibble)
// ============================================================================
export enum VideoCodecId {
    JPEG = 1,
    SORENSON_H263 = 2,
    SCREEN_VIDEO = 3,
    ON2_VP6 = 4,
    ON2_VP6_WITH_ALPHA = 5,
    SCREEN_VIDEO_V2 = 6,
    H264 = 7,
}
// ============================================================================
// Video Frame Types (in FLV header, byte 0, high nibble)
// ============================================================================
export enum VideoFrameType {
    KEY = 1, // Keyframe (I-frame)
    INTER = 2, // Inter frame (P-frame)
    DISPOSABLE_INTER = 3, // Disposable inter frame
    GENERATED_KEYFRAME = 4,
    VIDEO_INFO = 5, // Video info/command frame
}
// ============================================================================
// AVC Packet Types (byte 1 for H.264 codec)
// ============================================================================
export enum AVC_PACKET_TYPE {
    SEQUENCE_HEADER = 0, // AVC sequence header (decoder configuration)
    NALU = 1, // AVC NALU unit
    END_OF_SEQUENCE = 2, // AVC end of sequence
}
// ============================================================================
// Audio Sound Formats (in FLV header, byte 0, high nibble)
// ============================================================================
export enum AudioSoundFormat {
    PCM_BE = 0,
    ADPCM = 1,
    MP3 = 2,
    PCM_LE = 3,
    NELLYMOSER_16K = 4,
    NELLYMOSER_8K = 5,
    NELLYMOSER = 6,
    G711_A = 7,
    G711_U = 8,
    AAC = 10,
    SPEEX = 11,
    MP3_8K = 14,
}
// ============================================================================
// Audio Sound Rates (in FLV header, byte 0, bits 2-3)
// ============================================================================
export enum AudioSoundRate {
    _5_5KHZ = 0,
    _11KHZ = 1,
    _22KHZ = 2,
    _44KHZ = 3,
}
// ============================================================================
// Audio Sound Size (in FLV header, byte 0, bit 1)
// ============================================================================
export enum AudioSoundSize {
    SAMPLE_8BIT = 0,
    SAMPLE_16BIT = 1,
}
// ============================================================================
// Audio Sound Type (in FLV header, byte 0, bit 0)
// ============================================================================
export enum AudioSoundType {
    MONO = 0,
    STEREO = 1,
}
// ============================================================================
// AAC Packet Types (byte 1 for AAC codec)
// ============================================================================
export enum AAC_PACKET_TYPE {
    SEQUENCE_HEADER = 0, // AAC sequence header (AudioSpecificConfig)
    RAW = 1, // AAC raw data
}
// ============================================================================
// Parsed Video Tag Structure
// ============================================================================
export interface FlvVideoTag {
    frameType: VideoFrameType;
    codecId: VideoCodecId;
    // H.264 specific; only populated when codecId === H264
    avcPacketType?: AVC_PACKET_TYPE;
    compositionTime?: number;
    // H.264 sequence header (only for avcPacketType === SEQUENCE_HEADER)
    avcDecoderConfigurationRecord?: {
        configurationVersion: number;
        avcProfileIndication: number;
        profileCompatibility: number;
        avcLevelIndication: number;
        lengthSizeMinusOne: number; // NALU length = (value & 0x03) + 1
        sps: Buffer[]; // Sequence parameter sets
        pps: Buffer[]; // Picture parameter sets
    };
    // H.264 NALU data (only for avcPacketType === NALU)
    nalus?: Buffer[];
    // Raw payload (for non-H.264 codecs)
    rawPayload?: Buffer;
}
// ============================================================================
// Parsed Audio Tag Structure
// ============================================================================
export interface FlvAudioTag {
    soundFormat: AudioSoundFormat;
    soundRate: AudioSoundRate;
    soundSize: AudioSoundSize;
    soundType: AudioSoundType;
    // AAC specific; only populated when soundFormat === AAC
    aacPacketType?: AAC_PACKET_TYPE;
    // AAC sequence header (AudioSpecificConfig)
    audioSpecificConfig?: {
        audioObjectType: number;
        samplingFrequencyIndex: number;
        channelConfiguration: number;
    };
    // Raw audio data (empty for an AAC sequence header tag)
    data: Buffer;
}
// ============================================================================
// Parser Result
// ============================================================================
export type FlvTag = FlvVideoTag | FlvAudioTag;
// ============================================================================
// Parse AVCDecoderConfigurationRecord (H.264 decoder configuration)
// ============================================================================
/**
 * Parses an AVCDecoderConfigurationRecord (ISO 14496-15) starting at `offset`,
 * reading at most `length` bytes.
 *
 * @returns the parsed configuration and the number of bytes consumed.
 * @throws Error if the record is truncated.
 */
function parseAVCDecoderConfigurationRecord(buffer: Buffer, offset: number, length: number): {
    config: FlvVideoTag['avcDecoderConfigurationRecord'],
    bytesConsumed: number
} {
    if (length < 6) {
        throw new Error('AVCDecoderConfigurationRecord too short');
    }
    // Bound all reads by the declared record end rather than buffer.length,
    // so a record embedded in a larger buffer cannot read past its own span.
    const end = offset + length;
    const config: FlvVideoTag['avcDecoderConfigurationRecord'] = {
        configurationVersion: buffer[offset],
        avcProfileIndication: buffer[offset + 1],
        profileCompatibility: buffer[offset + 2],
        avcLevelIndication: buffer[offset + 3],
        // Low 2 bits; NALU length field size = value + 1 bytes.
        lengthSizeMinusOne: buffer[offset + 4] & 0x03,
        sps: [],
        pps: [],
    };
    const numSPS = buffer[offset + 5] & 0x1F;
    let pos = offset + 6;
    // Parse SPS: each entry is a 16-bit big-endian length, then the NALU bytes.
    for (let i = 0; i < numSPS; i++) {
        if (pos + 2 > end) {
            throw new Error('AVCDecoderConfigurationRecord truncated reading SPS length');
        }
        const spsLength = buffer.readUInt16BE(pos);
        pos += 2;
        if (pos + spsLength > end) {
            throw new Error(`AVCDecoderConfigurationRecord: SPS data exceeds buffer length`);
        }
        config.sps.push(buffer.subarray(pos, pos + spsLength));
        pos += spsLength;
    }
    // Parse PPS. Tolerate records that end right after the SPS section.
    if (pos >= end) {
        return { config, bytesConsumed: pos - offset };
    }
    const numPPS = buffer[pos];
    pos++;
    for (let i = 0; i < numPPS; i++) {
        if (pos + 2 > end) {
            throw new Error('AVCDecoderConfigurationRecord truncated reading PPS length');
        }
        const ppsLength = buffer.readUInt16BE(pos);
        pos += 2;
        if (pos + ppsLength > end) {
            throw new Error(`AVCDecoderConfigurationRecord: PPS data exceeds buffer length`);
        }
        config.pps.push(buffer.subarray(pos, pos + ppsLength));
        pos += ppsLength;
    }
    return { config, bytesConsumed: pos - offset };
}
// ============================================================================
// Parse H.264 NALU units from AVCPacketType=1 payload
// Each NALU is preceded by a big-endian length field of
// (lengthSizeMinusOne + 1) bytes.
// ============================================================================
function parseNALUUnits(buffer: Buffer, offset: number, length: number, naluLengthSize: number): Buffer[] {
    const units: Buffer[] = [];
    let cursor = offset;
    if (naluLengthSize < 1 || naluLengthSize > 4) {
        throw new Error(`Invalid NALU length size: ${naluLengthSize}`);
    }
    const end = offset + length;
    // Walk the payload as long as a full length prefix remains.
    while (cursor + naluLengthSize <= end) {
        let size = 0;
        for (let b = 0; b < naluLengthSize; b++) {
            size = (size << 8) | buffer[cursor + b];
        }
        cursor += naluLengthSize;
        // Zero-length NALUs carry no data; skip them.
        if (size === 0) {
            continue;
        }
        if (cursor + size > end) {
            throw new Error(`NALU data exceeds buffer length at position ${cursor}`);
        }
        units.push(buffer.subarray(cursor, cursor + size));
        cursor += size;
    }
    return units;
}
// ============================================================================
// Parse AudioSpecificConfig (AAC decoder configuration)
// ============================================================================
function parseAudioSpecificConfig(buffer: Buffer, offset: number, length: number): {
    aacConfig: FlvAudioTag['audioSpecificConfig'],
    bytesConsumed: number
} {
    if (length < 2) {
        throw new Error('AudioSpecificConfig too short');
    }
    // AudioSpecificConfig is bit-packed across (at least) two bytes:
    //   5 bits audioObjectType | 4 bits samplingFrequencyIndex | 4 bits channelConfiguration
    const b0 = buffer[offset];
    const b1 = buffer[offset + 1];
    const audioObjectType = (b0 >> 3) & 0x1F;
    const samplingFrequencyIndex = ((b0 & 0x07) << 1) | ((b1 >> 7) & 0x01);
    const channelConfiguration = (b1 >> 3) & 0x0F;
    return {
        aacConfig: { audioObjectType, samplingFrequencyIndex, channelConfiguration },
        bytesConsumed: 2,
    };
}
// ============================================================================
// Parse FLV Video Tag Payload
// ============================================================================
export function parseFlvVideoTag(buffer: Buffer): FlvVideoTag {
    if (buffer.length < 1) {
        throw new Error('Video tag too short');
    }
    // Byte 0: high nibble = frame type, low nibble = codec id.
    const flags = buffer[0];
    const tag: FlvVideoTag = {
        frameType: (flags >> 4) as VideoFrameType,
        codecId: (flags & 0x0F) as VideoCodecId,
    };
    if (tag.codecId !== VideoCodecId.H264) {
        // Non-H.264 codecs: hand back the payload unparsed.
        tag.rawPayload = buffer.subarray(1);
        return tag;
    }
    // H.264/AVC: byte 1 = AVCPacketType, bytes 2-4 = signed composition time.
    if (buffer.length < 5) {
        throw new Error('H.264 video tag too short');
    }
    tag.avcPacketType = buffer[1] as AVC_PACKET_TYPE;
    tag.compositionTime = buffer.readIntBE(2, 3);
    const body = buffer.subarray(5);
    switch (tag.avcPacketType) {
        case AVC_PACKET_TYPE.SEQUENCE_HEADER:
            tag.avcDecoderConfigurationRecord = parseAVCDecoderConfigurationRecord(body, 0, body.length).config;
            break;
        case AVC_PACKET_TYPE.NALU:
            // The NALU length size is defined by the sequence header; 4 bytes
            // is assumed here as it is by far the most common value.
            tag.nalus = parseNALUUnits(body, 0, body.length, 4);
            break;
        case AVC_PACKET_TYPE.END_OF_SEQUENCE:
            // No payload.
            break;
    }
    return tag;
}
// ============================================================================
// Parse FLV Audio Tag Payload
// ============================================================================
export function parseFlvAudioTag(buffer: Buffer): FlvAudioTag {
    if (buffer.length < 1) {
        throw new Error('Audio tag too short');
    }
    // Byte 0 packs format (4 bits), rate (2), sample size (1), channels (1).
    const flags = buffer[0];
    const tag: FlvAudioTag = {
        soundFormat: (flags >> 4) & 0x0F,
        soundRate: (flags >> 2) & 0x03,
        soundSize: (flags >> 1) & 0x01,
        soundType: flags & 0x01,
        data: Buffer.alloc(0),
    };
    if (tag.soundFormat !== AudioSoundFormat.AAC) {
        // Non-AAC formats: everything after the flags byte is raw audio.
        tag.data = buffer.subarray(1);
        return tag;
    }
    if (buffer.length < 2) {
        throw new Error('AAC audio tag too short');
    }
    tag.aacPacketType = buffer[1] as AAC_PACKET_TYPE;
    const body = buffer.subarray(2);
    if (tag.aacPacketType === AAC_PACKET_TYPE.SEQUENCE_HEADER) {
        // A sequence header carries the AudioSpecificConfig, not samples.
        tag.audioSpecificConfig = parseAudioSpecificConfig(body, 0, body.length).aacConfig;
    } else {
        tag.data = body;
    }
    return tag;
}
// ============================================================================
// Parse FLV Tag (auto-detect video or audio based on codec/format)
// This function requires you to know the RTMP message type (8=audio, 9=video)
// ============================================================================
export function parseFlvTag(buffer: Buffer, messageType: number): FlvTag {
    switch (messageType) {
        case 9:
            return parseFlvVideoTag(buffer);
        case 8:
            return parseFlvAudioTag(buffer);
        default:
            throw new Error(`Unsupported message type for FLV parsing: ${messageType}`);
    }
}
// ============================================================================
// Parse H.264 NALU unit type (5-bit value in first byte's low bits)
// ============================================================================
export function parseNALUHeader(buffer: Buffer): number {
    if (!buffer.length) {
        throw new Error('NALU too short');
    }
    // nal_unit_type is the low 5 bits of the first header byte.
    return buffer[0] & 0x1F;
}
// ============================================================================
// Helper: Format H.264 NALU unit type name
// ============================================================================
/**
 * Maps an H.264 nal_unit_type value to a human-readable name.
 * Unknown types are formatted as `unknown (<type>)`.
 * (Parameter renamed from the misspelled `nalcType`; callers pass
 * positionally, so this is not a breaking change.)
 */
export function getNALUTypeName(naluType: number): string {
    const types: Record<number, string> = {
        1: 'slice_layer_without_partitioning_non_idr',
        5: 'slice_layer_without_partitioning_idr',
        6: 'sei',
        7: 'seq_parameter_set',
        8: 'pic_parameter_set',
        9: 'access_unit_delimiter',
    };
    return types[naluType] || `unknown (${naluType})`;
}

View File

@@ -19,6 +19,7 @@ import { FileRtspServer } from './file-rtsp-server';
import { getUrlLocalAdresses } from './local-addresses';
import { REBROADCAST_MIXIN_INTERFACE_TOKEN } from './rebroadcast-mixin-token';
import { connectRFC4571Parser, startRFC4571Parser } from './rfc4571';
import { startRtmpSession } from './rtmp-session';
import { RtspSessionParserSpecific, startRtspSession } from './rtsp-session';
import { getSpsResolution } from './sps-resolution';
import { createStreamSettings } from './stream-settings';
@@ -187,13 +188,17 @@ class PrebufferSession {
return mediaStreamOptions?.container?.startsWith('rtsp');
}
canUseRtmpParser(mediaStreamOptions: MediaStreamOptions) {
return mediaStreamOptions?.container?.startsWith('rtmp');
}
getParser(mediaStreamOptions: MediaStreamOptions) {
let parser: string;
let rtspParser = this.storage.getItem(this.rtspParserKey);
let isDefault = !rtspParser || rtspParser === 'Default';
if (!this.canUseRtspParser(mediaStreamOptions)) {
if (!this.canUseRtspParser(mediaStreamOptions) && !this.canUseRtmpParser(mediaStreamOptions)) {
parser = STRING_DEFAULT;
isDefault = true;
rtspParser = undefined;
@@ -340,7 +345,7 @@ class PrebufferSession {
let usingFFmpeg = true;
if (this.canUseRtspParser(this.advertisedMediaStreamOptions)) {
if (this.canUseRtspParser(this.advertisedMediaStreamOptions) || this.canUseRtmpParser(this.advertisedMediaStreamOptions)) {
const parser = this.getParser(this.advertisedMediaStreamOptions);
const defaultValue = parser.parser;
@@ -539,14 +544,26 @@ class PrebufferSession {
this.usingScryptedUdpParser = parser === SCRYPTED_PARSER_UDP;
if (this.usingScryptedParser) {
const rtspParser = createRtspParser();
rbo.parsers.rtsp = rtspParser;
if (this.canUseRtmpParser(sessionMso)) {
// rtmp becomes repackaged as rtsp
const rtspParser = createRtspParser();
rbo.parsers.rtsp = rtspParser;
session = await startRtspSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, {
useUdp: parser === SCRYPTED_PARSER_UDP,
audioSoftMuted,
rtspRequestTimeout: 10000,
});
session = await startRtmpSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, {
audioSoftMuted,
rtspRequestTimeout: 10000,
});
}
else {
const rtspParser = createRtspParser();
rbo.parsers.rtsp = rtspParser;
session = await startRtspSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, {
useUdp: parser === SCRYPTED_PARSER_UDP,
audioSoftMuted,
rtspRequestTimeout: 10000,
});
}
}
else {
let acodec: string[];

View File

@@ -0,0 +1,604 @@
import { readLength } from '@scrypted/common/src/read-stream';
import { Socket } from 'net';
// Write a 24-bit big-endian unsigned integer into buffer at offset.
function writeUInt24BE(buffer: Buffer, value: number, offset: number): void {
    buffer.writeUIntBE(value & 0xFFFFFF, offset, 3);
}
// Constants
const HANDSHAKE_SIZE = 1536; // C1/C2 and S1/S2 handshake payload size (bytes)
const RTMP_VERSION = 3; // protocol version sent/expected in C0/S0
// Chunk format types (top 2 bits of the chunk basic header)
enum ChunkFormat {
    TYPE_0 = 0, // full 11-byte message header (absolute timestamp)
    TYPE_1 = 1, // 7-byte header (timestamp delta, same stream id)
    TYPE_2 = 2, // 3-byte header (timestamp delta only)
    TYPE_3 = 3 // no header, continuation of the previous message
}
// RTMP message types
enum RtmpMessageType {
    CHUNK_SIZE = 1,
    ABORT = 2,
    ACKNOWLEDGEMENT = 3,
    USER_CONTROL = 4,
    WINDOW_ACKNOWLEDGEMENT_SIZE = 5,
    SET_PEER_BANDWIDTH = 6,
    AUDIO = 8,
    VIDEO = 9,
    DATA_AMF0 = 18,
    COMMAND_AMF0 = 20
}
// Control messages
// NOTE(review): these exported control-message/result shapes are not
// referenced in the visible portion of this file; presumably consumed by
// callers elsewhere — verify before removing.
export class SetChunkSize {
    constructor(public chunkSize: number) { }
}
export class UserControlSetBufferLength {
    constructor(public streamId: number, public bufferLength: number) { }
}
export interface CreateStreamResult {
    streamId: number;
}
export interface OnStatusResult {
    level: string;
    code: string;
    description: string;
}
// Per-chunk-stream reassembly state for incoming messages.
interface ChunkStream {
    chunkStreamId: number;
    messageStreamId: number;
    messageLength: number;
    messageTypeId: number;
    timestamp: number;
    sequenceNumber: number; // NOTE(review): never updated in visible code
    messageData: Buffer[]; // accumulated chunk payloads for the current message
    totalReceived: number; // bytes of the current message received so far
    hasExtendedTimestamp: boolean; // last header used the 0xFFFFFF sentinel
}
// Minimal RTMP play client: handshake, connect/createStream/play, then a
// read loop yielding FLV audio/video payloads.
export class RtmpClient {
    socket: Socket | null = null;
    // Incoming chunk size; the RTMP default is 128 until the server sends
    // a Set Chunk Size control message.
    private chunkSize: number = 128;
    // Chunk size used for messages this client sends; not renegotiated in
    // the visible code.
    private outgoingChunkSize: number = 128;
    private windowAckSize: number = 5000000;
    // Message stream id for play; set from sendCreateStream() during setup.
    private streamId: number = 0;
    private connected: boolean = false;
    private receivedWindowBytes: number = 0;
    // Monotonic AMF0 transaction id for outgoing commands.
    private transactionId: number = 1;
    // Per-chunk-stream reassembly state, keyed by chunk stream id.
    private chunkStreams: Map<number, ChunkStream> = new Map();
    constructor(public url: string, public console?: Console) {
        this.socket = new Socket();
    }
/**
 * Perform the RTMP session setup: TCP connect + handshake, the connect
 * command exchange, createStream, then getStreamLength/play/setBufferLength
 * to begin playback. Call once before readLoop().
 */
async setup() {
    this.console?.log('Starting stream()...');
    await this.connect();
    // Send connect command
    this.console?.log('Sending connect command...');
    await this.sendConnect();
    this.console?.log('Connect command sent');
    // Drain control messages until the connect _result arrives.
    while (true) {
        const msg = await this.readMessage();
        const { messageTypeId } = msg.chunkStream;
        if (messageTypeId === RtmpMessageType.WINDOW_ACKNOWLEDGEMENT_SIZE) {
            continue;
        }
        if (messageTypeId === RtmpMessageType.SET_PEER_BANDWIDTH) {
            continue;
        }
        if (messageTypeId === RtmpMessageType.CHUNK_SIZE) {
            // The server's new chunk size applies to everything it sends next.
            const newChunkSize = msg.message.readUInt32BE(0);
            this.console?.log(`Server set chunk size to ${newChunkSize}`);
            this.chunkSize = newChunkSize;
            continue;
        }
        if (messageTypeId === RtmpMessageType.COMMAND_AMF0) {
            // Parse AMF0 command.
            // For simplicity, we only handle _result for connect here:
            // bytes 0-2 are the AMF0 string marker + 16-bit length, bytes
            // 3-10 hold the 7 characters of "_result".
            const commandName = msg.message.subarray(3, 10).toString('utf8');
            if (commandName === '_result') {
                this.console?.log('Received _result for connect');
                break;
            }
            throw new Error(`Unexpected command: ${commandName}`);
        }
        throw new Error(`Unexpected message type: ${messageTypeId}`);
    }
    // Send window acknowledgement size
    this.sendWindowAckSize(5000000);
    // Send createStream
    this.console?.log('Sending createStream...');
    this.streamId = await this.sendCreateStream();
    // Wait for _result for createStream.
    // NOTE(review): only the message type is checked; the stream id inside
    // the _result payload is not parsed (see sendCreateStream).
    const createStreamResult = await this.readMessage();
    // check it
    const { messageTypeId } = createStreamResult.chunkStream;
    if (messageTypeId !== RtmpMessageType.COMMAND_AMF0) {
        throw new Error(`Unexpected message type waiting for createStream result: ${messageTypeId}, expected COMMAND_AMF0`);
    }
    this.console?.log('Got createStream _result');
    // Send getStreamLength then play (matching ffmpeg's order)
    const parsedUrl = new URL(this.url);
    // Extract stream name (everything after /app/), preserving the query string.
    const parts = parsedUrl.pathname.split('/');
    const streamName = parts.length > 2 ? parts.slice(2).join('/') : '';
    const playPath = streamName + parsedUrl.search;
    this.console?.log('Sending getStreamLength with path:', playPath);
    const getStreamLengthData = this.encodeAMF0Command('getStreamLength', this.transactionId++, null, playPath);
    this.sendMessage(5, 0, RtmpMessageType.COMMAND_AMF0, 0, getStreamLengthData);
    this.console?.log('Sending play command with path:', playPath);
    this.sendPlay(this.streamId, playPath);
    this.console?.log('Sending setBufferLength...');
    this.setBufferLength(this.streamId, 3000);
}
/**
* Connect to the RTMP server and start streaming
*/
async *readLoop(): AsyncGenerator<{
packet: Buffer,
codec: string,
timestamp: number,
}> {
this.console?.log('Starting to yield video/audio packets...');
// Just yield video/audio packets as they arrive
while (true) {
const msg = await this.readMessage();
if (msg.chunkStream.messageTypeId === RtmpMessageType.VIDEO) {
yield { packet: msg.message, codec: 'video', timestamp: msg.chunkStream.timestamp };
} else if (msg.chunkStream.messageTypeId === RtmpMessageType.AUDIO) {
yield { packet: msg.message, codec: 'audio', timestamp: msg.chunkStream.timestamp };
}
else {
this.console?.warn(`Ignoring message type ${msg.chunkStream.messageTypeId}`);
}
}
}
/**
 * Connect to RTMP server.
 *
 * Opens the TCP socket (default port 1935) and performs the RTMP
 * handshake; resolves once the handshake completes.
 */
private async connect(): Promise<void> {
    const parsedUrl = new URL(this.url);
    const host = parsedUrl.hostname;
    const port = parseInt(parsedUrl.port) || 1935;
    this.console?.log(`Connecting to RTMP server at ${host}:${port}`);
    // Add socket event listeners for diagnostics.
    this.socket.on('close', (hadError) => {
        this.console?.log(`Socket closed, hadError=${hadError}`);
    });
    this.socket.on('end', () => {
        this.console?.log('Socket received FIN');
    });
    this.socket.on('error', (err) => {
        this.console?.error('Socket error:', err);
    });
    await new Promise<void>((resolve, reject) => {
        this.socket!.connect(port, host, () => {
            this.console?.log('Socket connected');
            resolve();
        });
        // NOTE(review): this 'once' rejection listener is never removed on
        // success; harmless since the permanent 'error' handler above also
        // fires, but a late error would reject an already-settled promise.
        this.socket!.once('error', reject);
    });
    this.console?.log('Performing handshake...');
    await this.performHandshake();
    this.console?.log('Handshake complete');
}
/**
 * Perform RTMP handshake.
 * Client sends: C0 + C1
 * Server responds: S0 + S1 + S2
 * Client responds: C2
 */
private async performHandshake(): Promise<void> {
    if (!this.socket) throw new Error('Socket not connected');
    // Send C0 (1 byte: version)
    const c0 = Buffer.from([RTMP_VERSION]);
    // Send C1 (1536 bytes: time[4] + zero[4] + random[1528])
    // NOTE(review): the 1528 "random" bytes are left zeroed by Buffer.alloc;
    // the spec calls for random data, though servers generally accept zeros.
    const c1 = Buffer.alloc(HANDSHAKE_SIZE);
    const timestamp = Math.floor(Date.now() / 1000);
    c1.writeUInt32BE(timestamp, 0);
    c1.writeUInt32BE(0, 4); // zero
    // Send C0 + C1
    this.socket.write(Buffer.concat([c0, c1]));
    // Read S0 (1 byte)
    const s0 = await this.readExactly(1);
    const serverVersion = s0[0];
    if (serverVersion !== RTMP_VERSION) {
        throw new Error(`Unsupported RTMP version: ${serverVersion}`);
    }
    // Read S1 (1536 bytes)
    const s1 = await this.readExactly(HANDSHAKE_SIZE);
    const s1Time = s1.readUInt32BE(0); // parsed but currently unused
    // Read S2 (1536 bytes); read only to consume it from the stream.
    const s2 = await this.readExactly(HANDSHAKE_SIZE);
    // Send C2 (echo of S1)
    const c2 = s1;
    this.socket.write(c2);
    this.connected = true;
}
/**
 * Read chunks from the socket until a complete RTMP message has been
 * reassembled, then return it along with its chunk stream state.
 *
 * Fixes over the previous implementation:
 * - 3-byte basic header: `(bytes[1] << 8) | bytes[0] + 64` bound `+` tighter
 *   than `|`, corrupting chunk stream ids whenever bytes[0] + 64 > 255.
 * - Extended timestamps on type 1/2 headers performed a zero-length read and
 *   then `readUIntBE(0, 3)` on the empty buffer, crashing; the sentinel delta
 *   is now backed out arithmetically instead.
 */
private async readMessage(): Promise<{
    message: Buffer,
    chunkStream: ChunkStream,
}> {
    const stream = this.socket!;
    while (true) {
        // Chunk basic header (1-3 bytes): fmt in the top 2 bits, chunk
        // stream id in the remainder.
        const basicHeader = await readLength(stream, 1);
        const fmt = (basicHeader[0] >> 6) & 0x03;
        let csId = basicHeader[0] & 0x3F;
        if (csId === 0) {
            // 2-byte form: id = second byte + 64.
            const secondByte = await readLength(stream, 1);
            csId = secondByte[0] + 64;
        } else if (csId === 1) {
            // 3-byte form: id = second byte + (third byte * 256) + 64.
            const bytes = await readLength(stream, 2);
            csId = 64 + bytes[0] + (bytes[1] << 8);
        }
        // Chunk stream ID 2 is reserved for protocol control messages, but it
        // parses identically. Get or create per-stream assembly state.
        let chunkStream = this.chunkStreams.get(csId);
        if (!chunkStream) {
            chunkStream = {
                chunkStreamId: csId,
                messageStreamId: 0,
                messageLength: 0,
                messageTypeId: 0,
                timestamp: 0,
                sequenceNumber: 0,
                messageData: [],
                totalReceived: 0,
                hasExtendedTimestamp: false
            };
            this.chunkStreams.set(csId, chunkStream);
        }
        // Parse the message header. Type 0 carries absolute values, types 1/2
        // carry deltas against the previous message on this chunk stream, and
        // type 3 reuses the previous header entirely.
        if (fmt === ChunkFormat.TYPE_0) {
            // Type 0: timestamp(3) + length(3) + typeId(1) + streamId(4, LE).
            const header = await readLength(stream, 11);
            const timestamp = header.readUIntBE(0, 3);
            chunkStream.messageLength = header.readUIntBE(3, 3);
            chunkStream.messageTypeId = header[6];
            chunkStream.messageStreamId = header.readUInt32LE(7);
            chunkStream.timestamp = timestamp;
            chunkStream.totalReceived = 0;
            chunkStream.messageData = [];
            // 0xFFFFFF is a sentinel: the real value follows as 4 extra bytes.
            chunkStream.hasExtendedTimestamp = timestamp >= 0xFFFFFF;
        } else if (fmt === ChunkFormat.TYPE_1) {
            // Type 1: timestampDelta(3) + length(3) + typeId(1).
            const header = await readLength(stream, 7);
            const timestampDelta = header.readUIntBE(0, 3);
            chunkStream.messageLength = header.readUIntBE(3, 3);
            chunkStream.messageTypeId = header[6];
            chunkStream.timestamp += timestampDelta;
            chunkStream.totalReceived = 0;
            chunkStream.messageData = [];
            chunkStream.hasExtendedTimestamp = timestampDelta >= 0xFFFFFF;
        } else if (fmt === ChunkFormat.TYPE_2) {
            // Type 2: timestampDelta(3) only.
            const header = await readLength(stream, 3);
            const timestampDelta = header.readUIntBE(0, 3);
            chunkStream.timestamp += timestampDelta;
            chunkStream.totalReceived = 0;
            chunkStream.messageData = [];
            chunkStream.hasExtendedTimestamp = timestampDelta >= 0xFFFFFF;
        } else {
            // Type 3: 0 bytes - continuation of the in-flight message.
            if (chunkStream.totalReceived === 0) {
                throw new Error('Type 3 chunk but no previous chunk in stream');
            }
        }
        // Read the extended timestamp when the 24-bit field saturated. For
        // types 1/2 the sentinel delta (0xFFFFFF) was already added above, so
        // back it out and apply the real 32-bit delta. For type 3 the field
        // is merely repeated and only needs to be consumed.
        if (chunkStream.hasExtendedTimestamp) {
            const extTs = await readLength(stream, 4);
            const extendedTimestamp = extTs.readUInt32BE(0);
            if (fmt === ChunkFormat.TYPE_0) {
                chunkStream.timestamp = extendedTimestamp;
            } else if (fmt === ChunkFormat.TYPE_1 || fmt === ChunkFormat.TYPE_2) {
                chunkStream.timestamp += extendedTimestamp - 0xFFFFFF;
            }
        }
        // A chunk carries at most chunkSize bytes of the message body.
        const remainingInMessage = chunkStream.messageLength - chunkStream.totalReceived;
        const chunkDataSize = Math.min(this.chunkSize, remainingInMessage);
        const MAX_CHUNK_SIZE = 1024 * 1024;
        if (chunkDataSize > MAX_CHUNK_SIZE) {
            throw new Error(`Chunk size ${chunkDataSize} exceeds maximum allowed size of ${MAX_CHUNK_SIZE} bytes`);
        }
        // Read chunk data and accumulate it into the message.
        const chunkData = await readLength(stream, chunkDataSize);
        chunkStream.messageData.push(chunkData);
        chunkStream.totalReceived += chunkDataSize;
        // Message complete? Reset assembly state and hand it back.
        if (chunkStream.totalReceived >= chunkStream.messageLength) {
            const message = Buffer.concat(chunkStream.messageData);
            chunkStream.messageData = [];
            chunkStream.totalReceived = 0;
            chunkStream.hasExtendedTimestamp = false;
            return {
                chunkStream,
                message,
            };
        }
    }
}
/**
 * Read exactly n bytes from socket.
 */
private async readExactly(n: number): Promise<Buffer> {
    // Delegates to the shared readLength helper, which resolves once n bytes
    // have been buffered from the socket.
    return readLength(this.socket!, n);
}
/**
 * Encode a single value to AMF0.
 *
 * Supports number, string, boolean, null/undefined (both encoded as AMF0
 * null), and plain objects (encoded as anonymous objects); throws for
 * anything else.
 *
 * Fix: string and object-key lengths now use the UTF-8 byte length
 * (Buffer.byteLength) rather than the UTF-16 code unit count
 * (value.length). Previously any multi-byte character declared a length
 * shorter than the encoded bytes, corrupting the AMF0 stream.
 */
private encodeAMF0(value: any): Buffer {
    if (typeof value === 'number') {
        const buf = Buffer.alloc(9);
        buf[0] = 0x00; // Number marker
        buf.writeDoubleBE(value, 1);
        return buf;
    } else if (typeof value === 'string') {
        const byteLength = Buffer.byteLength(value, 'utf8');
        const buf = Buffer.alloc(3 + byteLength);
        buf[0] = 0x02; // String marker
        buf.writeUInt16BE(byteLength, 1);
        buf.write(value, 3, 'utf8');
        return buf;
    } else if (typeof value === 'boolean') {
        const buf = Buffer.alloc(2);
        buf[0] = 0x01; // Boolean marker
        buf[1] = value ? 1 : 0;
        return buf;
    } else if (value === null || value === undefined) {
        return Buffer.from([0x05]); // Null marker
    } else if (typeof value === 'object') {
        // Anonymous object: marker, key/value pairs, then end-of-object.
        const parts: Buffer[] = [Buffer.from([0x03])]; // Object marker
        for (const [key, val] of Object.entries(value)) {
            // Keys are length-prefixed UTF-8 strings without a type marker.
            const keyByteLength = Buffer.byteLength(key, 'utf8');
            const keyBuf = Buffer.alloc(2 + keyByteLength);
            keyBuf.writeUInt16BE(keyByteLength, 0);
            keyBuf.write(key, 2, 'utf8');
            parts.push(keyBuf);
            // Value
            parts.push(this.encodeAMF0(val));
        }
        // End of object marker
        parts.push(Buffer.from([0x00, 0x00, 0x09]));
        return Buffer.concat(parts);
    }
    throw new Error(`Unsupported AMF0 type: ${typeof value}`);
}
/**
 * Encode a command to AMF0: command name, transaction id, command object,
 * then any optional arguments, concatenated in order.
 */
private encodeAMF0Command(commandName: string, transactionId: number, commandObject: any, ...args: any[]): Buffer {
    const fields = [commandName, transactionId, commandObject, ...args];
    return Buffer.concat(fields.map(field => this.encodeAMF0(field)));
}
/**
 * Send a message as RTMP chunks: a type 0 header on the first chunk, then
 * type 3 continuation headers every outgoingChunkSize bytes.
 *
 * Fix: timestamps >= 0xFFFFFF now write the 0xFFFFFF sentinel into the
 * 3-byte header field and append the real 32-bit value as an extended
 * timestamp after each header; previously the value was silently truncated
 * to 24 bits.
 */
private sendMessage(
    chunkStreamId: number,
    messageStreamId: number,
    messageTypeId: number,
    timestamp: number,
    data: Buffer
): void {
    if (!this.socket) throw new Error('Socket not connected');
    const hasExtendedTimestamp = timestamp >= 0xFFFFFF;
    const chunks: Buffer[] = [];
    let offset = 0;
    while (offset < data.length) {
        const chunkDataSize = Math.min(this.outgoingChunkSize, data.length - offset);
        const isType0 = offset === 0;
        // Type 0 header is 12 bytes (1 + 3 + 3 + 1 + 4); type 3 is 1 byte.
        const headerSize = isType0 ? 12 : 1;
        const header = Buffer.alloc(headerSize);
        // Basic header (chunk stream ID)
        if (chunkStreamId < 64) {
            header[0] = (isType0 ? ChunkFormat.TYPE_0 : ChunkFormat.TYPE_3) << 6 | chunkStreamId;
        } else {
            // Handle extended chunk stream IDs (simplified for now)
            header[0] = (isType0 ? ChunkFormat.TYPE_0 : ChunkFormat.TYPE_3) << 6 | 1;
        }
        if (isType0) {
            // Type 0 header: timestamp(3) + length(3) + typeId(1) + streamId(4, LE)
            writeUInt24BE(header, hasExtendedTimestamp ? 0xFFFFFF : timestamp, 1);
            writeUInt24BE(header, data.length, 4);
            header[7] = messageTypeId;
            header.writeUInt32LE(messageStreamId, 8);
        }
        chunks.push(header);
        if (hasExtendedTimestamp) {
            // The extended timestamp follows every header of the message,
            // including type 3 continuations.
            const extended = Buffer.alloc(4);
            extended.writeUInt32BE(timestamp, 0);
            chunks.push(extended);
        }
        chunks.push(data.subarray(offset, offset + chunkDataSize));
        offset += chunkDataSize;
    }
    for (const chunk of chunks) {
        this.socket.write(chunk);
    }
}
/**
 * Issue the RTMP 'connect' command on chunk stream 3, deriving the
 * application name and tcUrl from the configured url.
 */
private async sendConnect(): Promise<void> {
    const parsedUrl = new URL(this.url);
    // The app is the first path segment of the rtmp url.
    const app = parsedUrl.pathname.split('/')[1];
    const connectObject = {
        app,
        flashVer: 'LNX 9,0,124,2',
        tcUrl: `${parsedUrl.protocol}//${parsedUrl.host}/${app}`,
        fpad: false,
        capabilities: 15,
        audioCodecs: 4071,
        videoCodecs: 252,
        videoFunction: 1
    };
    const data = this.encodeAMF0Command('connect', this.transactionId++, connectObject);
    this.sendMessage(3, 0, RtmpMessageType.COMMAND_AMF0, 0, data);
}
/**
 * Send the createStream command to the server.
 *
 * NOTE(review): the returned stream id is hardcoded to 1 rather than
 * parsed from the server's _result response. Servers commonly assign 1,
 * but this should be confirmed against the response handling elsewhere
 * in this client.
 */
private async sendCreateStream(): Promise<number> {
const data = this.encodeAMF0Command('createStream', this.transactionId++, null);
this.sendMessage(3, 0, RtmpMessageType.COMMAND_AMF0, 0, data);
return 1;
}
/**
 * Issue the RTMP 'play' command for a stream on chunk stream 4.
 * The trailing -2000 start argument is the conventional "live, falling
 * back to recorded" value — presumably matching librtmp; confirm
 * against target servers.
 */
private sendPlay(streamId: number, playPath: string): void {
    const command = this.encodeAMF0Command('play', this.transactionId++, null, playPath, -2000);
    this.sendMessage(4, streamId, RtmpMessageType.COMMAND_AMF0, 0, command);
}
/**
 * Send a SetBufferLength user control message (event type 3) advising
 * the server of this client's buffer size for the given stream.
 */
private setBufferLength(streamId: number, bufferLength: number): void {
    const payload = Buffer.alloc(10);
    payload.writeUInt16BE(3, 0); // event type: SetBufferLength
    payload.writeUInt32BE(streamId, 2);
    payload.writeUInt32BE(bufferLength, 6);
    this.sendMessage(2, 0, RtmpMessageType.USER_CONTROL, 1, payload);
}
/**
 * Advertise this client's window acknowledgement size to the peer
 * on the protocol control chunk stream (2).
 */
private sendWindowAckSize(windowSize: number): void {
    const payload = Buffer.alloc(4);
    payload.writeUInt32BE(windowSize, 0);
    this.sendMessage(2, 0, RtmpMessageType.WINDOW_ACKNOWLEDGEMENT_SIZE, 0, payload);
}
/**
 * Tear down the underlying socket and mark the client disconnected.
 * Safe to call repeatedly.
 */
destroy() {
    this.socket?.destroy();
    this.socket = null;
    this.connected = false;
}
}

View File

@@ -0,0 +1,224 @@
import { RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { ResponseMediaStreamOptions } from "@scrypted/sdk";
import { EventEmitter } from "stream";
import { RtpHeader, RtpPacket } from '../../../external/werift/packages/rtp/src/rtp/rtp';
import { H264Repacketizer } from "../../homekit/src/types/camera/h264-packetizer";
import { addRtpTimestamp, nextSequenceNumber } from "../../homekit/src/types/camera/jitter-buffer";
import { createAACRTPPayload } from "./au";
import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast";
import { parseFlvAudioTag, parseFlvVideoTag, VideoCodecId } from "./flv";
import { negotiateMediaStream } from "./rfc4571";
import { RtmpClient } from "./rtmp-client";
// Maps an RTSP interleaved channel number to its codec name.
export type RtspChannelCodecMapping = { [key: number]: string };
// Parser-specific state for an RTSP session: the interleaved channel
// assignments keyed by control/track identifier.
export interface RtspSessionParserSpecific {
interleaved: Map<string, number>;
}
/**
 * Connects to an RTMP url and exposes it as an RTSP-style ParserSession:
 * FLV audio/video tags read from the RTMP stream are repacketized into
 * RTP and emitted as interleaved 'rtsp' StreamChunk events.
 *
 * @param console - Logger for the session.
 * @param url - RTMP source url.
 * @param mediaStreamOptions - Options forwarded to negotiateMediaStream.
 * @param options - audioSoftMuted drops audio tags and omits the audio
 * track from the SDP; rtspRequestTimeout feeds the activity timer.
 */
export async function startRtmpSession(console: Console, url: string, mediaStreamOptions: ResponseMediaStreamOptions, options: {
audioSoftMuted: boolean,
rtspRequestTimeout: number,
}): Promise<ParserSession<"rtsp">> {
let isActive = true;
const events = new EventEmitter();
// need this to prevent kill from throwing due to uncaught Error during cleanup
events.on('error', () => { });
const rtmpClient = new RtmpClient(url, console);
const cleanupSockets = () => {
rtmpClient.destroy();
}
// Resolved by kill(); exposed to callers as the `killed` promise.
let sessionKilled: any;
const killed = new Promise<void>(resolve => {
sessionKilled = resolve;
});
// Idempotent teardown: emits 'killed'/'error' only on the first call,
// then resolves `killed` and destroys the rtmp client.
const kill = (error?: Error) => {
if (isActive) {
events.emit('killed');
events.emit('error', error || new Error('killed'));
}
isActive = false;
sessionKilled();
cleanupSockets();
};
// NOTE(review): assumes rtmpClient.socket exists before setup() is
// called — confirm against the RtmpClient constructor.
rtmpClient.socket.on('close', () => {
kill(new Error('rtmp socket closed'));
});
rtmpClient.socket.on('error', e => {
kill(e);
});
const { resetActivityTimer } = setupActivityTimer('rtsp', kill, events, options?.rtspRequestTimeout);
try {
await rtmpClient.setup();
// Synthesize an SDP describing the rebroadcast: H264 on payload type
// 96 (90kHz clock) and, unless muted, AAC (MPEG4-GENERIC, 16kHz mono,
// AAC-hbr AU headers) on payload type 97.
let sdp = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0
m=video 0 RTP/AVP 96
a=control:streamid=0
a=rtpmap:96 H264/90000`;
if (!options?.audioSoftMuted) {
sdp += `
m=audio 0 RTP/AVP 97
a=control:streamid=2
a=rtpmap:97 MPEG4-GENERIC/16000/1
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1408`;
}
// SDP requires CRLF line endings.
sdp = sdp.split('\n').join('\r\n');
const start = async () => {
try {
let audioSequenceNumber = 0;
let videoSequenceNumber = 0;
const h264Repacketizer = new H264Repacketizer(console, 32000);
// Main pump: translate each RTMP packet into interleaved RTP frames.
for await (const rtmpPacket of rtmpClient.readLoop()) {
if (!isActive)
break;
resetActivityTimer?.();
if (rtmpPacket.codec === 'audio') {
if (options?.audioSoftMuted)
continue;
const flv = parseFlvAudioTag(rtmpPacket.packet);
if (!flv.data?.length)
continue;
// Rescale the millisecond RTMP timestamp to the 16kHz RTP
// audio clock advertised in the SDP.
// NOTE(review): marker is hardcoded false for audio — verify
// downstream consumers do not require it set per access unit.
const header = new RtpHeader({
sequenceNumber: audioSequenceNumber,
timestamp: addRtpTimestamp(0, Math.floor(rtmpPacket.timestamp / 1000 * 16000)),
payloadType: 97,
marker: false,
});
audioSequenceNumber = nextSequenceNumber(audioSequenceNumber);
const audioPayload = createAACRTPPayload([flv.data]);
const rtp = new RtpPacket(header, audioPayload).serialize();
// RTSP interleaved framing: magic byte, channel 2 (audio),
// then a 16-bit big-endian payload length.
const prefix = Buffer.alloc(2);
prefix[0] = RTSP_FRAME_MAGIC;
prefix[1] = 2;
const length = Buffer.alloc(2);
length.writeUInt16BE(rtp.length, 0);
events.emit('rtsp', {
chunks: [Buffer.concat([prefix, length]), rtp],
type: 'aac',
});
continue;
}
if (rtmpPacket.codec !== 'video')
throw new Error('unknown rtmp codec ' + rtmpPacket.codec);
const flv = parseFlvVideoTag(rtmpPacket.packet);
if (flv.codecId !== VideoCodecId.H264)
throw new Error('unsupported rtmp video codec ' + flv.codecId);
// Video goes out on interleaved channel 0.
const prefix = Buffer.alloc(2);
prefix[0] = RTSP_FRAME_MAGIC;
prefix[1] = 0;
// Gather NAL units: either the coded frame's nalus, or the
// sps/pps carried by the AVC decoder configuration record.
const nalus: Buffer[] = [];
if (flv.nalus) {
nalus.push(...flv.nalus);
}
else if (flv.avcDecoderConfigurationRecord?.sps && flv.avcDecoderConfigurationRecord.pps) {
// make sure there's only one
if (flv.avcDecoderConfigurationRecord.sps.length > 1 || flv.avcDecoderConfigurationRecord.pps.length > 1)
throw new Error('rtmp sps/pps contains multiple nalus, only using the first of each');
nalus.push(flv.avcDecoderConfigurationRecord.sps[0]);
nalus.push(flv.avcDecoderConfigurationRecord.pps[0]);
}
else {
throw new Error('rtmp h264 nalus missing');
}
for (const nalu of nalus) {
// Rescale milliseconds to the 90kHz RTP video clock.
// NOTE(review): marker is set on every nalu, not just the
// last of an access unit — confirm this is intended.
const header = new RtpHeader({
sequenceNumber: videoSequenceNumber,
timestamp: addRtpTimestamp(0, Math.floor(rtmpPacket.timestamp / 1000 * 90000)),
payloadType: 96,
marker: true,
});
videoSequenceNumber = nextSequenceNumber(videoSequenceNumber);
// The repacketizer may fragment large nalus into multiple
// RTP packets and handles sequence numbering of the splits.
const rtp = new RtpPacket(header, nalu);
const packets = h264Repacketizer.repacketize(rtp);
for (const packet of packets) {
const length = Buffer.alloc(2);
const rtp = packet.serialize();
length.writeUInt16BE(rtp.length, 0);
events.emit('rtsp', {
chunks: [Buffer.concat([prefix, length]), rtp],
type: 'h264',
});
}
}
}
}
catch (e) {
kill(e);
}
finally {
// Always tear down when the read loop ends, even on clean exit.
kill(new Error('rtsp read loop exited'));
}
};
// this return block is intentional, to ensure that the remaining code happens sync.
return (() => {
return {
start,
sdp: Promise.resolve(sdp),
get isActive() { return isActive },
kill(error?: Error) {
kill(error);
},
killed,
resetActivityTimer,
negotiateMediaStream: (requestMediaStream, inputVideoCodec, inputAudioCodec) => {
return negotiateMediaStream(sdp, mediaStreamOptions, inputVideoCodec, inputAudioCodec, requestMediaStream);
},
emit(container: 'rtsp', chunk: StreamChunk) {
events.emit(container, chunk);
return this;
},
on(event: string, cb: any) {
events.on(event, cb);
return this;
},
once(event: any, cb: any) {
events.once(event, cb);
return this;
},
removeListener(event, cb) {
events.removeListener(event, cb);
return this;
}
}
})();
}
catch (e) {
// Setup failed: release the socket before propagating.
cleanupSockets();
throw e;
}
}