webrtc: fix peer connection not respecting configuration. add startSession to allow the remote to begin sending packets on connection success.

This commit is contained in:
Koushik Dutta
2022-05-26 20:39:02 -07:00
parent 7c051d2975
commit 3acc9bd0bc
5 changed files with 96 additions and 53 deletions

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/webrtc",
"version": "0.0.35",
"version": "0.0.36",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/webrtc",
"version": "0.0.35",
"version": "0.0.36",
"dependencies": {
"@koush/werift": "file:../../external/werift/packages/webrtc",
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/webrtc",
"version": "0.0.35",
"version": "0.0.36",
"scripts": {
"prepublishOnly": "NODE_ENV=production scrypted-webpack",
"prescrypted-vscode-launch": "scrypted-webpack",

View File

@@ -11,4 +11,6 @@ export class ScryptedSessionControl implements RTCSessionControl {
async endSession() {
await this.cleanup();
}
async startSession() {
}
}

View File

@@ -1,4 +1,4 @@
import { RTCPeerConnection, RTCSessionDescription } from "@koush/werift";
import { RTCIceServer, RTCPeerConnection, RTCSessionDescription } from "@koush/werift";
export function createRawResponse(response: RTCSessionDescription): RTCSessionDescriptionInit {
return {
@@ -18,3 +18,25 @@ export function isPeerConnectionAlive(pc: RTCPeerConnection) {
return false;
return true;
}
export function getWeriftIceServers(configuration: RTCConfiguration): RTCIceServer[] {
if (!configuration?.iceServers)
return;
const ret: RTCIceServer[] = [];
for (const ice of configuration.iceServers) {
if (typeof ice.urls === 'string') {
ret.push({
...ice as RTCIceServer,
});
}
else {
for (const url of ice.urls) {
ret.push(Object.assign({}, ice, {
urls: url,
}));
}
}
}
return ret;
}

View File

@@ -1,16 +1,14 @@
import { Output, Pipeline, RTCPeerConnection, RtcpPacket, RtcpPayloadSpecificFeedback, RTCRtpCodecParameters, RTCRtpTransceiver, RTCSessionDescription, RtpPacket, uint16Add } from "@koush/werift";
import { BundlePolicy, Output, Pipeline, RTCPeerConnection, RtcpPacket, RtcpPayloadSpecificFeedback, RTCRtpTransceiver, RTCSessionDescription, RtpPacket, uint16Add } from "@koush/werift";
import { FullIntraRequest } from "@koush/werift/lib/rtp/src/rtcp/psfb/fullIntraRequest";
import { listenZeroSingleClient } from "@scrypted/common/src/listen-cluster";
import { safeKillFFmpeg } from "@scrypted/common/src/media-helpers";
import { RtspServer } from "@scrypted/common/src/rtsp-server";
import { createSdpInput, parseSdp } from '@scrypted/common/src/sdp-utils';
import sdk, { FFmpegInput, Intercom, MediaObject, MediaStreamUrl, ResponseMediaStreamOptions, RTCAVSignalingSetup, RTCSessionControl, RTCSignalingChannel, RTCSignalingOptions, RTCSignalingSendIceCandidate, RTCSignalingSession, ScryptedMimeTypes } from "@scrypted/sdk";
import { ChildProcess } from "child_process";
import dgram from 'dgram';
import { Socket } from "net";
import { getFFmpegRtpAudioOutputArguments, startRtpForwarderProcess } from "./rtp-forwarders";
import { requiredAudioCodecs, requiredVideoCodec } from "./webrtc-required-codecs";
import { createRawResponse, isPeerConnectionAlive } from "./werift-util";
import { createRawResponse, getWeriftIceServers, isPeerConnectionAlive } from "./werift-util";
const { mediaManager } = sdk;
@@ -33,7 +31,6 @@ export async function createRTCPeerConnectionSource(options: {
const { clientPromise, port } = await listenZeroSingleClient();
let pictureLossInterval: NodeJS.Timeout;
let pc: RTCPeerConnection;
let socket: Socket;
// rtsp server must operate in udp forwarding mode to accomodate packet reordering.
let udp = dgram.createSocket('udp4');
@@ -41,7 +38,7 @@ export async function createRTCPeerConnectionSource(options: {
const cleanup = () => {
console.log('webrtc/rtsp cleaning up');
pc?.close();
pcPromise.then(pc => pc.close());
socket?.destroy();
clearInterval(pictureLossInterval);
try {
@@ -52,43 +49,60 @@ export async function createRTCPeerConnectionSource(options: {
sessionControl?.endSession().catch(() => { });
};
const pcPromise = clientPromise.then(async (client) => {
socket = client;
clientPromise.then(socket => {
socket.on('close', cleanup);
socket.on('error', cleanup);
});
const pcPromise = new Promise<RTCPeerConnection>(async (resolve, reject) => {
socket = await clientPromise;
const rtspServer = new RtspServer(socket, undefined, udp);
// rtspServer.console = console;
const pc = new RTCPeerConnection({
codecs: {
audio: [
...requiredAudioCodecs,
],
video: [
requiredVideoCodec,
],
}
});
let pc: RTCPeerConnection;
socket.on('close', cleanup);
socket.on('error', cleanup);
pc.iceGatheringStateChange.subscribe(() => {
console.log('iceGatheringStateChange', pc.iceGatheringState);
});
pc.iceConnectionStateChange.subscribe(() => {
console.log('iceConnectionStateChange', pc.connectionState, pc.iceConnectionState);
if (pc.iceConnectionState === 'disconnected'
|| pc.iceConnectionState === 'failed'
|| pc.iceConnectionState === 'closed') {
cleanup();
}
});
pc.connectionStateChange.subscribe(() => {
console.log('connectionStateChange', pc.connectionState, pc.iceConnectionState);
if (pc.connectionState === 'closed'
|| pc.connectionState === 'disconnected'
|| pc.connectionState === 'failed') {
cleanup();
}
});
const ensurePeerConnection = (setup: RTCAVSignalingSetup) => {
if (pc)
return;
pc = new RTCPeerConnection({
bundlePolicy: setup.configuration.bundlePolicy as BundlePolicy,
codecs: {
audio: [
...requiredAudioCodecs,
],
video: [
requiredVideoCodec,
],
},
iceServers: getWeriftIceServers(setup.configuration),
});
pc.iceGatheringStateChange.subscribe(() => {
console.log('iceGatheringStateChange', pc.iceGatheringState);
});
pc.iceConnectionStateChange.subscribe(() => {
console.log('iceConnectionStateChange', pc.connectionState, pc.iceConnectionState);
if (pc.iceConnectionState === 'disconnected'
|| pc.iceConnectionState === 'failed'
|| pc.iceConnectionState === 'closed') {
cleanup();
}
});
pc.connectionStateChange.subscribe(() => {
console.log('connectionStateChange', pc.connectionState, pc.iceConnectionState);
if (pc.connectionState === 'closed'
|| pc.connectionState === 'disconnected'
|| pc.connectionState === 'failed') {
cleanup();
}
});
pc.connectionStateChange.watch(state => state === 'connected').then(() => {
console.log('startSession');
sessionControl.startSession().catch(() => {});
});
resolve(pc);
}
let audioTrack: string;
let videoTrack: string;
@@ -97,6 +111,8 @@ export async function createRTCPeerConnectionSource(options: {
const useRtspJitterBuffer = false;
const doSetup = async (setup: RTCAVSignalingSetup) => {
ensurePeerConnection(setup);
let gotAudio = false;
let gotVideo = false;
@@ -209,7 +225,7 @@ export async function createRTCPeerConnectionSource(options: {
const parsedSdp = parseSdp(rtspServer.sdp);
audioTrack = parsedSdp.msections.find(msection => msection.type === 'audio').control;
videoTrack = parsedSdp.msections.find(msection => msection.type === 'video').control;
console.log('sdp sent', rtspServer.sdp);
// console.log('sdp sent', rtspServer.sdp);
if (useUdp) {
rtspServer.setupTracks[videoTrack] = {
@@ -249,21 +265,25 @@ export async function createRTCPeerConnectionSource(options: {
if (state === 'complete')
resolve(undefined);
}));
pc.onicecandidate = ev => {
sendIceCandidate?.({
...ev.candidate,
});
};
if (sendIceCandidate) {
pc.onicecandidate = ev => {
console.log('sendIceCandidate', ev.candidate.sdpMLineIndex, ev.candidate.candidate);
sendIceCandidate({
...ev.candidate,
});
};
}
const handleRawResponse = async (response: RTCSessionDescription): Promise<RTCSessionDescriptionInit> => {
const ret = createRawResponse(response);
console.log('createLocalDescription', ret.sdp)
await handleRtspSetup(ret);
return ret;
}
if (type === 'answer') {
let answer = await pc.createAnswer();
console.log('sdp received', answer.sdp);
const set = pc.setLocalDescription(answer);
if (sendIceCandidate)
return handleRawResponse(answer);
@@ -274,7 +294,6 @@ export async function createRTCPeerConnectionSource(options: {
}
else {
let offer = await pc.createOffer();
// console.log(offer.sdp);
const set = pc.setLocalDescription(offer);
if (sendIceCandidate)
return handleRawResponse(offer);
@@ -285,6 +304,7 @@ export async function createRTCPeerConnectionSource(options: {
}
}
async setRemoteDescription(description: RTCSessionDescriptionInit, setup: RTCAVSignalingSetup): Promise<void> {
console.log('setRemoteDescription', description.sdp)
if (description.type === 'offer')
doSetup(setup);
@@ -292,6 +312,7 @@ export async function createRTCPeerConnectionSource(options: {
await pc.setRemoteDescription(description as any);
}
async addIceCandidate(candidate: RTCIceCandidateInit): Promise<void> {
console.log('addIceCandidate', candidate.sdpMLineIndex, candidate.candidate)
await pc.addIceCandidate(candidate as RTCIceCandidate);
}
@@ -299,8 +320,6 @@ export async function createRTCPeerConnectionSource(options: {
sessionControl = await channel.startRTCSignalingSession(new SignalingSession());
console.log('session setup complete');
return pc;
});
pcPromise.catch(e => {