diff --git a/common/src/wrtc-to-rtsp.ts b/common/src/wrtc-to-rtsp.ts index 902f5d388..a07f80d07 100644 --- a/common/src/wrtc-to-rtsp.ts +++ b/common/src/wrtc-to-rtsp.ts @@ -1,6 +1,6 @@ import { RTCAVSignalingSetup, RTCSignalingChannel, FFMpegInput, MediaStreamOptions } from "@scrypted/sdk/types"; import { listenZeroSingleClient } from "./listen-cluster"; -import { RTCPeerConnection, RtcpPayloadSpecificFeedback, RTCRtpCodecParameters } from "@koush/werift"; +import { Output, Pipeline, RTCPeerConnection, RtcpPacket, RtcpPayloadSpecificFeedback, RTCRtpCodecParameters, RtpPacket, uint16Add } from "@koush/werift"; import dgram from 'dgram'; import { RtspServer } from "./rtsp-server"; import { Socket } from "net"; @@ -8,50 +8,6 @@ import { RTCSessionControl, RTCSignalingSession } from "@scrypted/sdk"; import { FullIntraRequest } from "@koush/werift/lib/rtp/src/rtcp/psfb/fullIntraRequest"; import { RpcPeer } from "../../server/src/rpc"; -// this is an sdp corresponding to what is requested from webrtc. -// h264 baseline and opus are required codecs that all webrtc implementations must provide. -function createSdpInput(audioPort: number, videoPort: number, sdp: string) { - let outputSdp = sdp - .replace(/c=IN .*/, `c=IN IP4 127.0.0.1`) - .replace(/m=audio \d+/, `m=audio ${audioPort}`) - .replace(/m=video \d+/, `m=video ${videoPort}`); - - let lines = outputSdp.split('\n').map(line => line.trim()); - lines = lines - .filter(line => !line.includes('a=rtcp-mux')) - .filter(line => !line.includes('a=candidate')) - .filter(line => !line.includes('a=ice')); - - const vindex = lines.findIndex(line => line.startsWith('m=video')); - lines.splice(vindex + 1, 0, 'a=control:trackID=video'); - const aindex = lines.findIndex(line => line.startsWith('m=audio')); - lines.splice(aindex + 1, 0, 'a=control:trackID=audio'); - outputSdp = lines.join('\r\n') - - outputSdp = outputSdp.split('m=') - .slice(1) - .map(line => 'm=' + line) - .join(''); - return outputSdp; -} - -export function getRTCMediaStreamOptions(id: string, name: string, useUdp: boolean): MediaStreamOptions { - return { - // set by consumer - id, - name, - // not compatible with scrypted parser currently when it is udp - tool: useUdp ? undefined : 'scrypted', - container: 'rtsp', - video: { - codec: 'h264', - }, - audio: { - codec: 'opus', - }, - }; -} - export async function createRTCPeerConnectionSource(options: { console: Console, mediaStreamOptions: MediaStreamOptions, @@ -172,27 +128,77 @@ export async function createRTCPeerConnectionSource(options: { const audioTransceiver = pc.addTransceiver("audio", setup.audio as any); audioTransceiver.onTrack.subscribe((track) => { - // audioTransceiver.sender.replaceTrack(track); - track.onReceiveRtp.subscribe((rtp) => { - if (!gotAudio) { - gotAudio = true; - console.log('received first audio packet'); + if (useUdp) { + track.onReceiveRtp.subscribe(rtp => { + if (!gotAudio) { + gotAudio = true; + console.log('received first audio packet'); + } + rtspServer.sendAudio(rtp.serialize(), false); + }); + track.onReceiveRtcp.subscribe(rtp => rtspServer.sendAudio(rtp.serialize(), true)); + } + else { + const jitter = new JitterBuffer({ + rtpStream: track.onReceiveRtp, + rtcpStream: track.onReceiveRtcp, + }); + class RtspOutput extends Output { + pushRtcpPackets(packets: RtcpPacket[]): void { + for (const rtcp of packets) { + rtspServer.sendAudio(rtcp.serialize(), true) + } + } + pushRtpPackets(packets: RtpPacket[]): void { + if (!gotAudio) { + gotAudio = true; + console.log('received first audio packet'); + } + for (const rtp of packets) { + rtspServer.sendAudio(rtp.serialize(), false); + } + } } - rtspServer.sendAudio(rtp.serialize(), false); - }); - // track.onReceiveRtcp.subscribe(rtcp => rtspServer.sendAudio(rtcp.serialize(), true)); + jitter.pipe(new RtspOutput()) + } }); const videoTransceiver = pc.addTransceiver("video", setup.video as any); videoTransceiver.onTrack.subscribe((track) => { - track.onReceiveRtp.subscribe((rtp) => { - if (!gotVideo) { - gotVideo = true; - console.log('received first video packet'); + if (useUdp) { + track.onReceiveRtp.subscribe(rtp => { + if (!gotVideo) { + gotVideo = true; + console.log('received first video packet'); + } + rtspServer.sendVideo(rtp.serialize(), false); + }); + track.onReceiveRtcp.subscribe(rtp => rtspServer.sendVideo(rtp.serialize(), true)); + } + else { + const jitter = new JitterBuffer({ + rtpStream: track.onReceiveRtp, + rtcpStream: track.onReceiveRtcp, + }); + class RtspOutput extends Output { + pushRtcpPackets(packets: RtcpPacket[]): void { + for (const rtcp of packets) { + rtspServer.sendVideo(rtcp.serialize(), true) + } + } + pushRtpPackets(packets: RtpPacket[]): void { + if (!gotVideo) { + gotVideo = true; + console.log('received first video packet'); + } + for (const rtp of packets) { + rtspServer.sendVideo(rtp.serialize(), false); + } + } } - rtspServer.sendVideo(rtp.serialize(), false); - }); - // track.onReceiveRtcp.subscribe(rtcp => rtspServer.sendVideo(rtcp.serialize(), true)); + jitter.pipe(new RtspOutput()) + } + // what is this for? it was in the example code, but as far as i can tell, it doesn't // actually do anything? // track.onReceiveRtp.once(() => { @@ -338,9 +344,106 @@ export async function createRTCPeerConnectionSource(options: { // unclear what this does in tcp. out of order packets in a tcp // stream probably breaks things. // should possibly use the werift jitter buffer in tcp mode to accomodate. - "-max_delay", "0", + // "-max_delay", "0", '-i', url, ] }; } } + +interface ReceivedRtpPacket extends RtpPacket { + uptime?: number; +} + +export class JitterBuffer extends Pipeline { + private buffer: ReceivedRtpPacket[] = []; + + // the number of packets to wait before giving up on a packet. + // 1/10th of a second. + maxDelay = .1 + + pushRtpPackets(packets: RtpPacket[]) { + packets.forEach(this.onRtp); + } + + pushRtcpPackets(packets: RtcpPacket[]) { + this.children?.pushRtcpPackets?.(packets); + } + + private onRtp = (p: RtpPacket) => { + const now = process.uptime(); + const received = p as ReceivedRtpPacket; + received.uptime = now; + + this.buffer.push(received); + this.buffer.sort((a, b) => a.header.timestamp - b.header.timestamp); + + // find sequenced packets + let send = 0; + while (this.buffer.length > send + 1 && uint16Add(this.buffer[send].header.sequenceNumber, 1) === this.buffer[send + 1].header.sequenceNumber) { + send++; + } + + // send sequenced packets + if (send) { + const packets = this.buffer.splice(0, send); + this.children?.pushRtpPackets?.(packets); + } + + // find dated packets + send = 0; + while (this.buffer.length > send && this.buffer[send].uptime + this.maxDelay < now) { + send++; + } + + // send dated packets + if (send) { + const packets = this.buffer.splice(0, send); + this.children?.pushRtpPackets?.(packets); + } + }; +} + +// this is an sdp corresponding to what is requested from webrtc. +// h264 baseline and opus are required codecs that all webrtc implementations must provide. +function createSdpInput(audioPort: number, videoPort: number, sdp: string) { + let outputSdp = sdp + .replace(/c=IN .*/, `c=IN IP4 127.0.0.1`) + .replace(/m=audio \d+/, `m=audio ${audioPort}`) + .replace(/m=video \d+/, `m=video ${videoPort}`); + + let lines = outputSdp.split('\n').map(line => line.trim()); + lines = lines + .filter(line => !line.includes('a=rtcp-mux')) + .filter(line => !line.includes('a=candidate')) + .filter(line => !line.includes('a=ice')); + + const vindex = lines.findIndex(line => line.startsWith('m=video')); + lines.splice(vindex + 1, 0, 'a=control:trackID=video'); + const aindex = lines.findIndex(line => line.startsWith('m=audio')); + lines.splice(aindex + 1, 0, 'a=control:trackID=audio'); + outputSdp = lines.join('\r\n') + + outputSdp = outputSdp.split('m=') + .slice(1) + .map(line => 'm=' + line) + .join(''); + return outputSdp; +} + +export function getRTCMediaStreamOptions(id: string, name: string, useUdp: boolean): MediaStreamOptions { + return { + // set by consumer + id, + name, + // not compatible with scrypted parser currently due to jitter issues + tool: useUdp ? undefined : 'scrypted', + container: useUdp ? 'sdp' : 'rtsp', + video: { + codec: 'h264', + }, + audio: { + codec: 'opus', + }, + }; +} diff --git a/plugins/webrtc-source/package-lock.json b/plugins/webrtc-source/package-lock.json index 496857798..8318fafbc 100644 --- a/plugins/webrtc-source/package-lock.json +++ b/plugins/webrtc-source/package-lock.json @@ -1,10 +1,12 @@ { "name": "@scrypted/webrtc-source", + "version": "0.0.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/webrtc-source", + "version": "0.0.1", "dependencies": { "@types/node": "^16.6.1" }, @@ -14,6 +16,7 @@ } }, "../../common": { + "name": "@scrypted/common", "version": "1.0.1", "dev": true, "license": "ISC", @@ -28,6 +31,59 @@ "@types/node": "^16.9.0" } }, + "../../external/werift/packages/webrtc": { + "name": "@koush/werift", + "version": "0.14.5-beta7", + "extraneous": true, + "license": "MIT", + "dependencies": { + "@fidm/x509": "^1.2.1", + "@minhducsun2002/leb128": "^0.2.0", + "@peculiar/webcrypto": "^1.1.6", + "@peculiar/x509": "^1.2.2", + "@shinyoshiaki/ebml-builder": "^0.0.1", + "aes-js": "^3.1.2", + "binary-data": "^0.6.0", + "buffer-crc32": "^0.2.13", + "date-fns": "^2.27.0", + "debug": "^4.3.3", + "elliptic": "^6.5.3", + "int64-buffer": "^1.0.1", + "ip": "^1.1.5", + "jspack": "^0.0.4", + "lodash": "^4.17.20", + "nano-time": "^1.0.0", + "p-cancelable": "^2.1.1", + "rx.mini": "^1.1.0", + "turbo-crc32": "^1.0.1", + "tweetnacl": "^1.0.3", + "uuid": "^8.3.2" + }, + "devDependencies": { + "@types/aes-js": "^3.1.1", + "@types/buffer-crc32": "^0.2.0", + "@types/debug": "^4.1.7", + "@types/elliptic": "^6.4.14", + "@types/ip": "^1.1.0", + "@types/jest": "^27.0.3", + "@types/lodash": "^4.14.178", + "@types/node": "^17.0.0", + "@types/uuid": "^8.3.3", + "@typescript-eslint/eslint-plugin": "^5.7.0", + "@typescript-eslint/parser": "^5.7.0", + "eslint-plugin-prettier": "^4.0.0", + "jest": "^27.4.5", + "node-actionlint": "^1.2.1", + "prettier": "^2.5.1", + "ts-jest": "^27.1.1", + "ts-node": "^10.4.0", + "typedoc": "^0.22.10", + "typescript": "^4.5.4" + }, + "engines": { + "node": ">=15" + } + }, "../../sdk": { "name": "@scrypted/sdk", "version": "0.0.174", diff --git a/plugins/webrtc-source/src/main.ts b/plugins/webrtc-source/src/main.ts index 3de384d5a..4c5931eb9 100644 --- a/plugins/webrtc-source/src/main.ts +++ b/plugins/webrtc-source/src/main.ts @@ -17,6 +17,7 @@ class WebRTCMixin extends SettingsMixinDeviceBase