webrtc: stapa/sei fix. stream start failure fix/logging.

This commit is contained in:
Koushik Dutta
2023-06-28 11:24:26 -07:00
parent 2ffe67b2db
commit 56f127a203
8 changed files with 137 additions and 110 deletions

View File

@@ -1,6 +1,6 @@
import net from 'net';
import { once } from 'events';
import dgram, { SocketType } from 'dgram';
import { once } from 'events';
import net from 'net';
export async function closeQuiet(socket: dgram.Socket | net.Server) {
if (!socket)
@@ -37,6 +37,23 @@ export async function createBindZero(socketType?: SocketType) {
return createBindUdp(0, socketType);
}
export async function createSquentialBindZero(socketType?: SocketType) {
let attempts = 0;
while (true) {
const rtpServer = await createBindZero(socketType);
try {
const rtcpServer = await createBindUdp(rtpServer.port + 1, socketType);
return [rtpServer, rtcpServer];
}
catch (e) {
attempts++;
closeQuiet(rtpServer.server);
}
if (attempts === 10)
throw new Error('unable to reserve sequential udp ports')
}
}
export async function reserveUdpPort() {
const udp = await createBindZero();
await new Promise(resolve => udp.server.close(() => resolve(undefined)));
@@ -62,4 +79,4 @@ export async function bind(server: dgram.Socket, port: number) {
}
}
export { listenZero, listenZeroSingleClient, ListenZeroSingleClientTimeoutError } from "@scrypted/server/src/listen-zero";
export { ListenZeroSingleClientTimeoutError, listenZero, listenZeroSingleClient } from "@scrypted/server/src/listen-zero";

View File

@@ -6,14 +6,14 @@ import { parseHTTPHeadersQuotedKeyValueSet } from 'http-auth-utils/dist/utils';
import net from 'net';
import { Duplex, Readable, Writable } from 'stream';
import tls from 'tls';
import { URL } from 'url';
import { Deferred } from './deferred';
import { closeQuiet, createBindUdp, createBindZero, listenZeroSingleClient } from './listen-cluster';
import { closeQuiet, createBindZero, createSquentialBindZero, listenZeroSingleClient } from './listen-cluster';
import { timeoutPromise } from './promise-utils';
import { readLength, readLine } from './read-stream';
import { MSection, parseSdp } from './sdp-utils';
import { sleep } from './sleep';
import { StreamChunk, StreamParser, StreamParserOptions } from './stream-parser';
import { URL } from 'url';
const REQUIRED_WWW_AUTHENTICATE_KEYS = ['realm', 'nonce'];
@@ -964,8 +964,7 @@ export class RtspServer {
const match = transport.match(/.*?client_port=([0-9]+)-([0-9]+)/);
const [_, rtp, rtcp] = match;
const rtpServer = await createBindZero();
const rtcpServer = await createBindUdp(rtpServer.port + 1);
const [rtpServer, rtcpServer] = await createSquentialBindZero();
this.client.on('close', () => closeQuiet(rtpServer.server));
this.client.on('close', () => closeQuiet(rtcpServer.server));
this.setupTracks[msection.control] = {

View File

@@ -64,15 +64,14 @@ export class H264Repacketizer {
extraPackets = 0;
fuaMax: number;
pendingFuA: RtpPacket[];
// log whether a stapa sps/pps has been seen.
// resets on every idr frame, to trigger codec information
// to be resent.
seenStapASps = false;
// the stapa packet that will be sent before an idr frame.
stapa: RtpPacket;
fuaMin: number;
constructor(public console: Console, public maxPacketSize: number, public codecInfo: {
sps: Buffer,
pps: Buffer,
sei?: Buffer,
}, public jitterBuffer = new JitterBuffer(console, 4)) {
// 12 is the rtp/srtp header size.
this.fuaMax = maxPacketSize - FU_A_HEADER_SIZE;
@@ -98,6 +97,11 @@ export class H264Repacketizer {
this.codecInfo.pps = pps;
}
updateSei(sei: Buffer) {
this.ensureCodecInfo();
this.codecInfo.sei = sei;
}
shouldFilter(nalType: number) {
// currently nothing is filtered, but it seems that some SEI packets cause issues
// and should be ignored, while others show up in the stap-a sps/pps packet
@@ -266,7 +270,7 @@ export class H264Repacketizer {
}
else {
if (splitNaluType === NAL_TYPE_IDR)
this.maybeSendSpsPps(first, ret);
this.maybeSendStapACodecInfo(first, ret);
this.fragment(first, ret, {
payload: split,
@@ -319,11 +323,21 @@ export class H264Repacketizer {
});
}
maybeSendSpsPps(packet: RtpPacket, ret: RtpPacket[]) {
maybeSendStapACodecInfo(packet: RtpPacket, ret: RtpPacket[]) {
if (this.stapa) {
const newStapa = this.createPacket(packet, this.stapa.payload, this.stapa.header.marker);
this.extraPackets++;
ret.push(newStapa);
return;
}
if (!this.codecInfo?.sps || !this.codecInfo?.pps)
return;
const aggregates = this.packetizeStapA([this.codecInfo.sps, this.codecInfo.pps]);
const agg = [this.codecInfo.sps, this.codecInfo.pps];
if (this.codecInfo?.sei)
agg.push(this.codecInfo.sei);
const aggregates = this.packetizeStapA(agg);
if (aggregates.length !== 1) {
this.console.error('expected only 1 packet for sps/pps stapa');
return;
@@ -406,9 +420,7 @@ export class H264Repacketizer {
// the stream may not contain codec information in stapa or may be sending it
// in separate sps/pps packets which is not supported by homekit.
if (originalNalType === NAL_TYPE_IDR) {
if (!this.seenStapASps)
this.maybeSendSpsPps(packet, ret);
this.seenStapASps = false;
this.maybeSendStapACodecInfo(packet, ret);
}
}
@@ -451,26 +463,22 @@ export class H264Repacketizer {
else if (nalType === NAL_TYPE_STAP_A) {
this.flushPendingFuA(ret);
this.stapa = packet;
this.extraPackets--;
// break the aggregated packet up and send it.
const depacketized = depacketizeStapA(packet.payload)
.filter(payload => {
depacketizeStapA(packet.payload)
.forEach(payload => {
const nalType = payload[0] & 0x1F;
this.seenStapASps = this.seenStapASps || (nalType === NAL_TYPE_SPS);
if (this.shouldFilter(nalType)) {
return false;
}
if (nalType === NAL_TYPE_SPS)
this.updateSps(payload);
if (nalType === NAL_TYPE_PPS)
else if (nalType === NAL_TYPE_PPS)
this.updatePps(payload);
return true;
else if (nalType === NAL_TYPE_SEI)
this.updateSei(payload);
else
this.console.warn('Skipped a stapa type. Please report this to @koush on Discord.', nalType)
});
if (depacketized.length === 0) {
this.extraPackets--;
return;
}
const aggregates = this.packetizeStapA(depacketized);
this.createRtpPackets(packet, aggregates, ret);
}
else if (nalType >= 1 && nalType < 24) {
this.flushPendingFuA(ret);
@@ -491,6 +499,11 @@ export class H264Repacketizer {
this.updatePps(packet.payload);
return;
}
else if (nalType === NAL_TYPE_SEI) {
this.extraPackets--;
this.updateSei(packet.payload);
return;
}
if (this.shouldFilter(nalType)) {
this.extraPackets--;
@@ -500,9 +513,7 @@ export class H264Repacketizer {
if (nalType === NAL_TYPE_IDR) {
// if this is an idr frame, but no sps has been sent, dummy one up.
// the stream may not contain sps.
if (!this.seenStapASps)
this.maybeSendSpsPps(packet, ret);
this.seenStapASps = false;
this.maybeSendStapACodecInfo(packet, ret);
}
this.fragment(packet, ret);

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/webrtc",
"version": "0.1.50",
"version": "0.1.51",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/webrtc",
"version": "0.1.50",
"version": "0.1.51",
"dependencies": {
"@scrypted/common": "file:../../common",
"@scrypted/sdk": "file:../../sdk",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/webrtc",
"version": "0.1.50",
"version": "0.1.51",
"scripts": {
"scrypted-setup-project": "scrypted-setup-project",
"prescrypted-setup-project": "scrypted-package-json",

View File

@@ -533,18 +533,24 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement {
const ret = new WebRTCTrack(this, videoTransceiver, audioTransceiver, intercom);
this.negotiation.then(async () => {
this.console.log('waiting ice connected');
if (this.pc.remoteIsBundled)
await waitConnected(this.pc);
else
await waitIceConnected(this.pc);
if (ret.removed.finished)
return;
this.console.log('done waiting ice connected');
const f = await createTrackForwarder(videoTransceiver, audioTransceiver);
ret.attachForwarder(f);
waitClosed(this.pc).finally(() => f?.kill());
ret.removed.promise.finally(() => f?.kill());
try {
this.console.log('waiting ice connected');
if (this.pc.remoteIsBundled)
await waitConnected(this.pc);
else
await waitIceConnected(this.pc);
if (ret.removed.finished)
return;
this.console.log('done waiting ice connected');
const f = await createTrackForwarder(videoTransceiver, audioTransceiver);
ret.attachForwarder(f);
waitClosed(this.pc).finally(() => f?.kill());
ret.removed.promise.finally(() => f?.kill());
}
catch (e) {
this.console.error('Error starting playback for WebRTC track.', e);
// todo: report this to the client somehow.
}
});
return ret;

View File

@@ -3,17 +3,17 @@ import { Deferred } from '@scrypted/common/src/deferred';
import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
import { createBrowserSignalingSession } from "@scrypted/common/src/rtc-connect";
import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from '@scrypted/common/src/settings-mixin';
import sdk, { BufferConverter, ConnectOptions, DeviceCreator, DeviceCreatorSettings, DeviceProvider, FFmpegInput, HttpRequest, Intercom, MediaObject, MediaObjectOptions, MixinProvider, RequestMediaStream, RequestMediaStreamOptions, ResponseMediaStreamOptions, RTCSessionControl, RTCSignalingChannel, RTCSignalingClient, RTCSignalingOptions, RTCSignalingSession, ScryptedDeviceBase, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera } from '@scrypted/sdk';
import sdk, { BufferConverter, ConnectOptions, DeviceCreator, DeviceCreatorSettings, DeviceProvider, FFmpegInput, HttpRequest, Intercom, MediaObject, MediaObjectOptions, MixinProvider, RTCSessionControl, RTCSignalingChannel, RTCSignalingClient, RTCSignalingOptions, RTCSignalingSession, RequestMediaStream, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDeviceBase, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, SettingValue, Settings, VideoCamera } from '@scrypted/sdk';
import { StorageSettings } from '@scrypted/sdk/storage-settings';
import crypto from 'crypto';
import ip from 'ip';
import net from 'net';
import { DataChannelDebouncer } from './datachannel-debouncer';
import { createRTCPeerConnectionSink, createTrackForwarder, RTC_BRIDGE_NATIVE_ID, WebRTCConnectionManagement } from "./ffmpeg-to-wrtc";
import { RTC_BRIDGE_NATIVE_ID, WebRTCConnectionManagement, createRTCPeerConnectionSink, createTrackForwarder } from "./ffmpeg-to-wrtc";
import { stunServer, turnServer, weriftStunServer, weriftTurnServer } from './ice-servers';
import { waitClosed } from './peerconnection-util';
import { WebRTCCamera } from "./webrtc-camera";
import { defaultPeerConfig, InterfaceAddresses, MediaStreamTrack, PeerConfig, RTCPeerConnection } from './werift';
import { InterfaceAddresses, MediaStreamTrack, PeerConfig, RTCPeerConnection, defaultPeerConfig } from './werift';
import { WeriftSignalingSession } from './werift-signaling-session';
import { createRTCPeerConnectionSource, getRTCMediaStreamOptions } from './wrtc-to-rtsp';
import { createZygote } from './zygote';
@@ -451,55 +451,56 @@ export class WebRTCPlugin extends AutoenableMixinProvider implements DeviceCreat
async onConnection(request: HttpRequest, webSocketUrl: string) {
const cleanup = new Deferred<string>();
cleanup.promise.catch(e => this.console.log('cleaning up rtc connection:', e.message));
const ws = new WebSocket(webSocketUrl);
cleanup.promise.finally(() => ws.close());
if (request.isPublicEndpoint) {
ws.close();
return;
}
const client = await listenZeroSingleClient();
cleanup.promise.finally(() => {
client.clientPromise.then(cp => cp.destroy());
});
const message = await new Promise<{
connectionManagementId: string,
updateSessionId: string,
} & ConnectOptions>((resolve, reject) => {
const close = () => {
const str = 'Connection closed while waiting for message';
reject(new Error(str));
cleanup.resolve(str);
};
ws.addEventListener('close', close);
ws.onmessage = message => {
ws.removeEventListener('close', close);
resolve(JSON.parse(message.data));
}
});
message.username = request.username;
const { connectionManagementId, updateSessionId } = message;
if (connectionManagementId) {
cleanup.promise.finally(async () => {
const plugins = await systemManager.getComponent('plugins');
plugins.setHostParam('@scrypted/webrtc', connectionManagementId);
});
}
if (updateSessionId) {
cleanup.promise.finally(async () => {
const plugins = await systemManager.getComponent('plugins');
plugins.setHostParam('@scrypted/webrtc', updateSessionId);
});
}
cleanup.promise.then(e => this.console.log('cleaning up rtc connection:', e));
try {
const ws = new WebSocket(webSocketUrl);
cleanup.promise.finally(() => ws.close());
if (request.isPublicEndpoint) {
cleanup.resolve('public endpoint not supported');
return;
}
const client = await listenZeroSingleClient();
cleanup.promise.finally(() => {
client.cancel();
client.clientPromise.then(cp => cp.destroy()).catch(() => {});
});
const message = await new Promise<{
connectionManagementId: string,
updateSessionId: string,
} & ConnectOptions>((resolve, reject) => {
const close = () => {
const str = 'Connection closed while waiting for message';
reject(new Error(str));
cleanup.resolve(str);
};
ws.addEventListener('close', close);
ws.onmessage = message => {
ws.removeEventListener('close', close);
resolve(JSON.parse(message.data));
}
});
message.username = request.username;
const { connectionManagementId, updateSessionId } = message;
if (connectionManagementId) {
cleanup.promise.finally(async () => {
const plugins = await systemManager.getComponent('plugins');
plugins.setHostParam('@scrypted/webrtc', connectionManagementId);
});
}
if (updateSessionId) {
cleanup.promise.finally(async () => {
const plugins = await systemManager.getComponent('plugins');
plugins.setHostParam('@scrypted/webrtc', updateSessionId);
});
}
const session = await createBrowserSignalingSession(ws, '@scrypted/webrtc', 'remote');
const clientOptions = await session.getOptions();
@@ -526,17 +527,11 @@ export class WebRTCPlugin extends AutoenableMixinProvider implements DeviceCreat
const cp = await client.clientPromise;
cp.on('close', () => cleanup.resolve('socket client closed'));
// TODO: remove process.send hack
// 12/16/2022
if (sdk.connect)
sdk.connect(cp, message);
else
process.send(message, cp);
sdk.connect(cp, message);
}
catch (e) {
console.error("error negotiating browser RTCC signaling", e);
cleanup.resolve('error');
throw e;
}
}
}

View File

@@ -1,14 +1,13 @@
import { RtpPacket } from "../../../external/werift/packages/rtp/src/rtp/rtp";
import { Deferred } from "@scrypted/common/src/deferred";
import { closeQuiet, createBindZero, listenZeroSingleClient, reserveUdpPort } from "@scrypted/common/src/listen-cluster";
import { closeQuiet, createBindZero, listenZeroSingleClient } from "@scrypted/common/src/listen-cluster";
import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "@scrypted/common/src/media-helpers";
import { RtspClient, RtspServer, RtspServerResponse, RtspStatusError } from "@scrypted/common/src/rtsp-server";
import { addTrackControls, MSection, parseSdp, replaceSectionPort } from "@scrypted/common/src/sdp-utils";
import { MSection, addTrackControls, parseSdp, replaceSectionPort } from "@scrypted/common/src/sdp-utils";
import sdk, { FFmpegInput } from "@scrypted/sdk";
import child_process, { ChildProcess } from 'child_process';
import dgram from 'dgram';
import { Socket } from "net";
import { Writable } from "stream";
import { RtpPacket } from "../../../external/werift/packages/rtp/src/rtp/rtp";
const { mediaManager } = sdk;