diff --git a/plugins/prebuffer-mixin/package.json b/plugins/prebuffer-mixin/package.json index 11b737da4..d3d11fc03 100644 --- a/plugins/prebuffer-mixin/package.json +++ b/plugins/prebuffer-mixin/package.json @@ -37,6 +37,7 @@ "realfs": true }, "dependencies": { + "@koush/werift-src": "file:../../external/werift", "@scrypted/common": "file:../../common", "@scrypted/sdk": "file:../../sdk", "h264-sps-parser": "^0.2.1", diff --git a/plugins/prebuffer-mixin/src/au.ts b/plugins/prebuffer-mixin/src/au.ts new file mode 100644 index 000000000..162425c8b --- /dev/null +++ b/plugins/prebuffer-mixin/src/au.ts @@ -0,0 +1,119 @@ +/** + * Creates an AU header for AAC frames in MPEG-4 Generic format (RTP) + * + * @param frameSize - Size of the AAC frame in bytes + * @param auIndex - AU index (default 0 for continuous streams) + * @param sizeLength - Number of bits for frame size field (default 13) + * @param indexLength - Number of bits for AU index field (default 3) + * @returns The AU header as a Buffer + */ +export function createAUHeader( + frameSize: number, + auIndex: number = 0, + sizeLength: number = 13, + indexLength: number = 3 +): Buffer { + // Calculate total header bits and bytes + const totalBits = sizeLength + indexLength; + const totalBytes = Math.ceil(totalBits / 8); + + // Validate inputs + if (frameSize < 0 || frameSize > ((1 << sizeLength) - 1)) { + throw new Error(`Frame size ${frameSize} is too large for sizeLength ${sizeLength} (max ${(1 << sizeLength) - 1})`); + } + + if (auIndex < 0 || auIndex > ((1 << indexLength) - 1)) { + throw new Error(`AU index ${auIndex} is too large for indexLength ${indexLength} (max ${(1 << indexLength) - 1})`); + } + + // Combine size and index into a single value + const combinedValue = (frameSize << indexLength) | auIndex; + + // Convert to bytes (little-endian within the multi-byte value) + const header = new Buffer(totalBytes); + for (let i = 0; i < totalBytes; i++) { + header[i] = (combinedValue >> ((totalBytes - 1 - i) * 8)) & 0xFF; + } + + return header; +} + +/** + * Creates the AU-header-length field (precedes the AU headers in RTP payload) + * + * @param totalAUHeadersBytes - Total bytes of all AU headers combined + * @returns AU-header-length as a 2-byte Buffer (big-endian) + */ +export function createAUHeaderLength(totalAUHeadersBytes: number): Buffer { + const headerLengthBits = totalAUHeadersBytes * 8; + + if (headerLengthBits > 65535) { + throw new Error('Total AU header bits exceeds 16-bit limit'); + } + + // AU-header-length is a 16-bit integer in network byte order (big-endian) + const lengthHeader = new Buffer(2); + lengthHeader[0] = (headerLengthBits >> 8) & 0xFF; + lengthHeader[1] = headerLengthBits & 0xFF; + + return lengthHeader; +} + +/** + * Given raw AAC frames, creates the complete RTP payload with AU headers + * + * @param frames - Array of raw AAC frames (no ADTS headers) + * @param sizeLength - Number of bits for frame size field (default 13) + * @param indexLength - Number of bits for AU index field (default 3) + * @returns Complete RTP payload (AU-header-length + AU headers + raw frames) + */ +export function createAACRTPPayload( + frames: Buffer[], + sizeLength: number = 13, + indexLength: number = 3 +): Buffer { + if (frames.length === 0) { + throw new Error('No frames provided'); + } + + // Create AU headers for all frames + const auHeaders: Buffer[] = []; + let totalAUHeaderBytes = 0; + + for (let i = 0; i < frames.length; i++) { + const auHeader = createAUHeader(frames[i].length, 0, sizeLength, indexLength); + auHeaders.push(auHeader); + totalAUHeaderBytes += auHeader.length; + } + + // Create AU-header-length field + const headerLengthField = createAUHeaderLength(totalAUHeaderBytes); + + // Calculate total payload size + let totalSize = headerLengthField.length + totalAUHeaderBytes; + for (const frame of frames) { + totalSize += frame.length; + } + + // Assemble the payload + const payload = new Buffer(totalSize); + let offset = 0; + + // Copy AU-header-length + payload.set(headerLengthField, offset); + offset += headerLengthField.length; + + // Copy AU headers + for (const header of auHeaders) { + payload.set(header, offset); + offset += header.length; + } + + // Copy raw AAC frames + for (const frame of frames) { + payload.set(frame, offset); + offset += frame.length; + } + + return payload; +} diff --git a/plugins/prebuffer-mixin/src/flv.ts b/plugins/prebuffer-mixin/src/flv.ts new file mode 100644 index 000000000..50b5f1a05 --- /dev/null +++ b/plugins/prebuffer-mixin/src/flv.ts @@ -0,0 +1,408 @@ +/** + * FLV Audio/Video tag payload parser + * RTMP messages for audio (type 8) and video (type 9) contain FLV tag payloads + */ + +// ============================================================================ +// Video Tag Types (in FLV header, byte 0, low nibble) +// ============================================================================ +export enum VideoCodecId { + JPEG = 1, + SORENSON_H263 = 2, + SCREEN_VIDEO = 3, + ON2_VP6 = 4, + ON2_VP6_WITH_ALPHA = 5, + SCREEN_VIDEO_V2 = 6, + H264 = 7, +} + +// ============================================================================ +// Video Frame Types (in FLV header, byte 0, high nibble) +// ============================================================================ +export enum VideoFrameType { + KEY = 1, // Keyframe (I-frame) + INTER = 2, // Inter frame (P-frame) + DISPOSABLE_INTER = 3, // Disposable inter frame + GENERATED_KEYFRAME = 4, + VIDEO_INFO = 5, // Video info/command frame +} + +// ============================================================================ +// AVC Packet Types (byte 1 for H.264 codec) +// ============================================================================ +export enum AVC_PACKET_TYPE { + SEQUENCE_HEADER = 0, // AVC sequence header (decoder configuration) + NALU = 1, // AVC NALU unit + END_OF_SEQUENCE = 2, // AVC end of sequence +} + +// ============================================================================ +// Audio Sound Formats (in FLV header, byte 0, top 4 bits) +// ============================================================================ +export enum AudioSoundFormat { + PCM_BE = 0, + ADPCM = 1, + MP3 = 2, + PCM_LE = 3, + NELLYMOSER_16K = 4, + NELLYMOSER_8K = 5, + NELLYMOSER = 6, + G711_A = 7, + G711_U = 8, + AAC = 10, + SPEEX = 11, + MP3_8K = 14, +} + +// ============================================================================ +// Audio Sound Rates (in FLV header, byte 0, bits 2-3) +// ============================================================================ +export enum AudioSoundRate { + _5_5KHZ = 0, + _11KHZ = 1, + _22KHZ = 2, + _44KHZ = 3, +} + +// ============================================================================ +// Audio Sound Size (in FLV header, byte 0, bit 1) +// ============================================================================ +export enum AudioSoundSize { + SAMPLE_8BIT = 0, + SAMPLE_16BIT = 1, +} + +// ============================================================================ +// Audio Sound Type (in FLV header, byte 0, bit 0) +// ============================================================================ +export enum AudioSoundType { + MONO = 0, + STEREO = 1, +} + +// ============================================================================ +// AAC Packet Types (byte 1 for AAC codec) +// ============================================================================ +export enum AAC_PACKET_TYPE { + SEQUENCE_HEADER = 0, // AAC sequence header (AudioSpecificConfig) + RAW = 1, // AAC raw data +} + +// ============================================================================ +// Parsed Video Tag Structure +// ============================================================================ +export interface FlvVideoTag { + frameType: VideoFrameType; + codecId: VideoCodecId; + + // H.264 specific + avcPacketType?: AVC_PACKET_TYPE; + compositionTime?: number; + + // H.264 sequence header + avcDecoderConfigurationRecord?: { + configurationVersion: number; + avcProfileIndication: number; + profileCompatibility: number; + avcLevelIndication: number; + lengthSizeMinusOne: number; // NALU length = (value & 0x03) + 1 + sps: Buffer[]; // Sequence parameter sets + pps: Buffer[]; // Picture parameter sets + }; + + // H.264 NALU data + nalus?: Buffer[]; + + // Raw payload (for non-H.264 codecs) + rawPayload?: Buffer; +} + +// ============================================================================ +// Parsed Audio Tag Structure +// ============================================================================ +export interface FlvAudioTag { + soundFormat: AudioSoundFormat; + soundRate: AudioSoundRate; + soundSize: AudioSoundSize; + soundType: AudioSoundType; + + // AAC specific + aacPacketType?: AAC_PACKET_TYPE; + + // AAC sequence header (AudioSpecificConfig) + audioSpecificConfig?: { + audioObjectType: number; + samplingFrequencyIndex: number; + channelConfiguration: number; + }; + + // Raw audio data + data: Buffer; +} + +// ============================================================================ +// Parser Result +// ============================================================================ +export type FlvTag = FlvVideoTag | FlvAudioTag; + +// ============================================================================ +// Parse AVCDecoderConfigurationRecord (H.264 decoder configuration) +// ============================================================================ +function parseAVCDecoderConfigurationRecord(buffer: Buffer, offset: number, length: number): { + config: FlvVideoTag['avcDecoderConfigurationRecord'], + bytesConsumed: number +} { + if (length < 6) { + throw new Error('AVCDecoderConfigurationRecord too short'); + } + + const config: FlvVideoTag['avcDecoderConfigurationRecord'] = { + configurationVersion: buffer[offset], + avcProfileIndication: buffer[offset + 1], + profileCompatibility: buffer[offset + 2], + avcLevelIndication: buffer[offset + 3], + lengthSizeMinusOne: buffer[offset + 4] & 0x03, + sps: [], + pps: [], + }; + + const numSPS = buffer[offset + 5] & 0x1F; + let pos = offset + 6; + + // Parse SPS + for (let i = 0; i < numSPS; i++) { + if (pos + 2 > buffer.length) { + throw new Error('AVCDecoderConfigurationRecord truncated reading SPS length'); + } + const spsLength = buffer.readUInt16BE(pos); + pos += 2; + + if (pos + spsLength > buffer.length) { + throw new Error(`AVCDecoderConfigurationRecord: SPS data exceeds buffer length`); + } + + config.sps.push(buffer.subarray(pos, pos + spsLength)); + pos += spsLength; + } + + // Parse PPS + if (pos >= buffer.length) { + return { config, bytesConsumed: pos - offset }; + } + + const numPPS = buffer[pos]; + pos++; + + for (let i = 0; i < numPPS; i++) { + if (pos + 2 > buffer.length) { + throw new Error('AVCDecoderConfigurationRecord truncated reading PPS length'); + } + const ppsLength = buffer.readUInt16BE(pos); + pos += 2; + + if (pos + ppsLength > buffer.length) { + throw new Error(`AVCDecoderConfigurationRecord: PPS data exceeds buffer length`); + } + + config.pps.push(buffer.subarray(pos, pos + ppsLength)); + pos += ppsLength; + } + + return { config, bytesConsumed: pos - offset }; +} + +// ============================================================================ +// Parse H.264 NALU units from AVCPacketType=1 payload +// The NALUs are preceded by length fields (size = lengthSizeMinusOne + 1) +// ============================================================================ +function parseNALUUnits(buffer: Buffer, offset: number, length: number, naluLengthSize: number): Buffer[] { + const nalus: Buffer[] = []; + let pos = offset; + + if (naluLengthSize < 1 || naluLengthSize > 4) { + throw new Error(`Invalid NALU length size: ${naluLengthSize}`); + } + + while (pos + naluLengthSize <= offset + length) { + let naluLength = 0; + for (let i = 0; i < naluLengthSize; i++) { + naluLength = (naluLength << 8) | buffer[pos + i]; + } + pos += naluLengthSize; + + if (naluLength === 0) { + continue; // Skip zero-length NALUs + } + + if (pos + naluLength > offset + length) { + throw new Error(`NALU data exceeds buffer length at position ${pos}`); + } + + nalus.push(buffer.subarray(pos, pos + naluLength)); + pos += naluLength; + } + + return nalus; +} + +// ============================================================================ +// Parse AudioSpecificConfig (AAC decoder configuration) +// ============================================================================ +function parseAudioSpecificConfig(buffer: Buffer, offset: number, length: number): { + aacConfig: FlvAudioTag['audioSpecificConfig'], + bytesConsumed: number +} { + if (length < 2) { + throw new Error('AudioSpecificConfig too short'); + } + + // AudioSpecificConfig is 2+ bytes, bit-packed + const byte0 = buffer[offset]; + const byte1 = buffer[offset + 1]; + + const aacConfig: FlvAudioTag['audioSpecificConfig'] = { + audioObjectType: (byte0 >> 3) & 0x1F, + samplingFrequencyIndex: ((byte0 & 0x07) << 1) | ((byte1 >> 7) & 0x01), + channelConfiguration: (byte1 >> 3) & 0x0F, + }; + + return { aacConfig, bytesConsumed: 2 }; +} + +// ============================================================================ +// Parse FLV Video Tag Payload +// ============================================================================ +export function parseFlvVideoTag(buffer: Buffer): FlvVideoTag { + if (buffer.length < 1) { + throw new Error('Video tag too short'); + } + + const byte0 = buffer[0]; + const frameType = (byte0 >> 4) as VideoFrameType; + const codecId = (byte0 & 0x0F) as VideoCodecId; + + const result: FlvVideoTag = { + frameType, + codecId, + }; + + if (codecId === VideoCodecId.H264) { + // H.264/AVC codec + if (buffer.length < 5) { + throw new Error('H.264 video tag too short'); + } + + result.avcPacketType = buffer[1] as AVC_PACKET_TYPE; + result.compositionTime = buffer.readIntBE(2, 3); + + switch (result.avcPacketType) { + case AVC_PACKET_TYPE.SEQUENCE_HEADER: { + const data = buffer.subarray(5); + const parsed = parseAVCDecoderConfigurationRecord(data, 0, data.length); + result.avcDecoderConfigurationRecord = parsed.config; + break; + } + + case AVC_PACKET_TYPE.NALU: { + // Need to know NALU length size from the sequence header + // We'll assume 4 bytes (most common) if not provided + const naluLengthSize = 4; + const data = buffer.subarray(5); + result.nalus = parseNALUUnits(data, 0, data.length, naluLengthSize); + break; + } + + case AVC_PACKET_TYPE.END_OF_SEQUENCE: + // No payload + break; + } + } else { + // Other video codecs - just return raw payload + result.rawPayload = buffer.subarray(1); + } + + return result; +} + +// ============================================================================ +// Parse FLV Audio Tag Payload +// ============================================================================ +export function parseFlvAudioTag(buffer: Buffer): FlvAudioTag { + if (buffer.length < 1) { + throw new Error('Audio tag too short'); + } + + const byte0 = buffer[0]; + const soundFormat: AudioSoundFormat = (byte0 >> 4) & 0x0F; + const soundRate: AudioSoundRate = (byte0 >> 2) & 0x03; + const soundSize: AudioSoundSize = (byte0 >> 1) & 0x01; + const soundType: AudioSoundType = byte0 & 0x01; + + const result: FlvAudioTag = { + soundFormat, + soundRate, + soundSize, + soundType, + data: Buffer.alloc(0), + }; + + if (soundFormat === AudioSoundFormat.AAC) { + if (buffer.length < 2) { + throw new Error('AAC audio tag too short'); + } + + result.aacPacketType = buffer[1] as AAC_PACKET_TYPE; + + if (result.aacPacketType === AAC_PACKET_TYPE.SEQUENCE_HEADER) { + const data = buffer.subarray(2); + const parsed = parseAudioSpecificConfig(data, 0, data.length); + result.audioSpecificConfig = parsed.aacConfig; + } else { + result.data = buffer.subarray(2); + } + } else { + // Raw audio data for other formats + result.data = buffer.subarray(1); + } + + return result; +} + +// ============================================================================ +// Parse FLV Tag (auto-detect video or audio based on codec/format) +// This function requires you to know the RTMP message type (8=audio, 9=video) +// ============================================================================ +export function parseFlvTag(buffer: Buffer, messageType: number): FlvTag { + if (messageType === 9) { + return parseFlvVideoTag(buffer); + } else if (messageType === 8) { + return parseFlvAudioTag(buffer); + } else { + throw new Error(`Unsupported message type for FLV parsing: ${messageType}`); + } +} + +// ============================================================================ +// Parse H.264 NALU unit type (5-bit value in first byte's low bits) +// ============================================================================ +export function parseNALUHeader(buffer: Buffer): number { + if (buffer.length < 1) { + throw new Error('NALU too short'); + } + return buffer[0] & 0x1F; +} + +// ============================================================================ +// Helper: Format H.264 NALU unit type name +// ============================================================================ +export function getNALUTypeName(nalcType: number): string { + const types: Record = { + 1: 'slice_layer_without_partitioning_non_idr', + 5: 'slice_layer_without_partitioning_idr', + 6: 'sei', + 7: 'seq_parameter_set', + 8: 'pic_parameter_set', + 9: 'access_unit_delimiter', + }; + return types[nalcType] || `unknown (${nalcType})`; +} \ No newline at end of file diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 3ec60d713..15ecaeef9 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -19,6 +19,7 @@ import { FileRtspServer } from './file-rtsp-server'; import { getUrlLocalAdresses } from './local-addresses'; import { REBROADCAST_MIXIN_INTERFACE_TOKEN } from './rebroadcast-mixin-token'; import { connectRFC4571Parser, startRFC4571Parser } from './rfc4571'; +import { startRtmpSession } from './rtmp-session'; import { RtspSessionParserSpecific, startRtspSession } from './rtsp-session'; import { getSpsResolution } from './sps-resolution'; import { createStreamSettings } from './stream-settings'; @@ -187,13 +188,17 @@ class PrebufferSession { return mediaStreamOptions?.container?.startsWith('rtsp'); } + canUseRtmpParser(mediaStreamOptions: MediaStreamOptions) { + return mediaStreamOptions?.container?.startsWith('rtmp'); + } + getParser(mediaStreamOptions: MediaStreamOptions) { let parser: string; let rtspParser = this.storage.getItem(this.rtspParserKey); let isDefault = !rtspParser || rtspParser === 'Default'; - if (!this.canUseRtspParser(mediaStreamOptions)) { + if (!this.canUseRtspParser(mediaStreamOptions) && !this.canUseRtmpParser(mediaStreamOptions)) { parser = STRING_DEFAULT; isDefault = true; rtspParser = undefined; @@ -340,7 +345,7 @@ class PrebufferSession { let usingFFmpeg = true; - if (this.canUseRtspParser(this.advertisedMediaStreamOptions)) { + if (this.canUseRtspParser(this.advertisedMediaStreamOptions) || this.canUseRtmpParser(this.advertisedMediaStreamOptions)) { const parser = this.getParser(this.advertisedMediaStreamOptions); const defaultValue = parser.parser; @@ -539,14 +544,26 @@ class PrebufferSession { this.usingScryptedUdpParser = parser === SCRYPTED_PARSER_UDP; if (this.usingScryptedParser) { - const rtspParser = createRtspParser(); - rbo.parsers.rtsp = rtspParser; + if (this.canUseRtmpParser(sessionMso)) { + // rtmp becomes repackaged as rtsp + const rtspParser = createRtspParser(); + rbo.parsers.rtsp = rtspParser; - session = await startRtspSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, { - useUdp: parser === SCRYPTED_PARSER_UDP, - audioSoftMuted, - rtspRequestTimeout: 10000, - }); + session = await startRtmpSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, { + audioSoftMuted, + rtspRequestTimeout: 10000, + }); + } + else { + const rtspParser = createRtspParser(); + rbo.parsers.rtsp = rtspParser; + + session = await startRtspSession(this.console, ffmpegInput.url, ffmpegInput.mediaStreamOptions, { + useUdp: parser === SCRYPTED_PARSER_UDP, + audioSoftMuted, + rtspRequestTimeout: 10000, + }); + } } else { let acodec: string[]; diff --git a/plugins/prebuffer-mixin/src/rtmp-client.ts b/plugins/prebuffer-mixin/src/rtmp-client.ts new file mode 100644 index 000000000..afbf9fdc3 --- /dev/null +++ b/plugins/prebuffer-mixin/src/rtmp-client.ts @@ -0,0 +1,604 @@ +import { readLength } from '@scrypted/common/src/read-stream'; +import { Socket } from 'net'; + +function writeUInt24BE(buffer: Buffer, value: number, offset: number): void { + buffer[offset] = (value >> 16) & 0xFF; + buffer[offset + 1] = (value >> 8) & 0xFF; + buffer[offset + 2] = value & 0xFF; +} + +// Constants +const HANDSHAKE_SIZE = 1536; +const RTMP_VERSION = 3; + +// Chunk format types +enum ChunkFormat { + TYPE_0 = 0, + TYPE_1 = 1, + TYPE_2 = 2, + TYPE_3 = 3 +} + +// RTMP message types +enum RtmpMessageType { + CHUNK_SIZE = 1, + ABORT = 2, + ACKNOWLEDGEMENT = 3, + USER_CONTROL = 4, + WINDOW_ACKNOWLEDGEMENT_SIZE = 5, + SET_PEER_BANDWIDTH = 6, + AUDIO = 8, + VIDEO = 9, + DATA_AMF0 = 18, + COMMAND_AMF0 = 20 +} + +// Control messages +export class SetChunkSize { + constructor(public chunkSize: number) { } +} + +export class UserControlSetBufferLength { + constructor(public streamId: number, public bufferLength: number) { } +} + +export interface CreateStreamResult { + streamId: number; +} + +export interface OnStatusResult { + level: string; + code: string; + description: string; +} + +interface ChunkStream { + chunkStreamId: number; + messageStreamId: number; + messageLength: number; + messageTypeId: number; + timestamp: number; + sequenceNumber: number; + messageData: Buffer[]; + totalReceived: number; + hasExtendedTimestamp: boolean; +} + +export class RtmpClient { + socket: Socket | null = null; + private chunkSize: number = 128; + private outgoingChunkSize: number = 128; + private windowAckSize: number = 5000000; + private streamId: number = 0; + private connected: boolean = false; + private receivedWindowBytes: number = 0; + private transactionId: number = 1; + private chunkStreams: Map = new Map(); + + constructor(public url: string, public console?: Console) { + this.socket = new Socket(); + + } + + async setup() { + this.console?.log('Starting stream()...'); + await this.connect(); + + // Send connect command + this.console?.log('Sending connect command...'); + await this.sendConnect(); + this.console?.log('Connect command sent'); + + while (true) { + const msg = await this.readMessage(); + const { messageTypeId } = msg.chunkStream; + if (messageTypeId === RtmpMessageType.WINDOW_ACKNOWLEDGEMENT_SIZE) { + continue; + } + if (messageTypeId === RtmpMessageType.SET_PEER_BANDWIDTH) { + continue; + } + if (messageTypeId === RtmpMessageType.CHUNK_SIZE) { + const newChunkSize = msg.message.readUInt32BE(0); + this.console?.log(`Server set chunk size to ${newChunkSize}`); + this.chunkSize = newChunkSize; + continue; + } + if (messageTypeId === RtmpMessageType.COMMAND_AMF0) { + // Parse AMF0 command + // For simplicity, we only handle _result for connect here + const commandName = msg.message.subarray(3, 10).toString('utf8'); + if (commandName === '_result') { + this.console?.log('Received _result for connect'); + break; + } + throw new Error(`Unexpected command: ${commandName}`); + } + throw new Error(`Unexpected message type: ${messageTypeId}`); + } + + // Send window acknowledgement size + this.sendWindowAckSize(5000000); + + // Send createStream + this.console?.log('Sending createStream...'); + this.streamId = await this.sendCreateStream(); + + // Wait for _result for createStream + const createStreamResult = await this.readMessage(); + // check it + const { messageTypeId } = createStreamResult.chunkStream; + if (messageTypeId !== RtmpMessageType.COMMAND_AMF0) { + throw new Error(`Unexpected message type waiting for createStream result: ${messageTypeId}, expected COMMAND_AMF0`); + } + this.console?.log('Got createStream _result'); + + // Send getStreamLength then play (matching ffmpeg's order) + const parsedUrl = new URL(this.url); + // Extract stream name (after /app/) + const parts = parsedUrl.pathname.split('/'); + const streamName = parts.length > 2 ? parts.slice(2).join('/') : ''; + const playPath = streamName + parsedUrl.search; + + this.console?.log('Sending getStreamLength with path:', playPath); + const getStreamLengthData = this.encodeAMF0Command('getStreamLength', this.transactionId++, null, playPath); + this.sendMessage(5, 0, RtmpMessageType.COMMAND_AMF0, 0, getStreamLengthData); + + this.console?.log('Sending play command with path:', playPath); + this.sendPlay(this.streamId, playPath); + + this.console?.log('Sending setBufferLength...'); + this.setBufferLength(this.streamId, 3000); + } + + /** + * Connect to the RTMP server and start streaming + */ + async *readLoop(): AsyncGenerator<{ + packet: Buffer, + codec: string, + timestamp: number, + }> { + this.console?.log('Starting to yield video/audio packets...'); + // Just yield video/audio packets as they arrive + while (true) { + const msg = await this.readMessage(); + if (msg.chunkStream.messageTypeId === RtmpMessageType.VIDEO) { + yield { packet: msg.message, codec: 'video', timestamp: msg.chunkStream.timestamp }; + } else if (msg.chunkStream.messageTypeId === RtmpMessageType.AUDIO) { + yield { packet: msg.message, codec: 'audio', timestamp: msg.chunkStream.timestamp }; + } + else { + this.console?.warn(`Ignoring message type ${msg.chunkStream.messageTypeId}`); + } + } + } + + /** + * Connect to RTMP server + */ + private async connect(): Promise { + const parsedUrl = new URL(this.url); + const host = parsedUrl.hostname; + const port = parseInt(parsedUrl.port) || 1935; + + this.console?.log(`Connecting to RTMP server at ${host}:${port}`); + + // Add socket event listeners + this.socket.on('close', (hadError) => { + this.console?.log(`Socket closed, hadError=${hadError}`); + }); + this.socket.on('end', () => { + this.console?.log('Socket received FIN'); + }); + this.socket.on('error', (err) => { + this.console?.error('Socket error:', err); + }); + + await new Promise((resolve, reject) => { + this.socket!.connect(port, host, () => { + this.console?.log('Socket connected'); + resolve(); + }); + + this.socket!.once('error', reject); + }); + + this.console?.log('Performing handshake...'); + await this.performHandshake(); + this.console?.log('Handshake complete'); + } + + /** + * Perform RTMP handshake + * Client sends: C0 + C1 + * Server responds: S0 + S1 + S2 + * Client responds: C2 + */ + private async performHandshake(): Promise { + if (!this.socket) throw new Error('Socket not connected'); + + // Send C0 (1 byte: version) + const c0 = Buffer.from([RTMP_VERSION]); + + // Send C1 (1536 bytes: time[4] + zero[4] + random[1528]) + const c1 = Buffer.alloc(HANDSHAKE_SIZE); + const timestamp = Math.floor(Date.now() / 1000); + c1.writeUInt32BE(timestamp, 0); + c1.writeUInt32BE(0, 4); // zero + + // Send C0 + C1 + this.socket.write(Buffer.concat([c0, c1])); + + // Read S0 (1 byte) + const s0 = await this.readExactly(1); + const serverVersion = s0[0]; + if (serverVersion !== RTMP_VERSION) { + throw new Error(`Unsupported RTMP version: ${serverVersion}`); + } + + // Read S1 (1536 bytes) + const s1 = await this.readExactly(HANDSHAKE_SIZE); + const s1Time = s1.readUInt32BE(0); + + // Read S2 (1536 bytes) + const s2 = await this.readExactly(HANDSHAKE_SIZE); + + // Send C2 (echo of S1) + const c2 = s1; + + this.socket.write(c2); + + this.connected = true; + } + + /** + * Parse RTMP chunks after handshake + */ + private async readMessage(): Promise<{ + message: Buffer, + chunkStream: ChunkStream, + }> { + const stream = this.socket!; + + while (true) { + // Read chunk basic header (1-3 bytes) + const basicHeader = await readLength(stream, 1); + const fmt = (basicHeader[0] >> 6) & 0x03; + let csId = basicHeader[0] & 0x3F; + + // Handle 2-byte and 3-byte forms + if (csId === 0) { + const secondByte = await readLength(stream, 1); + csId = secondByte[0] + 64; + } else if (csId === 1) { + const bytes = await readLength(stream, 2); + csId = (bytes[1] << 8) | bytes[0] + 64; + } + + // Chunk stream ID 2 is reserved for protocol control messages, but we should still parse it + + // Get or create chunk stream state + let chunkStream = this.chunkStreams.get(csId); + if (!chunkStream) { + chunkStream = { + chunkStreamId: csId, + messageStreamId: 0, + messageLength: 0, + messageTypeId: 0, + timestamp: 0, + sequenceNumber: 0, + messageData: [], + totalReceived: 0, + hasExtendedTimestamp: false + }; + this.chunkStreams.set(csId, chunkStream); + } + + // Parse message header based on format + let timestamp: number; + let messageLength: number; + let messageTypeId: number; + let messageStreamId: number; + let hasExtendedTimestamp = false; + + if (fmt === ChunkFormat.TYPE_0) { + // Type 0: 11 bytes + const header = await readLength(stream, 11); + timestamp = header.readUIntBE(0, 3); + messageLength = header.readUIntBE(3, 3); + messageTypeId = header[6]; + messageStreamId = header.readUInt32LE(7); + + // Update chunk stream state + chunkStream.messageStreamId = messageStreamId; + chunkStream.messageLength = messageLength; + chunkStream.messageTypeId = messageTypeId; + chunkStream.timestamp = timestamp; + chunkStream.totalReceived = 0; + chunkStream.messageData = []; + + if (timestamp >= 0xFFFFFF) { + hasExtendedTimestamp = true; + chunkStream.hasExtendedTimestamp = true; + } + + } else if (fmt === ChunkFormat.TYPE_1) { + // Type 1: 7 bytes + const header = await readLength(stream, 7); + const timestampDelta = header.readUIntBE(0, 3); + messageLength = header.readUIntBE(3, 3); + messageTypeId = header[6]; + + // Update chunk stream state + chunkStream.messageLength = messageLength; + chunkStream.messageTypeId = messageTypeId; + chunkStream.timestamp += timestampDelta; + chunkStream.totalReceived = 0; + chunkStream.messageData = []; + + if (timestampDelta >= 0xFFFFFF) { + hasExtendedTimestamp = true; + chunkStream.hasExtendedTimestamp = true; + } + + } else if (fmt === ChunkFormat.TYPE_2) { + // Type 2: 3 bytes + const header = await readLength(stream, 3); + const timestampDelta = header.readUIntBE(0, 3); + + // Update chunk stream state + chunkStream.timestamp += timestampDelta; + chunkStream.totalReceived = 0; + chunkStream.messageData = []; + + if (timestampDelta >= 0xFFFFFF) { + hasExtendedTimestamp = true; + chunkStream.hasExtendedTimestamp = true; + } + + } else { + // Type 3: 0 bytes - use previous values + if (chunkStream.totalReceived === 0) { + throw new Error('Type 3 chunk but no previous chunk in stream'); + } + } + + // Read extended timestamp if present + if (hasExtendedTimestamp || chunkStream.hasExtendedTimestamp) { + const extTs = await readLength(stream, 4); + const extendedTimestamp = extTs.readUInt32BE(0); + + if (fmt === ChunkFormat.TYPE_0) { + chunkStream.timestamp = extendedTimestamp; + } else if (fmt === ChunkFormat.TYPE_1 || fmt === ChunkFormat.TYPE_2) { + // For type 1 and 2, the extended timestamp replaces the delta + chunkStream.timestamp = chunkStream.timestamp - (fmt === ChunkFormat.TYPE_1 ? (await readLength(stream, 0)).readUIntBE(0, 3) : 0) + extendedTimestamp; + } + } + + // Calculate chunk data size + const remainingInMessage = chunkStream.messageLength - chunkStream.totalReceived; + const chunkDataSize = Math.min(this.chunkSize, remainingInMessage); + + const MAX_CHUNK_SIZE = 1024 * 1024; + if (chunkDataSize > MAX_CHUNK_SIZE) { + throw new Error(`Chunk size ${chunkDataSize} exceeds maximum allowed size of ${MAX_CHUNK_SIZE} bytes`); + } + + // Read chunk data + const chunkData = await readLength(stream, chunkDataSize); + chunkStream.messageData.push(chunkData); + chunkStream.totalReceived += chunkDataSize; + // this.bytesReceived += 1 + (fmt === ChunkFormat.TYPE_0 ? 11 : fmt === ChunkFormat.TYPE_1 ? 7 : fmt === ChunkFormat.TYPE_2 ? 3 : 0) + (hasExtendedTimestamp || chunkStream.hasExtendedTimestamp ? 4 : 0) + chunkDataSize; + + // Check if message is complete + if (chunkStream.totalReceived >= chunkStream.messageLength) { + const message = Buffer.concat(chunkStream.messageData); + chunkStream.messageData = []; + chunkStream.totalReceived = 0; + chunkStream.hasExtendedTimestamp = false; + return { + chunkStream, + message, + }; + } + } + } + + + /** + * Read exactly n bytes from socket + */ + private async readExactly(n: number): Promise { + return readLength(this.socket!, n); + } + + /** + * Encode value to AMF0 + */ + private encodeAMF0(value: any): Buffer { + if (typeof value === 'number') { + const buf = Buffer.alloc(9); + buf[0] = 0x00; // Number marker + buf.writeDoubleBE(value, 1); + return buf; + } else if (typeof value === 'string') { + const buf = Buffer.alloc(3 + value.length); + buf[0] = 0x02; // String marker + buf.writeUInt16BE(value.length, 1); + buf.write(value, 3, 'utf8'); + return buf; + } else if (typeof value === 'boolean') { + const buf = Buffer.alloc(2); + buf[0] = 0x01; // Boolean marker + buf[1] = value ? 1 : 0; + return buf; + } else if (value === null || value === undefined) { + return Buffer.from([0x05]); // Null marker + } else if (typeof value === 'object') { + // Object + const parts: Buffer[] = [Buffer.from([0x03])]; // Object marker + + for (const [key, val] of Object.entries(value)) { + // Key + const keyBuf = Buffer.alloc(2 + key.length); + keyBuf.writeUInt16BE(key.length, 0); + keyBuf.write(key, 2, 'utf8'); + parts.push(keyBuf); + + // Value + parts.push(this.encodeAMF0(val)); + } + + // End of object marker + parts.push(Buffer.from([0x00, 0x00, 0x09])); + + return Buffer.concat(parts); + } + + throw new Error(`Unsupported AMF0 type: ${typeof value}`); + } + + /** + * Encode command to AMF0 + */ + private encodeAMF0Command(commandName: string, transactionId: number, commandObject: any, ...args: any[]): Buffer { + const parts: Buffer[] = []; + + // Command name (string) + parts.push(this.encodeAMF0(commandName)); + + // Transaction ID (number) + parts.push(this.encodeAMF0(transactionId)); + + // Command object + parts.push(this.encodeAMF0(commandObject)); + + // Additional arguments + for (const arg of args) { + parts.push(this.encodeAMF0(arg)); + } + + return Buffer.concat(parts); + } + + /** + * Send a message as RTMP chunks + */ + private sendMessage( + chunkStreamId: number, + messageStreamId: number, + messageTypeId: number, + timestamp: number, + data: Buffer + ): void { + if (!this.socket) throw new Error('Socket not connected'); + + const chunks: Buffer[] = []; + let offset = 0; + + while (offset < data.length) { + const chunkDataSize = Math.min(this.outgoingChunkSize, data.length - offset); + const isType0 = offset === 0; + + // Type 0 header is 12 bytes (1 + 3 + 3 + 1 + 4) + const headerSize = isType0 ? 12 : 1; + const header = Buffer.alloc(headerSize); + + // Basic header (chunk stream ID) + if (chunkStreamId < 64) { + header[0] = (isType0 ? ChunkFormat.TYPE_0 : ChunkFormat.TYPE_3) << 6 | chunkStreamId; + } else { + // Handle extended chunk stream IDs (simplified for now) + header[0] = (isType0 ? ChunkFormat.TYPE_0 : ChunkFormat.TYPE_3) << 6 | 1; + } + + if (isType0) { + // Type 0 header + writeUInt24BE(header, timestamp, 1); + writeUInt24BE(header, data.length, 4); + header[7] = messageTypeId; + header.writeUInt32LE(messageStreamId, 8); + } + + chunks.push(header); + chunks.push(data.subarray(offset, offset + chunkDataSize)); + offset += chunkDataSize; + } + + for (const chunk of chunks) { + this.socket.write(chunk); + } + } + + /** + * Send connect command + */ + private async sendConnect(): Promise { + const parsedUrl = new URL(this.url); + const tcUrl = `${parsedUrl.protocol}//${parsedUrl.host}/${parsedUrl.pathname.split('/')[1]}`; + + const connectObject = { + app: parsedUrl.pathname.split('/')[1], + flashVer: 'LNX 9,0,124,2', + tcUrl: tcUrl, + fpad: false, + capabilities: 15, + audioCodecs: 4071, + videoCodecs: 252, + videoFunction: 1 + }; + + const data = this.encodeAMF0Command('connect', this.transactionId++, connectObject); + this.sendMessage(3, 0, RtmpMessageType.COMMAND_AMF0, 0, data); + } + + /** + * Send createStream command + */ + private async sendCreateStream(): Promise { + const data = this.encodeAMF0Command('createStream', this.transactionId++, null); + this.sendMessage(3, 0, RtmpMessageType.COMMAND_AMF0, 0, data); + return 1; + } + + /** + * Send play command + */ + private sendPlay(streamId: number, playPath: string): void { + const data = this.encodeAMF0Command('play', this.transactionId++, null, playPath, -2000); + this.sendMessage(4, streamId, RtmpMessageType.COMMAND_AMF0, 0, data); + } + + /** + * Send setBufferLength user control + */ + private setBufferLength(streamId: number, bufferLength: number): void { + const data = Buffer.alloc(10); + data.writeUInt16BE(3, 0); + data.writeUInt32BE(streamId, 2); + data.writeUInt32BE(bufferLength, 6); + this.sendMessage(2, 0, RtmpMessageType.USER_CONTROL, 1, data); + } + + /** + * Send window acknowledgement size + */ + private sendWindowAckSize(windowSize: number): void { + const data = Buffer.alloc(4); + data.writeUInt32BE(windowSize, 0); + this.sendMessage(2, 0, RtmpMessageType.WINDOW_ACKNOWLEDGEMENT_SIZE, 0, data); + } + + /** + * Destroy the connection + */ + destroy() { + if (this.socket) { + this.socket.destroy(); + this.socket = null; + } + this.connected = false; + } +} diff --git a/plugins/prebuffer-mixin/src/rtmp-session.ts b/plugins/prebuffer-mixin/src/rtmp-session.ts new file mode 100644 index 000000000..8f77f6fe8 --- /dev/null +++ b/plugins/prebuffer-mixin/src/rtmp-session.ts @@ -0,0 +1,224 @@ +import { RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server"; +import { StreamChunk } from "@scrypted/common/src/stream-parser"; +import { ResponseMediaStreamOptions } from "@scrypted/sdk"; +import { EventEmitter } from "stream"; +import { RtpHeader, RtpPacket } from '../../../external/werift/packages/rtp/src/rtp/rtp'; +import { H264Repacketizer } from "../../homekit/src/types/camera/h264-packetizer"; +import { addRtpTimestamp, nextSequenceNumber } from "../../homekit/src/types/camera/jitter-buffer"; +import { createAACRTPPayload } from "./au"; +import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast"; +import { parseFlvAudioTag, parseFlvVideoTag, VideoCodecId } from "./flv"; +import { negotiateMediaStream } from "./rfc4571"; +import { RtmpClient } from "./rtmp-client"; + +export type RtspChannelCodecMapping = { [key: number]: string }; + +export interface RtspSessionParserSpecific { + interleaved: Map; +} + +export async function startRtmpSession(console: Console, url: string, mediaStreamOptions: ResponseMediaStreamOptions, options: { + audioSoftMuted: boolean, + rtspRequestTimeout: number, +}): Promise> { + let isActive = true; + const events = new EventEmitter(); + // need this to prevent kill from throwing due to uncaught Error during cleanup + events.on('error', () => { }); + + const rtmpClient = new RtmpClient(url, console); + + const cleanupSockets = () => { + rtmpClient.destroy(); + } + + let sessionKilled: any; + const killed = new Promise(resolve => { + sessionKilled = resolve; + }); + + const kill = (error?: Error) => { + if (isActive) { + events.emit('killed'); + events.emit('error', error || new Error('killed')); + } + isActive = false; + sessionKilled(); + cleanupSockets(); + }; + + rtmpClient.socket.on('close', () => { + kill(new Error('rtmp socket closed')); + }); + rtmpClient.socket.on('error', e => { + kill(e); + }); + + const { resetActivityTimer } = setupActivityTimer('rtsp', kill, events, options?.rtspRequestTimeout); + + try { + await rtmpClient.setup(); + + let sdp = `v=0 +o=- 0 0 IN IP4 0.0.0.0 +s=- +t=0 0 +m=video 0 RTP/AVP 96 +a=control:streamid=0 +a=rtpmap:96 H264/90000`; + if (!options?.audioSoftMuted) { + sdp += ` +m=audio 0 RTP/AVP 97 +a=control:streamid=2 +a=rtpmap:97 MPEG4-GENERIC/16000/1 +a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1408`; + } + + sdp = sdp.split('\n').join('\r\n'); + + const start = async () => { + try { + let audioSequenceNumber = 0; + let videoSequenceNumber = 0; + const h264Repacketizer = new H264Repacketizer(console, 32000); + + for await (const rtmpPacket of rtmpClient.readLoop()) { + if (!isActive) + break; + + resetActivityTimer?.(); + + if (rtmpPacket.codec === 'audio') { + if (options?.audioSoftMuted) + continue; + + const flv = parseFlvAudioTag(rtmpPacket.packet); + + if (!flv.data?.length) + continue; + + const header = new RtpHeader({ + sequenceNumber: audioSequenceNumber, + timestamp: addRtpTimestamp(0, Math.floor(rtmpPacket.timestamp / 1000 * 16000)), + payloadType: 97, + marker: false, + }); + audioSequenceNumber = nextSequenceNumber(audioSequenceNumber); + + const audioPayload = createAACRTPPayload([flv.data]); + const rtp = new RtpPacket(header, audioPayload).serialize(); + + const prefix = Buffer.alloc(2); + prefix[0] = RTSP_FRAME_MAGIC; + prefix[1] = 2; + + const length = Buffer.alloc(2); + length.writeUInt16BE(rtp.length, 0); + + events.emit('rtsp', { + chunks: [Buffer.concat([prefix, length]), rtp], + type: 'aac', + }); + + continue; + } + + if (rtmpPacket.codec !== 'video') + throw new Error('unknown rtmp codec ' + rtmpPacket.codec); + + const flv = parseFlvVideoTag(rtmpPacket.packet); + if (flv.codecId !== VideoCodecId.H264) + throw new Error('unsupported rtmp video codec ' + flv.codecId); + + const prefix = Buffer.alloc(2); + prefix[0] = RTSP_FRAME_MAGIC; + prefix[1] = 0; + + const nalus: Buffer[] = []; + if (flv.nalus) { + nalus.push(...flv.nalus); + } + else if (flv.avcDecoderConfigurationRecord?.sps && flv.avcDecoderConfigurationRecord.pps) { + // make sure there's only one + if (flv.avcDecoderConfigurationRecord.sps.length > 1 || flv.avcDecoderConfigurationRecord.pps.length > 1) + throw new Error('rtmp sps/pps contains multiple nalus, only using the first of each'); + + nalus.push(flv.avcDecoderConfigurationRecord.sps[0]); + nalus.push(flv.avcDecoderConfigurationRecord.pps[0]); + } + else { + throw new Error('rtmp h264 nalus missing'); + } + + for (const nalu of nalus) { + const header = new RtpHeader({ + sequenceNumber: videoSequenceNumber, + timestamp: addRtpTimestamp(0, Math.floor(rtmpPacket.timestamp / 1000 * 90000)), + payloadType: 96, + marker: true, + }); + videoSequenceNumber = nextSequenceNumber(videoSequenceNumber); + + const rtp = new RtpPacket(header, nalu); + + const packets = h264Repacketizer.repacketize(rtp); + + for (const packet of packets) { + const length = Buffer.alloc(2); + const rtp = packet.serialize(); + length.writeUInt16BE(rtp.length, 0); + + events.emit('rtsp', { + chunks: [Buffer.concat([prefix, length]), rtp], + type: 'h264', + }); + } + } + } + } + catch (e) { + kill(e); + } + finally { + kill(new Error('rtsp read loop exited')); + } + }; + + // this return block is intentional, to ensure that the remaining code happens sync. + return (() => { + return { + start, + sdp: Promise.resolve(sdp), + get isActive() { return isActive }, + kill(error?: Error) { + kill(error); + }, + killed, + resetActivityTimer, + negotiateMediaStream: (requestMediaStream, inputVideoCodec, inputAudioCodec) => { + return negotiateMediaStream(sdp, mediaStreamOptions, inputVideoCodec, inputAudioCodec, requestMediaStream); + }, + emit(container: 'rtsp', chunk: StreamChunk) { + events.emit(container, chunk); + return this; + }, + on(event: string, cb: any) { + events.on(event, cb); + return this; + }, + once(event: any, cb: any) { + events.once(event, cb); + return this; + }, + removeListener(event, cb) { + events.removeListener(event, cb); + return this; + } + } + })(); + } + catch (e) { + cleanupSockets(); + throw e; + } +}