From 5f6c7c66b4cb44e5c96c85404bc63ea185c042ff Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sun, 29 May 2022 17:17:40 -0700 Subject: [PATCH] homekit: wip --- .../camera/camera-streaming-srtp-sender.ts | 23 +++-- .../src/types/camera/h264-packetizer.ts | 93 ++++++++----------- .../homekit/src/types/camera/jitter-buffer.ts | 87 +++++++++++++++++ 3 files changed, 138 insertions(+), 65 deletions(-) create mode 100644 plugins/homekit/src/types/camera/jitter-buffer.ts diff --git a/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts b/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts index b862d4c94..c775f6295 100644 --- a/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts +++ b/plugins/homekit/src/types/camera/camera-streaming-srtp-sender.ts @@ -8,6 +8,7 @@ import dgram from 'dgram'; import { AudioStreamingSamplerate } from '../../hap'; import { ntpTime } from './camera-utils'; import { H264Repacketizer } from './h264-packetizer'; +import { JitterBuffer } from './jitter-buffer'; import { OpusRepacketizer } from './opus-repacketizer'; export function createCameraStreamSender(console: Console, config: Config, sender: dgram.Socket, ssrc: number, payloadType: number, port: number, targetAddress: string, rtcpInterval: number, @@ -34,6 +35,7 @@ export function createCameraStreamSender(console: Console, config: Config, sende let rolloverCount = 0; let opusPacketizer: OpusRepacketizer; let h264Packetizer: H264Repacketizer; + let h264JitterBuffer: JitterBuffer; let analyzeVideo = true; let audioIntervalScale = 1; @@ -53,6 +55,7 @@ export function createCameraStreamSender(console: Console, config: Config, sende // adjust packet size for the rtp packet header (12). const adjustedMtu = videoOptions.maxPacketSize - 12; h264Packetizer = new H264Repacketizer(console, adjustedMtu, videoOptions); + h264JitterBuffer = new JitterBuffer(console, 4); sender.setSendBufferSize(1024 * 1024); } @@ -144,16 +147,18 @@ export function createCameraStreamSender(console: Console, config: Config, sende return; } - const packets = h264Packetizer.repacketize(rtp); - if (!packets) - return; - for (const packet of packets) { - if (analyzeVideo) { - const naluTypes = getNaluTypesInNalu(packet.payload, true); - console.log('scanning for idr start found:', ...[...naluTypes]); - analyzeVideo = !naluTypes.has(H264_NAL_TYPE_IDR); + for (const dejittered of h264JitterBuffer.queue(rtp)) { + const packets = h264Packetizer.repacketize(dejittered); + if (!packets?.length) + continue; + for (const packet of packets) { + if (analyzeVideo) { + const naluTypes = getNaluTypesInNalu(packet.payload, true); + console.log('scanning for idr start found:', ...[...naluTypes]); + analyzeVideo = !naluTypes.has(H264_NAL_TYPE_IDR); + } + sendPacket(packet); } - sendPacket(packet); } } diff --git a/plugins/homekit/src/types/camera/h264-packetizer.ts b/plugins/homekit/src/types/camera/h264-packetizer.ts index d94851375..b420392ad 100644 --- a/plugins/homekit/src/types/camera/h264-packetizer.ts +++ b/plugins/homekit/src/types/camera/h264-packetizer.ts @@ -1,4 +1,5 @@ import type { RtpPacket } from "@koush/werift-src/packages/rtp/src/rtp/rtp"; +import { isNextSequenceNumber } from "./jitter-buffer"; // https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/ export const NAL_TYPE_STAP_A = 24; @@ -63,10 +64,9 @@ function splitBitstream(data: Buffer) { export class H264Repacketizer { extraPackets = 0; fuaMax: number; - pendingStapA: RtpPacket[]; pendingFuA: RtpPacket[]; pendingFuASeenStart = false; - seenSps = false; + seenStapASps = false; constructor(public console: Console, public maxPacketSize: number, public codecInfo: { sps: Buffer, @@ -76,6 +76,25 @@ export class H264Repacketizer { this.fuaMax = maxPacketSize - FU_A_HEADER_SIZE;; } + ensureCodecInfo() { + if (!this.codecInfo) { + this.codecInfo = { + sps: undefined, + pps: undefined, + }; + } + } + + updateSps(sps: Buffer) { + this.ensureCodecInfo(); + this.codecInfo.sps = sps; + } + + updatePps(pps: Buffer) { + this.ensureCodecInfo(); + this.codecInfo.pps = pps; + } + shouldFilter(nalType: number) { // currently nothing is filtered, but it seems that some SEI packets cause issues // and should be ignored, while others show up in the stap-a sps/pps packet @@ -203,28 +222,6 @@ export class H264Repacketizer { return ret; } - flushPendingStapA(ret: RtpPacket[]) { - if (!this.pendingStapA) - return; - const first = this.pendingStapA[0]; - const hadMarker = first.header.marker; - - const aggregates = this.packetizeStapA(this.pendingStapA.map(packet => packet.payload)); - if (aggregates.length !== 1) { - this.console.error('expected only 1 packet for sps/pps stapa'); - this.pendingStapA = undefined; - return; - } - - aggregates.forEach((packetized, index) => { - const marker = hadMarker && index === aggregates.length - 1; - ret.push(this.createPacket(first, packetized, marker)); - }); - - this.extraPackets -= this.pendingStapA.length - 1; - this.pendingStapA = undefined; - } - flushPendingFuA(ret: RtpPacket[], allowRecoverableErrors?: boolean) { if (!this.pendingFuA) return; @@ -267,6 +264,10 @@ export class H264Repacketizer { let lastSequenceNumber: number; for (const packet of this.pendingFuA) { + if (lastSequenceNumber !== undefined && !isNextSequenceNumber(lastSequenceNumber, packet.header.sequenceNumber)) { + + } + const nalType = packet.payload[1] & 0x1f; if (nalType !== originalNalType) { this.console.error('unexpected nal type mismatch. skipping refragmentation.', originalNalType, nalType); @@ -296,22 +297,15 @@ export class H264Repacketizer { const defragmented = Buffer.concat(originalFragments); if (originalNalType === NAL_TYPE_SPS) { - if (!this.codecInfo) { - this.codecInfo = { - sps: undefined, - pps: undefined, - }; - } - const splits = splitBitstream(defragmented); while (splits.length) { const split = splits.shift(); const splitNaluType = split[0] & 0x1f; if (splitNaluType === NAL_TYPE_SPS) { - this.codecInfo.sps = split; + this.updateSps(split); } else if (splitNaluType === NAL_TYPE_PPS) { - this.codecInfo.pps = split; + this.updatePps(split); } else { if (splitNaluType === NAL_TYPE_IDR) @@ -391,7 +385,6 @@ export class H264Repacketizer { // empty packets are apparently valid from webrtc. filter those out. if (!packet.payload.length) { this.flushPendingFuA(ret); - this.flushPendingStapA(ret); this.extraPackets--; return ret; } @@ -403,15 +396,7 @@ export class H264Repacketizer { this.flushPendingFuA(ret); } - // stapa packets must share the same timestamp - if (this.pendingStapA && this.pendingStapA[0].header.timestamp !== packet.header.timestamp) { - this.flushPendingStapA(ret); - } - if (nalType === NAL_TYPE_FU_A) { - // fua may share a timestamp as stapa, but don't aggregated with stapa - this.flushPendingStapA(ret); - const data = packet.payload; const originalNalType = data[1] & 0x1f; @@ -424,7 +409,7 @@ export class H264Repacketizer { const isFuEnd = !!(packet.payload[1] & 0x40); // if this is an idr frame, but no sps has been sent, dummy one up. // the stream may not contain sps. - if (originalNalType === NAL_TYPE_IDR && isFuStart && !this.seenSps) { + if (originalNalType === NAL_TYPE_IDR && isFuStart && !this.seenStapASps) { this.maybeSendSpsPps(packet, ret); } @@ -446,8 +431,6 @@ export class H264Repacketizer { if (this.pendingFuA) { this.pendingFuA.push(packet); - this.pendingFuA = this.pendingFuA.sort((a, b) => a.header.sequenceNumber - b.header.sequenceNumber); - if (isFuEnd) { this.flushPendingFuA(ret); } @@ -477,7 +460,7 @@ export class H264Repacketizer { const depacketized = depacketizeStapA(packet.payload) .filter(payload => { const nalType = payload[0] & 0x1F; - this.seenSps = this.seenSps || (nalType === NAL_TYPE_SPS); + this.seenStapASps = this.seenStapASps || (nalType === NAL_TYPE_SPS); if (this.shouldFilter(nalType)) { return false; } @@ -494,28 +477,26 @@ export class H264Repacketizer { this.flushPendingFuA(ret); if (this.shouldFilter(nalType)) { - this.flushPendingStapA(ret); this.extraPackets--; return ret; } - // codec information should be aggregated. usually around 50 bytes total. - if (nalType === NAL_TYPE_SPS || nalType === NAL_TYPE_PPS) { - this.seenSps = this.seenSps || (nalType === NAL_TYPE_SPS); - if (!this.pendingStapA) - this.pendingStapA = []; - this.pendingStapA.push(packet); + // codec information should be aggregated into a stapa. usually around 50 bytes total. + if (nalType === NAL_TYPE_SPS) { + this.updateSps(packet.payload); + return ret; + } + else if (nalType === NAL_TYPE_PPS) { + this.updatePps(packet.payload); return ret; } - - this.flushPendingStapA(ret); if (this.shouldFilter(nalType)) { this.extraPackets--; return ret; } - if (nalType === NAL_TYPE_IDR && !this.seenSps) { + if (nalType === NAL_TYPE_IDR && !this.seenStapASps) { // if this is an idr frame, but no sps has been sent, dummy one up. // the stream may not contain sps. this.maybeSendSpsPps(packet, ret); diff --git a/plugins/homekit/src/types/camera/jitter-buffer.ts b/plugins/homekit/src/types/camera/jitter-buffer.ts new file mode 100644 index 000000000..94bf04ddb --- /dev/null +++ b/plugins/homekit/src/types/camera/jitter-buffer.ts @@ -0,0 +1,87 @@ +import type { RtpPacket } from "@koush/werift-src/packages/rtp/src/rtp/rtp"; + +export function sequenceNumberDistance(s1: number, s2: number): number { + if (s2 === s1) + return 0; + const distance = s2 - s1; + const rolloverDistance = s2 + 0xFFFF - s1; + + if (Math.abs(distance) < Math.abs(rolloverDistance)) + return distance; + return rolloverDistance; +} + +export function nextSequenceNumber(current: number) { + return (current + 1) % 0x10000; +} + +export function isNextSequenceNumber(current: number, next: number) { + return nextSequenceNumber(current) === next; +} + +export class JitterBuffer { + lastSequenceNumber: number; + pending: RtpPacket[] = []; + + constructor(public console: Console, public jitterSize: number, ) { + } + + flushPending(afterSequenceNumber: number, ret: RtpPacket[]): RtpPacket[] { + if (!this.pending) + return ret; + + const start = nextSequenceNumber(afterSequenceNumber); + + for (let i = 0; i < this.jitterSize; i++) { + const index = (start + i) % this.jitterSize; + const packet = this.pending[index]; + if (!packet) + continue; + const { sequenceNumber } = packet.header; + const sd = sequenceNumberDistance(this.lastSequenceNumber, sequenceNumber); + // packet needs to be purged from the the buffer for being too old. + if (sd <= 0) { + this.console.log('jitter buffer purged packet:', sequenceNumber); + this.pending[index] = undefined; + ret.push(packet); + } + else if (sd === 1) { + this.pending[index] = undefined; + this.lastSequenceNumber = sequenceNumber; + ret.push(packet); + } + else { + // can't do anything with this packet yet. + } + } + return ret; + } + + queue(packet: RtpPacket): RtpPacket[] { + if (this.lastSequenceNumber === undefined || isNextSequenceNumber(this.lastSequenceNumber, packet.header.sequenceNumber)) { + this.lastSequenceNumber = packet.header.sequenceNumber; + return this.flushPending(this.lastSequenceNumber, [packet]); + } + + const { sequenceNumber } = packet.header; + const packetDistance = sequenceNumberDistance(this.lastSequenceNumber, sequenceNumber); + // late/duplicate packet + if (packetDistance <= 0) + return []; + + const ret: RtpPacket[] = []; + + // missed/late bunch of packets + if (packetDistance > this.jitterSize) { + this.console.log('jitter buffer skipped packets:', packetDistance); + const { lastSequenceNumber } = this; + this.lastSequenceNumber = sequenceNumber - this.jitterSize; + // use the previous sequence number to flush any packets that are too old compared + // to the new sequence number. + this.flushPending(lastSequenceNumber, ret); + } + + this.pending[packet.header.sequenceNumber % this.jitterSize] = packet; + return this.flushPending(this.lastSequenceNumber, ret); + } +}