From d4da11bb2c3fee04c3f555ff893ed8571f455611 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Mon, 25 Aug 2025 12:03:30 -0700 Subject: [PATCH] webrtc: wip data channel generator --- common/tsconfig.json | 2 +- plugins/webrtc/src/ffmpeg-to-wrtc.ts | 95 ++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/common/tsconfig.json b/common/tsconfig.json index 452fe012d..0eac7cc3a 100644 --- a/common/tsconfig.json +++ b/common/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "module": "commonjs", + "module": "Node16", "moduleResolution": "Node16", "target": "esnext", "noImplicitAny": true, diff --git a/plugins/webrtc/src/ffmpeg-to-wrtc.ts b/plugins/webrtc/src/ffmpeg-to-wrtc.ts index 00f450c79..af6f8579c 100644 --- a/plugins/webrtc/src/ffmpeg-to-wrtc.ts +++ b/plugins/webrtc/src/ffmpeg-to-wrtc.ts @@ -1,7 +1,7 @@ -import { MediaStreamTrack, PeerConfig, RTCPeerConnection, RTCRtpCodecParameters, RTCRtpTransceiver, RtpPacket } from "./werift"; +import { MediaStreamTrack, PeerConfig, RTCDataChannel, RTCPeerConnection, RTCRtpTransceiver, RtpPacket } from "./werift"; import { Deferred } from "@scrypted/common/src/deferred"; -import sdk, { FFmpegInput, FFmpegTranscodeStream, Intercom, MediaObject, MediaStreamDestination, MediaStreamFeedback, RequestMediaStream, RTCAVSignalingSetup, RTCConnectionManagement, RTCInputMediaObjectTrack, RTCOutputMediaObjectTrack, RTCSignalingOptions, RTCSignalingSession, ScryptedDevice, ScryptedMimeTypes } from "@scrypted/sdk"; +import sdk, { FFmpegInput, FFmpegTranscodeStream, Intercom, MediaObject, MediaStreamDestination, MediaStreamFeedback, RequestMediaStream, RTCAVSignalingSetup, RTCConnectionManagement, RTCGeneratorDataChannel, RTCInputMediaObjectTrack, RTCOutputMediaObjectTrack, RTCSignalingOptions, RTCSignalingSession, ScryptedMimeTypes } from "@scrypted/sdk"; import { ScryptedSessionControl } from "./session-control"; import { optionalVideoCodec, opusAudioCodecOnly, requiredAudioCodecs, requiredVideoCodec } from "./webrtc-required-codecs"; import { logIsLocalIceTransport } from "./werift-util"; @@ -16,6 +16,7 @@ import { logConnectionState, waitClosed, waitConnected, waitIceConnected } from import { RtpCodecCopy, RtpTrack, RtpTracks, startRtpForwarderProcess } from "./rtp-forwarders"; import { getAudioCodec, getFFmpegRtpAudioOutputArguments } from "./webrtc-required-codecs"; import { WeriftSignalingSession } from "./werift-signaling-session"; +import { createAsyncQueue } from "../../../common/src/async-queue"; function getDebugModeH264EncoderArgs() { return [ @@ -460,9 +461,9 @@ export function parseOptions(options: RTCSignalingOptions) { // Some devices return a `screen width` value that is not a multiple of 2, which is not allowed for the h264 codec. // Convert to a smaller even value. - const screenWidthForTranscodeH264 = !options?.screen?.width - ? 960 - : Math.trunc(options?.screen?.width / 2) * 2; + const screenWidthForTranscodeH264 = !options?.screen?.width + ? 960 + : Math.trunc(options?.screen?.width / 2) * 2; const transcodeWidth = Math.max(640, Math.min(screenWidthForTranscodeH264, 1280)); const devicePixelRatio = options?.screen?.devicePixelRatio || 1; @@ -664,7 +665,7 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { // no support for 2 way video yet. const videoDirection = 'sendonly'; let audioDirection = options?.audioDirection || 'sendrecv'; - if (!intercom){ + if (!intercom) { audioDirection = 'sendonly'; } @@ -705,6 +706,79 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { return ret; } + async createRPCGeneratorDataChannel(label: string, generator: AsyncGenerator, options?: { + bufferedAmountLowThreshold?: number, + }) { + generator = await sdk.connectRPCObject(generator); + + const createdDc = this.pc.createDataChannel(label, { + ordered: true, + }); + + const q = createAsyncQueue(); + + const dcDeferred = new Deferred(); + + createdDc.onopen = () => dcDeferred.resolve(createdDc); + createdDc.onclose = () => { + q.end(); + dcDeferred.reject(new Error('data channel closed')); + }; + createdDc.onerror = (e) => { + q.end(e.error); + dcDeferred.reject(e.error); + }; + + if (options?.bufferedAmountLowThreshold > 0) + createdDc.bufferedAmountLowThreshold = options.bufferedAmountLowThreshold; + + await this.negotiation; + const dc = await dcDeferred.promise; + + const closed = waitClosed(this.pc).finally(() => q.end()); + + (async () => { + try { + for await (const chunk of generator) { + if (!q.submit(chunk)) + break; + } + } + catch (e) { + q.end(e); + } + finally { + q.end(); + } + })(); + + + (async () => { + try { + for await (const chunk of q.queue) { + if (dc.readyState !== 'open') + break; + dc.send(chunk); + + if (dc.bufferedAmount > dc.bufferedAmountLowThreshold) { + await Promise.any([closed, dc.bufferedAmountLow.asPromise()]); + } + } + } + catch (e) { + this.console.error('Generator ended with error.', e); + if (dc.readyState === 'open') { + dc.send(e.toString()); + } + } + finally { + q.end(); + } + })(); + + return new RTCGeneratorDataChannelWrapper(dc); + } + async close(): Promise { for (const track of this.activeTracks) { track.cleanup(false); @@ -723,6 +797,15 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement { } } +class RTCGeneratorDataChannelWrapper implements RTCGeneratorDataChannel { + constructor(public dc: RTCDataChannel) { + } + + async close() { + this.dc.close(); + } +} + export async function createRTCPeerConnectionSink( clientSignalingSession: RTCSignalingSession, console: Console,