rebroadcast: rtmp window acks

This commit is contained in:
Koushik Dutta
2026-01-09 12:35:11 -08:00
parent 0185680791
commit 7ef868e42d

View File

@@ -70,8 +70,8 @@ export class RtmpClient {
private outgoingChunkSize: number = 128;
private windowAckSize: number = 5000000;
private streamId: number = 0;
private connected: boolean = false;
private receivedWindowBytes: number = 0;
private lastAcknowledgementBytes: number = 0;
private totalBytesReceived: number = 0;
private transactionId: number = 1;
private chunkStreams: Map<number, ChunkStream> = new Map();
@@ -248,8 +248,6 @@ export class RtmpClient {
const c2 = s1;
this.socket.write(c2);
this.connected = true;
}
/**
@@ -301,9 +299,11 @@ export class RtmpClient {
let messageTypeId: number;
let messageStreamId: number;
let hasExtendedTimestamp = false;
let headerSize: number;
if (fmt === ChunkFormat.TYPE_0) {
// Type 0: 11 bytes
headerSize = 11;
const header = await readLength(stream, 11);
timestamp = header.readUIntBE(0, 3);
messageLength = header.readUIntBE(3, 3);
@@ -325,6 +325,7 @@ export class RtmpClient {
} else if (fmt === ChunkFormat.TYPE_1) {
// Type 1: 7 bytes
headerSize = 7;
const header = await readLength(stream, 7);
const timestampDelta = header.readUIntBE(0, 3);
messageLength = header.readUIntBE(3, 3);
@@ -344,6 +345,7 @@ export class RtmpClient {
} else if (fmt === ChunkFormat.TYPE_2) {
// Type 2: 3 bytes
headerSize = 3;
const header = await readLength(stream, 3);
const timestampDelta = header.readUIntBE(0, 3);
@@ -358,6 +360,7 @@ export class RtmpClient {
}
} else {
headerSize = 0;
// Type 3: 0 bytes - use previous values
if (chunkStream.totalReceived === 0) {
throw new Error('Type 3 chunk but no previous chunk in stream');
@@ -390,7 +393,15 @@ export class RtmpClient {
const chunkData = await readLength(stream, chunkDataSize);
chunkStream.messageData.push(chunkData);
chunkStream.totalReceived += chunkDataSize;
// this.bytesReceived += 1 + (fmt === ChunkFormat.TYPE_0 ? 11 : fmt === ChunkFormat.TYPE_1 ? 7 : fmt === ChunkFormat.TYPE_2 ? 3 : 0) + (hasExtendedTimestamp || chunkStream.hasExtendedTimestamp ? 4 : 0) + chunkDataSize;
// Track bytes received for window acknowledgements
// Count: basic header (1 byte) + message header (0-11 bytes) + extended timestamp (0-4 bytes) + payload
const extTimestampSize = (hasExtendedTimestamp || chunkStream.hasExtendedTimestamp) ? 4 : 0;
const bytesInChunk = 1 + headerSize + extTimestampSize + chunkDataSize;
this.totalBytesReceived += bytesInChunk;
// Send window acknowledgement if threshold exceeded
this.sendAcknowledgementIfNeeded();
// Check if message is complete
if (chunkStream.totalReceived >= chunkStream.messageLength) {
@@ -406,6 +417,19 @@ export class RtmpClient {
}
}
/**
* Send acknowledgement if window threshold exceeded
*/
private sendAcknowledgementIfNeeded(): void {
const bytesToAck = this.totalBytesReceived - this.lastAcknowledgementBytes;
if (bytesToAck >= this.windowAckSize) {
this.lastAcknowledgementBytes = this.totalBytesReceived;
console.log(`Sending acknowledgement: ${this.lastAcknowledgementBytes} bytes received (${bytesToAck} since last ACK)`);
const data = Buffer.alloc(4);
data.writeUInt32BE(this.lastAcknowledgementBytes & 0xFFFFFFFF, 0);
this.sendMessage(2, 0, RtmpMessageType.ACKNOWLEDGEMENT, 0, data);
}
}
/**
* Read exactly n bytes from socket
@@ -599,6 +623,5 @@ export class RtmpClient {
this.socket.destroy();
this.socket = null;
}
this.connected = false;
}
}