From ef037a76fb176c69a0455faa246a8515daab5634 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 13 Dec 2022 20:21:04 -0800 Subject: [PATCH] webrtc: fix further race conditions, threadify non-api path. --- plugins/webrtc/package-lock.json | 4 +- plugins/webrtc/package.json | 2 +- plugins/webrtc/src/ffmpeg-to-wrtc.ts | 32 ++------- plugins/webrtc/src/main.ts | 86 ++++++++++++++++++----- plugins/webrtc/src/peerconnection-util.ts | 8 +-- 5 files changed, 83 insertions(+), 49 deletions(-) diff --git a/plugins/webrtc/package-lock.json b/plugins/webrtc/package-lock.json index 997b3b913..d96df8182 100644 --- a/plugins/webrtc/package-lock.json +++ b/plugins/webrtc/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/webrtc", - "version": "0.0.111", + "version": "0.0.112", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/webrtc", - "version": "0.0.111", + "version": "0.0.112", "dependencies": { "@koush/werift": "file:../../external/werift/packages/webrtc", "@scrypted/common": "file:../../common", diff --git a/plugins/webrtc/package.json b/plugins/webrtc/package.json index 03c270d7a..df3e6782d 100644 --- a/plugins/webrtc/package.json +++ b/plugins/webrtc/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/webrtc", - "version": "0.0.111", + "version": "0.0.112", "scripts": { "scrypted-setup-project": "scrypted-setup-project", "prescrypted-setup-project": "scrypted-package-json", diff --git a/plugins/webrtc/src/ffmpeg-to-wrtc.ts b/plugins/webrtc/src/ffmpeg-to-wrtc.ts index b58285d52..884cfc913 100644 --- a/plugins/webrtc/src/ffmpeg-to-wrtc.ts +++ b/plugins/webrtc/src/ffmpeg-to-wrtc.ts @@ -372,6 +372,9 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { this.weriftSignalingSession = new WeriftSignalingSession(console, this.pc); } + async probe() { + } + async createTracks(mediaObject: MediaObject, intercomId?: string) { let requestMediaStream: RequestMediaStream; @@ -452,7 +455,7 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { const { atrack, vtrack, createTrackForwarder, intercom } = await this.createTracks(mediaObject, options?.intercomId); const videoTransceiver = this.pc.addTransceiver(vtrack, { - direction: 'sendonly' , + direction: 'sendonly', }); videoTransceiver.mid = options?.videoMid; @@ -492,31 +495,10 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { async waitClosed() { await waitClosed(this.pc); } -} -export class WebRTCBridge extends ScryptedDeviceBase implements BufferConverter { - constructor(public plugin: WebRTCPlugin, nativeId: string) { - super(nativeId); - - this.fromMimeType = ScryptedMimeTypes.RTCSignalingSession; - this.toMimeType = ScryptedMimeTypes.RTCConnectionManagement; - } - - async convert(data: any, fromMimeType: string, toMimeType: string, options?: BufferConvertorOptions): Promise { - const session = data as RTCSignalingSession; - const maximumCompatibilityMode = !!this.plugin.storageSettings.values.maximumCompatibilityMode; - const { transcodeWidth, sessionSupportsH264High } = parseOptions(await session.getOptions()); - - const console = sdk.deviceManager.getMixinConsole(options?.sourceId, this.nativeId); - const ret = new WebRTCConnectionManagement(console, session, maximumCompatibilityMode, transcodeWidth, sessionSupportsH264High, { - configuration: this.plugin.getRTCConfiguration(), - weriftConfiguration: this.plugin.getWeriftConfiguration(), - }); - // todo: move this into api, provide a client stream. - ret.pc.createDataChannel('dummy'); - const offer = await ret.pc.createOffer(); - ret.pc.setLocalDescription(offer); - return ret; + async waitConnected() { + await waitIceConnected(this.pc); + await waitConnected(this.pc); } } diff --git a/plugins/webrtc/src/main.ts b/plugins/webrtc/src/main.ts index 648105c8a..ba93ea6d2 100644 --- a/plugins/webrtc/src/main.ts +++ b/plugins/webrtc/src/main.ts @@ -4,12 +4,12 @@ import { Deferred } from '@scrypted/common/src/deferred'; import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; import { createBrowserSignalingSession } from "@scrypted/common/src/rtc-connect"; import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from '@scrypted/common/src/settings-mixin'; -import sdk, { BufferConverter, BufferConvertorOptions, DeviceCreator, DeviceCreatorSettings, DeviceProvider, FFmpegInput, HttpRequest, Intercom, MediaObject, MixinProvider, RequestMediaStream, RequestMediaStreamOptions, ResponseMediaStreamOptions, RTCSessionControl, RTCSignalingChannel, RTCSignalingClient, RTCSignalingSession, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera } from '@scrypted/sdk'; +import sdk, { BufferConverter, BufferConvertorOptions, DeviceCreator, DeviceCreatorSettings, DeviceProvider, FFmpegInput, HttpRequest, Intercom, MediaObject, MixinProvider, RequestMediaStream, RequestMediaStreamOptions, ResponseMediaStreamOptions, RTCSessionControl, RTCSignalingChannel, RTCSignalingClient, RTCSignalingSession, ScryptedDeviceBase, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera } from '@scrypted/sdk'; import { StorageSettings } from '@scrypted/sdk/storage-settings'; import crypto from 'crypto'; import net from 'net'; import { DataChannelDebouncer } from './datachannel-debouncer'; -import { createRTCPeerConnectionSink, parseOptions, RTC_BRIDGE_NATIVE_ID, WebRTCBridge, WebRTCConnectionManagement } from "./ffmpeg-to-wrtc"; +import { createRTCPeerConnectionSink, parseOptions, RTC_BRIDGE_NATIVE_ID, WebRTCConnectionManagement } from "./ffmpeg-to-wrtc"; import { stunServer, turnServer } from './ice-servers'; import { waitClosed } from './peerconnection-util'; import { WebRTCCamera } from "./webrtc-camera"; @@ -489,25 +489,77 @@ export async function fork() { } } - const socket = net.connect(port, '127.0.0.1'); - cleanup.promise.finally(() => socket.destroy()); - - const dc = pc.createDataChannel('rpc'); - dc.message.subscribe(message => socket.write(message)); - - const debouncer = new DataChannelDebouncer({ - send: u8 => dc.send(Buffer.from(u8)), - }, e => { - this.console.error('datachannel send error', e); - socket.destroy(); - }); - socket.on('data', data => debouncer.send(data)); - socket.on('close', () => cleanup.resolve('socket closed')); - socket.on('error', () => cleanup.resolve('socket error')); + if (port) { + const socket = net.connect(port, '127.0.0.1'); + cleanup.promise.finally(() => socket.destroy()); + + const dc = pc.createDataChannel('rpc'); + dc.message.subscribe(message => socket.write(message)); + + const debouncer = new DataChannelDebouncer({ + send: u8 => dc.send(Buffer.from(u8)), + }, e => { + this.console.error('datachannel send error', e); + socket.destroy(); + }); + socket.on('data', data => debouncer.send(data)); + socket.on('close', () => cleanup.resolve('socket closed')); + socket.on('error', () => cleanup.resolve('socket error')); + } + else { + pc.createDataChannel('dummy'); + const offer = await pc.createOffer(); + pc.setLocalDescription(offer); + } return connection; } } } +class WebRTCBridge extends ScryptedDeviceBase implements BufferConverter { + constructor(public plugin: WebRTCPlugin, nativeId: string) { + super(nativeId); + + this.fromMimeType = ScryptedMimeTypes.RTCSignalingSession; + this.toMimeType = ScryptedMimeTypes.RTCConnectionManagement; + } + + async convert(data: any, fromMimeType: string, toMimeType: string, options?: BufferConvertorOptions): Promise { + const session = data as RTCSignalingSession; + const maximumCompatibilityMode = !!this.plugin.storageSettings.values.maximumCompatibilityMode; + const { transcodeWidth, sessionSupportsH264High } = parseOptions(await session.getOptions()); + + const console = sdk.deviceManager.getMixinConsole(options?.sourceId, this.nativeId); + + const result = zygote(); + + const cleanup = new Deferred(); + + this.plugin.activeConnections++; + result.worker.on('exit', () => { + this.plugin.activeConnections--; + cleanup.resolve('worker exited'); + }); + cleanup.promise.finally(() => { + result.worker.terminate() + }); + + const { createConnection } = await result.result; + const connection = await createConnection({}, undefined, session, + maximumCompatibilityMode, transcodeWidth, sessionSupportsH264High, { + configuration: this.plugin.getRTCConfiguration(), + weriftConfiguration: this.plugin.getWeriftConfiguration(), + }); + cleanup.promise.finally(() => connection.close().catch(() => { })); + connection.waitClosed().finally(() => cleanup.resolve('peer connection closed')); + await connection.negotiateRTCSignalingSession(); + await connection.waitConnected(); + + // await connection.negotiateRTCSignalingSession(); + + return connection; + } +} + export default WebRTCPlugin; diff --git a/plugins/webrtc/src/peerconnection-util.ts b/plugins/webrtc/src/peerconnection-util.ts index 2c8c7a0d9..58b738718 100644 --- a/plugins/webrtc/src/peerconnection-util.ts +++ b/plugins/webrtc/src/peerconnection-util.ts @@ -28,8 +28,8 @@ async function statePromise(e: Event, check: () => boolean): Promise { function isPeerConnectionClosed(pc: RTCPeerConnection) { return (pc.connectionState === 'closed' - || pc.connectionState === 'disconnected' - || pc.connectionState === 'failed') + || pc.connectionState === 'disconnected' + || pc.connectionState === 'failed') } export function waitConnected(pc: RTCPeerConnection) { @@ -42,8 +42,8 @@ export function waitConnected(pc: RTCPeerConnection) { function isPeerIceConnectionClosed(pc: RTCPeerConnection) { return (pc.iceConnectionState === 'disconnected' - || pc.iceConnectionState === 'failed' - || pc.iceConnectionState === 'closed') + || pc.iceConnectionState === 'failed' + || pc.iceConnectionState === 'closed') } export function waitIceConnected(pc: RTCPeerConnection) {