homekit: wip

This commit is contained in:
Koushik Dutta
2022-05-29 17:17:40 -07:00
parent 58520a8a50
commit 5f6c7c66b4
3 changed files with 138 additions and 65 deletions

View File

@@ -8,6 +8,7 @@ import dgram from 'dgram';
import { AudioStreamingSamplerate } from '../../hap';
import { ntpTime } from './camera-utils';
import { H264Repacketizer } from './h264-packetizer';
import { JitterBuffer } from './jitter-buffer';
import { OpusRepacketizer } from './opus-repacketizer';
export function createCameraStreamSender(console: Console, config: Config, sender: dgram.Socket, ssrc: number, payloadType: number, port: number, targetAddress: string, rtcpInterval: number,
@@ -34,6 +35,7 @@ export function createCameraStreamSender(console: Console, config: Config, sende
let rolloverCount = 0;
let opusPacketizer: OpusRepacketizer;
let h264Packetizer: H264Repacketizer;
let h264JitterBuffer: JitterBuffer;
let analyzeVideo = true;
let audioIntervalScale = 1;
@@ -53,6 +55,7 @@ export function createCameraStreamSender(console: Console, config: Config, sende
// adjust packet size for the rtp packet header (12).
const adjustedMtu = videoOptions.maxPacketSize - 12;
h264Packetizer = new H264Repacketizer(console, adjustedMtu, videoOptions);
h264JitterBuffer = new JitterBuffer(console, 4);
sender.setSendBufferSize(1024 * 1024);
}
@@ -144,16 +147,18 @@ export function createCameraStreamSender(console: Console, config: Config, sende
return;
}
const packets = h264Packetizer.repacketize(rtp);
if (!packets)
return;
for (const packet of packets) {
if (analyzeVideo) {
const naluTypes = getNaluTypesInNalu(packet.payload, true);
console.log('scanning for idr start found:', ...[...naluTypes]);
analyzeVideo = !naluTypes.has(H264_NAL_TYPE_IDR);
for (const dejittered of h264JitterBuffer.queue(rtp)) {
const packets = h264Packetizer.repacketize(dejittered);
if (!packets?.length)
continue;
for (const packet of packets) {
if (analyzeVideo) {
const naluTypes = getNaluTypesInNalu(packet.payload, true);
console.log('scanning for idr start found:', ...[...naluTypes]);
analyzeVideo = !naluTypes.has(H264_NAL_TYPE_IDR);
}
sendPacket(packet);
}
sendPacket(packet);
}
}

View File

@@ -1,4 +1,5 @@
import type { RtpPacket } from "@koush/werift-src/packages/rtp/src/rtp/rtp";
import { isNextSequenceNumber } from "./jitter-buffer";
// https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/
export const NAL_TYPE_STAP_A = 24;
@@ -63,10 +64,9 @@ function splitBitstream(data: Buffer) {
export class H264Repacketizer {
extraPackets = 0;
fuaMax: number;
pendingStapA: RtpPacket[];
pendingFuA: RtpPacket[];
pendingFuASeenStart = false;
seenSps = false;
seenStapASps = false;
constructor(public console: Console, public maxPacketSize: number, public codecInfo: {
sps: Buffer,
@@ -76,6 +76,25 @@ export class H264Repacketizer {
this.fuaMax = maxPacketSize - FU_A_HEADER_SIZE;;
}
ensureCodecInfo() {
if (!this.codecInfo) {
this.codecInfo = {
sps: undefined,
pps: undefined,
};
}
}
updateSps(sps: Buffer) {
this.ensureCodecInfo();
this.codecInfo.sps = sps;
}
updatePps(pps: Buffer) {
this.ensureCodecInfo();
this.codecInfo.pps = pps;
}
shouldFilter(nalType: number) {
// currently nothing is filtered, but it seems that some SEI packets cause issues
// and should be ignored, while others show up in the stap-a sps/pps packet
@@ -203,28 +222,6 @@ export class H264Repacketizer {
return ret;
}
flushPendingStapA(ret: RtpPacket[]) {
if (!this.pendingStapA)
return;
const first = this.pendingStapA[0];
const hadMarker = first.header.marker;
const aggregates = this.packetizeStapA(this.pendingStapA.map(packet => packet.payload));
if (aggregates.length !== 1) {
this.console.error('expected only 1 packet for sps/pps stapa');
this.pendingStapA = undefined;
return;
}
aggregates.forEach((packetized, index) => {
const marker = hadMarker && index === aggregates.length - 1;
ret.push(this.createPacket(first, packetized, marker));
});
this.extraPackets -= this.pendingStapA.length - 1;
this.pendingStapA = undefined;
}
flushPendingFuA(ret: RtpPacket[], allowRecoverableErrors?: boolean) {
if (!this.pendingFuA)
return;
@@ -267,6 +264,10 @@ export class H264Repacketizer {
let lastSequenceNumber: number;
for (const packet of this.pendingFuA) {
if (lastSequenceNumber !== undefined && !isNextSequenceNumber(lastSequenceNumber, packet.header.sequenceNumber)) {
}
const nalType = packet.payload[1] & 0x1f;
if (nalType !== originalNalType) {
this.console.error('unexpected nal type mismatch. skipping refragmentation.', originalNalType, nalType);
@@ -296,22 +297,15 @@ export class H264Repacketizer {
const defragmented = Buffer.concat(originalFragments);
if (originalNalType === NAL_TYPE_SPS) {
if (!this.codecInfo) {
this.codecInfo = {
sps: undefined,
pps: undefined,
};
}
const splits = splitBitstream(defragmented);
while (splits.length) {
const split = splits.shift();
const splitNaluType = split[0] & 0x1f;
if (splitNaluType === NAL_TYPE_SPS) {
this.codecInfo.sps = split;
this.updateSps(split);
}
else if (splitNaluType === NAL_TYPE_PPS) {
this.codecInfo.pps = split;
this.updatePps(split);
}
else {
if (splitNaluType === NAL_TYPE_IDR)
@@ -391,7 +385,6 @@ export class H264Repacketizer {
// empty packets are apparently valid from webrtc. filter those out.
if (!packet.payload.length) {
this.flushPendingFuA(ret);
this.flushPendingStapA(ret);
this.extraPackets--;
return ret;
}
@@ -403,15 +396,7 @@ export class H264Repacketizer {
this.flushPendingFuA(ret);
}
// stapa packets must share the same timestamp
if (this.pendingStapA && this.pendingStapA[0].header.timestamp !== packet.header.timestamp) {
this.flushPendingStapA(ret);
}
if (nalType === NAL_TYPE_FU_A) {
// fua may share a timestamp as stapa, but don't aggregated with stapa
this.flushPendingStapA(ret);
const data = packet.payload;
const originalNalType = data[1] & 0x1f;
@@ -424,7 +409,7 @@ export class H264Repacketizer {
const isFuEnd = !!(packet.payload[1] & 0x40);
// if this is an idr frame, but no sps has been sent, dummy one up.
// the stream may not contain sps.
if (originalNalType === NAL_TYPE_IDR && isFuStart && !this.seenSps) {
if (originalNalType === NAL_TYPE_IDR && isFuStart && !this.seenStapASps) {
this.maybeSendSpsPps(packet, ret);
}
@@ -446,8 +431,6 @@ export class H264Repacketizer {
if (this.pendingFuA) {
this.pendingFuA.push(packet);
this.pendingFuA = this.pendingFuA.sort((a, b) => a.header.sequenceNumber - b.header.sequenceNumber);
if (isFuEnd) {
this.flushPendingFuA(ret);
}
@@ -477,7 +460,7 @@ export class H264Repacketizer {
const depacketized = depacketizeStapA(packet.payload)
.filter(payload => {
const nalType = payload[0] & 0x1F;
this.seenSps = this.seenSps || (nalType === NAL_TYPE_SPS);
this.seenStapASps = this.seenStapASps || (nalType === NAL_TYPE_SPS);
if (this.shouldFilter(nalType)) {
return false;
}
@@ -494,28 +477,26 @@ export class H264Repacketizer {
this.flushPendingFuA(ret);
if (this.shouldFilter(nalType)) {
this.flushPendingStapA(ret);
this.extraPackets--;
return ret;
}
// codec information should be aggregated. usually around 50 bytes total.
if (nalType === NAL_TYPE_SPS || nalType === NAL_TYPE_PPS) {
this.seenSps = this.seenSps || (nalType === NAL_TYPE_SPS);
if (!this.pendingStapA)
this.pendingStapA = [];
this.pendingStapA.push(packet);
// codec information should be aggregated into a stapa. usually around 50 bytes total.
if (nalType === NAL_TYPE_SPS) {
this.updateSps(packet.payload);
return ret;
}
else if (nalType === NAL_TYPE_PPS) {
this.updatePps(packet.payload);
return ret;
}
this.flushPendingStapA(ret);
if (this.shouldFilter(nalType)) {
this.extraPackets--;
return ret;
}
if (nalType === NAL_TYPE_IDR && !this.seenSps) {
if (nalType === NAL_TYPE_IDR && !this.seenStapASps) {
// if this is an idr frame, but no sps has been sent, dummy one up.
// the stream may not contain sps.
this.maybeSendSpsPps(packet, ret);

View File

@@ -0,0 +1,87 @@
import type { RtpPacket } from "@koush/werift-src/packages/rtp/src/rtp/rtp";
export function sequenceNumberDistance(s1: number, s2: number): number {
if (s2 === s1)
return 0;
const distance = s2 - s1;
const rolloverDistance = s2 + 0xFFFF - s1;
if (Math.abs(distance) < Math.abs(rolloverDistance))
return distance;
return rolloverDistance;
}
export function nextSequenceNumber(current: number) {
return (current + 1) % 0x10000;
}
export function isNextSequenceNumber(current: number, next: number) {
return nextSequenceNumber(current) === next;
}
export class JitterBuffer {
lastSequenceNumber: number;
pending: RtpPacket[] = [];
constructor(public console: Console, public jitterSize: number, ) {
}
flushPending(afterSequenceNumber: number, ret: RtpPacket[]): RtpPacket[] {
if (!this.pending)
return ret;
const start = nextSequenceNumber(afterSequenceNumber);
for (let i = 0; i < this.jitterSize; i++) {
const index = (start + i) % this.jitterSize;
const packet = this.pending[index];
if (!packet)
continue;
const { sequenceNumber } = packet.header;
const sd = sequenceNumberDistance(this.lastSequenceNumber, sequenceNumber);
// packet needs to be purged from the the buffer for being too old.
if (sd <= 0) {
this.console.log('jitter buffer purged packet:', sequenceNumber);
this.pending[index] = undefined;
ret.push(packet);
}
else if (sd === 1) {
this.pending[index] = undefined;
this.lastSequenceNumber = sequenceNumber;
ret.push(packet);
}
else {
// can't do anything with this packet yet.
}
}
return ret;
}
queue(packet: RtpPacket): RtpPacket[] {
if (this.lastSequenceNumber === undefined || isNextSequenceNumber(this.lastSequenceNumber, packet.header.sequenceNumber)) {
this.lastSequenceNumber = packet.header.sequenceNumber;
return this.flushPending(this.lastSequenceNumber, [packet]);
}
const { sequenceNumber } = packet.header;
const packetDistance = sequenceNumberDistance(this.lastSequenceNumber, sequenceNumber);
// late/duplicate packet
if (packetDistance <= 0)
return [];
const ret: RtpPacket[] = [];
// missed/late bunch of packets
if (packetDistance > this.jitterSize) {
this.console.log('jitter buffer skipped packets:', packetDistance);
const { lastSequenceNumber } = this;
this.lastSequenceNumber = sequenceNumber - this.jitterSize;
// use the previous sequence number to flush any packets that are too old compared
// to the new sequence number.
this.flushPending(lastSequenceNumber, ret);
}
this.pending[packet.header.sequenceNumber % this.jitterSize] = packet;
return this.flushPending(this.lastSequenceNumber, ret);
}
}