homekit: experimental packetizer work

This commit is contained in:
Koushik Dutta
2022-04-30 14:52:52 -07:00
parent b2b1eaab57
commit c2705ab981
3 changed files with 76 additions and 37 deletions

View File

@@ -83,10 +83,14 @@ export function createCameraStreamSender(console: Console, config: Config, sende
rtp.header.payloadType = payloadType;
const srtp = srtpSession.encrypt(rtp.payload, rtp.header);
// if (!audioOptions)
// console.log('sent', rtp.header.sequenceNumber);
sender.send(srtp, port, targetAddress);
}
return (rtp: RtpPacket) => {
// console.log('received', rtp.header.sequenceNumber);
if (!firstSequenceNumber) {
console.log(`sending first ${audioOptions ? 'audio' : 'video'} packet`);
firstSequenceNumber = rtp.header.sequenceNumber;
@@ -130,6 +134,9 @@ export function createCameraStreamSender(console: Console, config: Config, sende
sendPacket(rtp);
return;
}
else {
// console.log('video', Date.now())
}
const packets = h264Packetizer.repacketize(rtp);
if (!packets)

View File

@@ -1,5 +1,6 @@
import { readLength } from '@scrypted/common/src/read-stream';
import { getSpsPps, parseSdp } from '@scrypted/common/src/sdp-utils';
import { sleep } from '@scrypted/common/src/sleep';
import { FFmpegInput } from '@scrypted/sdk';
import net from 'net';
import { Readable } from 'stream';

View File

@@ -1,4 +1,4 @@
import { RtpHeader, RtpPacket } from "../../../../../external/werift/packages/rtp/src/rtp/rtp";
import { RtpPacket } from "../../../../../external/werift/packages/rtp/src/rtp/rtp";
// https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/
const NAL_TYPE_STAP_A = 24;
@@ -13,6 +13,9 @@ const FU_A_HEADER_SIZE = 2;
const LENGTH_FIELD_SIZE = 2;
const STAP_A_HEADER_SIZE = NAL_HEADER_SIZE + LENGTH_FIELD_SIZE;
function isSequential(sq1: number, sq2: number) {
return sq2 === (sq1 + 1) % 0x10000;
}
// a stap a packet is a packet that aggregates multiple nals
function depacketizeStapA(data: Buffer) {
@@ -48,7 +51,7 @@ export class H264Repacketizer {
// a fragmentation unit (fua) is a NAL unit broken into multiple fragments.
// https://datatracker.ietf.org/doc/html/rfc6184#section-5.8
packetizeFuA(data: Buffer, noStart?: boolean, noEnd?: boolean): Buffer[] {
packetizeFuA(data: Buffer, noStart?: boolean, noEnd?: boolean, fillPacket = false): Buffer[] {
// handle both normal packets and fua packets.
// a fua packet can be fragmented easily into smaller packets, as
// it is already a fragment, and splitting segments is
@@ -79,9 +82,20 @@ export class H264Repacketizer {
}
const payloadSize = data.length - NAL_HEADER_SIZE;
const numPackets = Math.ceil(payloadSize / this.fuaMax);
let numLargerPackets = payloadSize % numPackets;
const packageSize = Math.floor(payloadSize / numPackets);
const numPackets = Math.max(2, Math.ceil(payloadSize / this.fuaMax));
let numLargerPackets: number;
let largerPackageSize: number;
let smallerPackageSize: number;
if (!fillPacket) {
numLargerPackets = payloadSize % numPackets;
smallerPackageSize = Math.floor(payloadSize / numPackets);
largerPackageSize = smallerPackageSize + 1;
}
else {
numLargerPackets = numPackets - 1;
largerPackageSize = this.fuaMax;
smallerPackageSize = payloadSize - numLargerPackets * largerPackageSize;
}
const fnri = data[0] & (0x80 | 0x60);
const nal = data[0] & 0x1F;
@@ -100,12 +114,12 @@ export class H264Repacketizer {
let payload: Buffer;
if (numLargerPackets > 0) {
numLargerPackets -= 1;
payload = data.subarray(offset, offset + packageSize + 1);
offset += packageSize + 1;
payload = data.subarray(offset, offset + largerPackageSize);
offset += largerPackageSize;
}
else {
payload = data.subarray(offset, offset + packageSize);
offset += packageSize;
payload = data.subarray(offset, offset + smallerPackageSize);
offset += smallerPackageSize;
}
if (offset === data.length) {
@@ -204,12 +218,12 @@ export class H264Repacketizer {
this.pendingStapA = undefined;
}
flushPendingFuA(ret: Buffer[]) {
flushPendingFuA(ret: Buffer[], allowPartialFlush?: boolean) {
if (!this.pendingFuA)
return;
// defragmenting assumes packets are sorted by sequence number,
// and are all available, which is guaranteed over rtsp/tcp, but not over rtp/udp.
// and are all available, which is guaranteed over rtsp/tcp (maybe?), but not over rtp/udp.
const first = this.pendingFuA[0];
const last = this.pendingFuA[this.pendingFuA.length - 1];
@@ -217,23 +231,6 @@ export class H264Repacketizer {
const hasFuEnd = !!(last.payload[1] & 0x40);
const originalNalType = first.payload[1] & 0x1f;
let lastSequenceNumber: number;
for (const packet of this.pendingFuA) {
const nalType = packet.payload[1] & 0x1f;
if (nalType !== originalNalType) {
console.error('nal type mismatch');
this.pendingFuA = undefined;
return;
}
if (lastSequenceNumber !== undefined) {
if (packet.header.sequenceNumber !== (lastSequenceNumber + 1) % 0x10000) {
console.error('fua packet is missing. skipping refragmentation.');
this.pendingFuA = undefined;
return;
}
}
lastSequenceNumber = packet.header.sequenceNumber;
}
const fnri = first.payload[0] & (0x80 | 0x60);
const originalNalHeader = Buffer.from([fnri | originalNalType]);
@@ -241,14 +238,37 @@ export class H264Repacketizer {
const originalFragments = this.pendingFuA.map(packet => packet.payload.subarray(FU_A_HEADER_SIZE));
originalFragments.unshift(originalNalHeader);
const defragmented = Buffer.concat(originalFragments);
if (defragmented.length <= this.maxPacketSize && hasFuStart && hasFuEnd) {
ret.push(this.createPacket(first, defragmented, true));
this.extraPackets -= this.pendingFuA.length - 1;
this.pendingFuA = undefined;
return;
}
const fragments = this.packetizeFuA(defragmented, !hasFuStart, !hasFuEnd);
const hadMarker = last.header.marker;
this.createRtpPackets(first, fragments, ret, hadMarker);
const keepTrailer = allowPartialFlush && !hasFuEnd;
const fragments = this.packetizeFuA(defragmented, !hasFuStart, !hasFuEnd, keepTrailer);
this.extraPackets -= this.pendingFuA.length - 1;
this.pendingFuA = undefined;
// fu start/end packet can't be combined into a signle fragment.
if (keepTrailer) {
// keep the new final fua nal for combining with the next fua nal.
// console.log('keep')
const keep = fragments.pop();
if (fragments.length) {
this.createRtpPackets(first, fragments, ret, false);
this.extraPackets -= this.pendingFuA.length - 2;
}
else {
// 2 packets got combined into a packet that is still too small.
this.extraPackets--;
}
last.payload = keep;
this.pendingFuA = [last];
}
else {
this.createRtpPackets(first, fragments, ret, last.header.marker);
this.extraPackets -= this.pendingFuA.length - 1;
this.pendingFuA = undefined;
}
}
createRtpPackets(packet: RtpPacket, nalus: Buffer[], ret: Buffer[], hadMarker = packet.header.marker) {
@@ -275,6 +295,7 @@ export class H264Repacketizer {
repacketize(packet: RtpPacket): Buffer[] {
const ret: Buffer[] = [];
const nalType = packet.payload[0] & 0x1F;
// fragmented packets must share a timestamp
@@ -293,6 +314,7 @@ export class H264Repacketizer {
const data = packet.payload;
const originalNalType = data[1] & 0x1f;
const fnri = data[0] & (0x80 | 0x60);
const isFuStart = !!(data[1] & 0x80);
// if this is an idr frame, but no sps has been sent, dummy one up.
// the stream may not contain sps.
@@ -304,10 +326,13 @@ export class H264Repacketizer {
// the fua packet may already fit, in which case we could just send it.
// but for some reason that doesn't work??
if (false && packet.payload.length <= this.maxPacketSize) {
this.flushPendingFuA(ret);
const isFuEnd = !!(data[1] & 0x40);
ret.push(this.createPacket(packet, packet.payload, packet.header.marker && isFuEnd));
return ret;
}
else if (packet.payload.length >= this.maxPacketSize * 2) {
else if (false && packet.payload.length >= this.maxPacketSize * 2) {
// most rtsp implementations send fat fua packets ~64k. can just repacketize those
// with minimal extra packet overhead.
const fragments = this.packetizeFuA(packet.payload);
@@ -320,12 +345,18 @@ export class H264Repacketizer {
}
}
// watch for missing sequence numbers so that non-sequential fragments aren't combined.
if (this.pendingFuA?.length && !isSequential(this.pendingFuA[0].header.sequenceNumber, packet.header.sequenceNumber)) {
this.flushPendingFuA(ret);
}
if (this.pendingFuA) {
this.pendingFuA.push(packet);
const isFuEnd = !!(packet.payload[1] & 0x40);
if (isFuEnd)
this.flushPendingFuA(ret);
if (isFuEnd || this.pendingFuA.length > 1)
this.flushPendingFuA(ret, true);
}
}
else if (nalType === NAL_TYPE_STAP_A) {