From c2705ab981b1ea2e242c16984674358605a7ae5d Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sat, 30 Apr 2022 14:52:52 -0700 Subject: [PATCH] homekit: experimental packetizer work --- .../camera/camera-streaming-srtp-sender.ts | 7 ++ .../src/types/camera/camera-streaming-srtp.ts | 1 + .../src/types/camera/h264-packetizer.ts | 105 ++++++++++++------ 3 files changed, 76 insertions(+), 37 deletions(-) diff --git a/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts b/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts index 8da19349d..5ba90c747 100644 --- a/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts +++ b/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts @@ -83,10 +83,14 @@ export function createCameraStreamSender(console: Console, config: Config, sende rtp.header.payloadType = payloadType; const srtp = srtpSession.encrypt(rtp.payload, rtp.header); + // if (!audioOptions) + // console.log('sent', rtp.header.sequenceNumber); sender.send(srtp, port, targetAddress); } return (rtp: RtpPacket) => { + // console.log('received', rtp.header.sequenceNumber); + if (!firstSequenceNumber) { console.log(`sending first ${audioOptions ? 
'audio' : 'video'} packet`); firstSequenceNumber = rtp.header.sequenceNumber; @@ -130,6 +134,9 @@ export function createCameraStreamSender(console: Console, config: Config, sende sendPacket(rtp); return; } + else { + // console.log('video', Date.now()) + } const packets = h264Packetizer.repacketize(rtp); if (!packets) diff --git a/plugins/homekit/src/types/camera/camera-streaming-srtp.ts b/plugins/homekit/src/types/camera/camera-streaming-srtp.ts index 456ee3aba..f4c86933a 100644 --- a/plugins/homekit/src/types/camera/camera-streaming-srtp.ts +++ b/plugins/homekit/src/types/camera/camera-streaming-srtp.ts @@ -1,5 +1,6 @@ import { readLength } from '@scrypted/common/src/read-stream'; import { getSpsPps, parseSdp } from '@scrypted/common/src/sdp-utils'; +import { sleep } from '@scrypted/common/src/sleep'; import { FFmpegInput } from '@scrypted/sdk'; import net from 'net'; import { Readable } from 'stream'; diff --git a/plugins/homekit/src/types/camera/h264-packetizer.ts b/plugins/homekit/src/types/camera/h264-packetizer.ts index 511937201..3ce066b47 100644 --- a/plugins/homekit/src/types/camera/h264-packetizer.ts +++ b/plugins/homekit/src/types/camera/h264-packetizer.ts @@ -1,4 +1,4 @@ -import { RtpHeader, RtpPacket } from "../../../../../external/werift/packages/rtp/src/rtp/rtp"; +import { RtpPacket } from "../../../../../external/werift/packages/rtp/src/rtp/rtp"; // https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/ const NAL_TYPE_STAP_A = 24; @@ -13,6 +13,9 @@ const FU_A_HEADER_SIZE = 2; const LENGTH_FIELD_SIZE = 2; const STAP_A_HEADER_SIZE = NAL_HEADER_SIZE + LENGTH_FIELD_SIZE; +function isSequential(sq1: number, sq2: number) { + return sq2 === (sq1 + 1) % 0x10000; +} // a stap a packet is a packet that aggregates multiple nals function depacketizeStapA(data: Buffer) { @@ -48,7 +51,7 @@ export class H264Repacketizer { // a fragmentation unit (fua) is a NAL unit broken into multiple fragments. 
// https://datatracker.ietf.org/doc/html/rfc6184#section-5.8 - packetizeFuA(data: Buffer, noStart?: boolean, noEnd?: boolean): Buffer[] { + packetizeFuA(data: Buffer, noStart?: boolean, noEnd?: boolean, fillPacket = false): Buffer[] { // handle both normal packets and fua packets. // a fua packet can be fragmented easily into smaller packets, as // it is already a fragment, and splitting segments is @@ -79,9 +82,20 @@ export class H264Repacketizer { } const payloadSize = data.length - NAL_HEADER_SIZE; - const numPackets = Math.ceil(payloadSize / this.fuaMax); - let numLargerPackets = payloadSize % numPackets; - const packageSize = Math.floor(payloadSize / numPackets); + const numPackets = Math.max(2, Math.ceil(payloadSize / this.fuaMax)); + let numLargerPackets: number; + let largerPackageSize: number; + let smallerPackageSize: number; + if (!fillPacket) { + numLargerPackets = payloadSize % numPackets; + smallerPackageSize = Math.floor(payloadSize / numPackets); + largerPackageSize = smallerPackageSize + 1; + } + else { + numLargerPackets = numPackets - 1; + largerPackageSize = this.fuaMax; + smallerPackageSize = payloadSize - numLargerPackets * largerPackageSize; + } const fnri = data[0] & (0x80 | 0x60); const nal = data[0] & 0x1F; @@ -100,12 +114,12 @@ export class H264Repacketizer { let payload: Buffer; if (numLargerPackets > 0) { numLargerPackets -= 1; - payload = data.subarray(offset, offset + packageSize + 1); - offset += packageSize + 1; + payload = data.subarray(offset, offset + largerPackageSize); + offset += largerPackageSize; } else { - payload = data.subarray(offset, offset + packageSize); - offset += packageSize; + payload = data.subarray(offset, offset + smallerPackageSize); + offset += smallerPackageSize; } if (offset === data.length) { @@ -204,12 +218,12 @@ export class H264Repacketizer { this.pendingStapA = undefined; } - flushPendingFuA(ret: Buffer[]) { + flushPendingFuA(ret: Buffer[], allowPartialFlush?: boolean) { if (!this.pendingFuA) return; 
// defragmenting assumes packets are sorted by sequence number, - // and are all available, which is guaranteed over rtsp/tcp, but not over rtp/udp. + // and are all available, which is guaranteed over rtsp/tcp (maybe?), but not over rtp/udp. const first = this.pendingFuA[0]; const last = this.pendingFuA[this.pendingFuA.length - 1]; @@ -217,23 +231,6 @@ export class H264Repacketizer { const hasFuEnd = !!(last.payload[1] & 0x40); const originalNalType = first.payload[1] & 0x1f; - let lastSequenceNumber: number; - for (const packet of this.pendingFuA) { - const nalType = packet.payload[1] & 0x1f; - if (nalType !== originalNalType) { - console.error('nal type mismatch'); - this.pendingFuA = undefined; - return; - } - if (lastSequenceNumber !== undefined) { - if (packet.header.sequenceNumber !== (lastSequenceNumber + 1) % 0x10000) { - console.error('fua packet is missing. skipping refragmentation.'); - this.pendingFuA = undefined; - return; - } - } - lastSequenceNumber = packet.header.sequenceNumber; - } const fnri = first.payload[0] & (0x80 | 0x60); const originalNalHeader = Buffer.from([fnri | originalNalType]); @@ -241,14 +238,37 @@ export class H264Repacketizer { const originalFragments = this.pendingFuA.map(packet => packet.payload.subarray(FU_A_HEADER_SIZE)); originalFragments.unshift(originalNalHeader); const defragmented = Buffer.concat(originalFragments); + if (defragmented.length <= this.maxPacketSize && hasFuStart && hasFuEnd) { + ret.push(this.createPacket(first, defragmented, true)); + this.extraPackets -= this.pendingFuA.length - 1; + this.pendingFuA = undefined; + return; + } - const fragments = this.packetizeFuA(defragmented, !hasFuStart, !hasFuEnd); - const hadMarker = last.header.marker; - this.createRtpPackets(first, fragments, ret, hadMarker); + const keepTrailer = allowPartialFlush && !hasFuEnd; + const fragments = this.packetizeFuA(defragmented, !hasFuStart, !hasFuEnd, keepTrailer); - this.extraPackets -= this.pendingFuA.length - 1; - - 
this.pendingFuA = undefined; + // fu start/end packet can't be combined into a single fragment. + if (keepTrailer) { + // keep the new final fua nal for combining with the next fua nal. + // console.log('keep') + const keep = fragments.pop(); + if (fragments.length) { + this.createRtpPackets(first, fragments, ret, false); + this.extraPackets -= this.pendingFuA.length - 2; + } + else { + // 2 packets got combined into a packet that is still too small. + this.extraPackets--; + } + last.payload = keep; + this.pendingFuA = [last]; + } + else { + this.createRtpPackets(first, fragments, ret, last.header.marker); + this.extraPackets -= this.pendingFuA.length - 1; + this.pendingFuA = undefined; + } } createRtpPackets(packet: RtpPacket, nalus: Buffer[], ret: Buffer[], hadMarker = packet.header.marker) { @@ -275,6 +295,7 @@ export class H264Repacketizer { repacketize(packet: RtpPacket): Buffer[] { const ret: Buffer[] = []; + const nalType = packet.payload[0] & 0x1F; // fragmented packets must share a timestamp @@ -293,6 +314,7 @@ export class H264Repacketizer { const data = packet.payload; const originalNalType = data[1] & 0x1f; + const fnri = data[0] & (0x80 | 0x60); const isFuStart = !!(data[1] & 0x80); // if this is an idr frame, but no sps has been sent, dummy one up. // the stream may not contain sps. @@ -304,10 +326,13 @@ export class H264Repacketizer { // the fua packet may already fit, in which case we could just send it. // but for some reason that doesn't work?? if (false && packet.payload.length <= this.maxPacketSize) { + this.flushPendingFuA(ret); + const isFuEnd = !!(data[1] & 0x40); ret.push(this.createPacket(packet, packet.payload, packet.header.marker && isFuEnd)); + return ret; } - else if (packet.payload.length >= this.maxPacketSize * 2) { + else if (false && packet.payload.length >= this.maxPacketSize * 2) { // most rtsp implementations send fat fua packets ~64k. can just repacketize those // with minimal extra packet overhead. 
const fragments = this.packetizeFuA(packet.payload); @@ -320,12 +345,18 @@ export class H264Repacketizer { } } + // watch for missing sequence numbers so that non-sequential fragments aren't combined. + if (this.pendingFuA?.length && !isSequential(this.pendingFuA[0].header.sequenceNumber, packet.header.sequenceNumber)) { + this.flushPendingFuA(ret); + } + if (this.pendingFuA) { this.pendingFuA.push(packet); const isFuEnd = !!(packet.payload[1] & 0x40); - if (isFuEnd) - this.flushPendingFuA(ret); + + if (isFuEnd || this.pendingFuA.length > 1) + this.flushPendingFuA(ret, true); } } else if (nalType === NAL_TYPE_STAP_A) {