webrtc: wip data channel generator

This commit is contained in:
Koushik Dutta
2025-08-25 12:03:30 -07:00
parent f556ae7ff3
commit d4da11bb2c
2 changed files with 90 additions and 7 deletions

View File

@@ -1,6 +1,6 @@
{
"compilerOptions": {
"module": "commonjs",
"module": "Node16",
"moduleResolution": "Node16",
"target": "esnext",
"noImplicitAny": true,

View File

@@ -1,7 +1,7 @@
import { MediaStreamTrack, PeerConfig, RTCPeerConnection, RTCRtpCodecParameters, RTCRtpTransceiver, RtpPacket } from "./werift";
import { MediaStreamTrack, PeerConfig, RTCDataChannel, RTCPeerConnection, RTCRtpTransceiver, RtpPacket } from "./werift";
import { Deferred } from "@scrypted/common/src/deferred";
import sdk, { FFmpegInput, FFmpegTranscodeStream, Intercom, MediaObject, MediaStreamDestination, MediaStreamFeedback, RequestMediaStream, RTCAVSignalingSetup, RTCConnectionManagement, RTCInputMediaObjectTrack, RTCOutputMediaObjectTrack, RTCSignalingOptions, RTCSignalingSession, ScryptedDevice, ScryptedMimeTypes } from "@scrypted/sdk";
import sdk, { FFmpegInput, FFmpegTranscodeStream, Intercom, MediaObject, MediaStreamDestination, MediaStreamFeedback, RequestMediaStream, RTCAVSignalingSetup, RTCConnectionManagement, RTCGeneratorDataChannel, RTCInputMediaObjectTrack, RTCOutputMediaObjectTrack, RTCSignalingOptions, RTCSignalingSession, ScryptedMimeTypes } from "@scrypted/sdk";
import { ScryptedSessionControl } from "./session-control";
import { optionalVideoCodec, opusAudioCodecOnly, requiredAudioCodecs, requiredVideoCodec } from "./webrtc-required-codecs";
import { logIsLocalIceTransport } from "./werift-util";
@@ -16,6 +16,7 @@ import { logConnectionState, waitClosed, waitConnected, waitIceConnected } from
import { RtpCodecCopy, RtpTrack, RtpTracks, startRtpForwarderProcess } from "./rtp-forwarders";
import { getAudioCodec, getFFmpegRtpAudioOutputArguments } from "./webrtc-required-codecs";
import { WeriftSignalingSession } from "./werift-signaling-session";
import { createAsyncQueue } from "../../../common/src/async-queue";
function getDebugModeH264EncoderArgs() {
return [
@@ -461,8 +462,8 @@ export function parseOptions(options: RTCSignalingOptions) {
// Some devices return a `screen width` value that is not a multiple of 2, which is not allowed for the h264 codec.
// Convert to a smaller even value.
const screenWidthForTranscodeH264 = !options?.screen?.width
? 960
: Math.trunc(options?.screen?.width / 2) * 2;
? 960
: Math.trunc(options?.screen?.width / 2) * 2;
const transcodeWidth = Math.max(640, Math.min(screenWidthForTranscodeH264, 1280));
const devicePixelRatio = options?.screen?.devicePixelRatio || 1;
@@ -664,7 +665,7 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement {
// no support for 2 way video yet.
const videoDirection = 'sendonly';
let audioDirection = options?.audioDirection || 'sendrecv';
if (!intercom){
if (!intercom) {
audioDirection = 'sendonly';
}
@@ -705,6 +706,79 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement {
return ret;
}
async createRPCGeneratorDataChannel(label: string, generator: AsyncGenerator<Buffer>, options?: {
bufferedAmountLowThreshold?: number,
}) {
generator = await sdk.connectRPCObject(generator);
const createdDc = this.pc.createDataChannel(label, {
ordered: true,
});
const q = createAsyncQueue<Buffer>();
const dcDeferred = new Deferred<typeof createdDc>();
createdDc.onopen = () => dcDeferred.resolve(createdDc);
createdDc.onclose = () => {
q.end();
dcDeferred.reject(new Error('data channel closed'));
};
createdDc.onerror = (e) => {
q.end(e.error);
dcDeferred.reject(e.error);
};
if (options?.bufferedAmountLowThreshold > 0)
createdDc.bufferedAmountLowThreshold = options.bufferedAmountLowThreshold;
await this.negotiation;
const dc = await dcDeferred.promise;
const closed = waitClosed(this.pc).finally(() => q.end());
(async () => {
try {
for await (const chunk of generator) {
if (!q.submit(chunk))
break;
}
}
catch (e) {
q.end(e);
}
finally {
q.end();
}
})();
(async () => {
try {
for await (const chunk of q.queue) {
if (dc.readyState !== 'open')
break;
dc.send(chunk);
if (dc.bufferedAmount > dc.bufferedAmountLowThreshold) {
await Promise.any([closed, dc.bufferedAmountLow.asPromise()]);
}
}
}
catch (e) {
this.console.error('Generator ended with error.', e);
if (dc.readyState === 'open') {
dc.send(e.toString());
}
}
finally {
q.end();
}
})();
return new RTCGeneratorDataChannelWrapper(dc);
}
async close(): Promise<void> {
for (const track of this.activeTracks) {
track.cleanup(false);
@@ -723,6 +797,15 @@ export class WebRTCConnectionManagement implements RTCConnectionManagement {
}
}
class RTCGeneratorDataChannelWrapper implements RTCGeneratorDataChannel {
constructor(public dc: RTCDataChannel) {
}
async close() {
this.dc.close();
}
}
export async function createRTCPeerConnectionSink(
clientSignalingSession: RTCSignalingSession,
console: Console,