webrtc-source: use ffmpeg jitter buffer

This commit is contained in:
Koushik Dutta
2022-03-04 17:51:39 -08:00
parent a003bb5ca5
commit 299e4599de
3 changed files with 221 additions and 61 deletions

View File

@@ -1,6 +1,6 @@
import { RTCAVSignalingSetup, RTCSignalingChannel, FFMpegInput, MediaStreamOptions } from "@scrypted/sdk/types";
import { listenZeroSingleClient } from "./listen-cluster";
import { RTCPeerConnection, RtcpPayloadSpecificFeedback, RTCRtpCodecParameters } from "@koush/werift";
import { Output, Pipeline, RTCPeerConnection, RtcpPacket, RtcpPayloadSpecificFeedback, RTCRtpCodecParameters, RtpPacket, uint16Add } from "@koush/werift";
import dgram from 'dgram';
import { RtspServer } from "./rtsp-server";
import { Socket } from "net";
@@ -8,50 +8,6 @@ import { RTCSessionControl, RTCSignalingSession } from "@scrypted/sdk";
import { FullIntraRequest } from "@koush/werift/lib/rtp/src/rtcp/psfb/fullIntraRequest";
import { RpcPeer } from "../../server/src/rpc";
/**
 * Rewrites an SDP received from WebRTC into a media-only SDP that points at
 * local ports.
 *
 * h264 baseline and opus are required codecs that all webrtc implementations
 * must provide, so the media sections are passed through otherwise untouched.
 *
 * Transformations applied:
 * - every `c=IN ...` connection line is rewritten to `c=IN IP4 127.0.0.1`
 *   (each media section carries its own connection line, so all of them
 *   must be rewritten, not just the first);
 * - the first audio/video `m=` lines get the supplied ports;
 * - `a=rtcp-mux`, `a=candidate` and `a=ice*` attributes are removed;
 * - `a=control:trackID=video|audio` is inserted right after the matching
 *   `m=` line, when that media section exists;
 * - everything before the first `m=` (the session-level header) is dropped.
 *
 * @param audioPort port written into the `m=audio` line.
 * @param videoPort port written into the `m=video` line.
 * @param sdp the source SDP text (LF or CRLF line endings).
 * @returns the rewritten SDP joined with CRLF, or an empty string when the
 * input contains no `m=` at all.
 */
function createSdpInput(audioPort: number, videoPort: number, sdp: string) {
    const lines = sdp
        // multiline + global: rewrite the connection line of every media section.
        .replace(/^c=IN .*$/gm, `c=IN IP4 127.0.0.1`)
        .replace(/m=audio \d+/, `m=audio ${audioPort}`)
        .replace(/m=video \d+/, `m=video ${videoPort}`)
        .split('\n')
        .map(line => line.trim())
        .filter(line => !line.includes('a=rtcp-mux')
            && !line.includes('a=candidate')
            && !line.includes('a=ice'));

    // video first, then audio, matching the original insertion order.
    for (const kind of ['video', 'audio']) {
        const mediaIndex = lines.findIndex(line => line.startsWith(`m=${kind}`));
        // a missing section must not produce a stray control line at index 0.
        if (mediaIndex !== -1)
            lines.splice(mediaIndex + 1, 0, `a=control:trackID=${kind}`);
    }

    // keep only the media descriptions: drop everything before the first "m=".
    const joined = lines.join('\r\n');
    const firstMedia = joined.indexOf('m=');
    return firstMedia === -1 ? '' : joined.slice(firstMedia);
}
/**
 * Builds the media stream description advertised for a WebRTC-backed source.
 *
 * @param id stream id, set by the consumer.
 * @param name stream name, set by the consumer.
 * @param useUdp when true no parser tool is named; otherwise 'scrypted' is used.
 * @returns options describing an rtsp container carrying h264 video and opus audio.
 */
export function getRTCMediaStreamOptions(id: string, name: string, useUdp: boolean): MediaStreamOptions {
    // not compatible with scrypted parser currently when it is udp
    const tool = useUdp ? undefined : 'scrypted';
    const video = { codec: 'h264' };
    const audio = { codec: 'opus' };
    return {
        // id and name are set by consumer
        id,
        name,
        tool,
        container: 'rtsp',
        video,
        audio,
    };
}
export async function createRTCPeerConnectionSource(options: {
console: Console,
mediaStreamOptions: MediaStreamOptions,
@@ -172,27 +128,77 @@ export async function createRTCPeerConnectionSource(options: {
const audioTransceiver = pc.addTransceiver("audio", setup.audio as any);
audioTransceiver.onTrack.subscribe((track) => {
// audioTransceiver.sender.replaceTrack(track);
track.onReceiveRtp.subscribe((rtp) => {
if (!gotAudio) {
gotAudio = true;
console.log('received first audio packet');
if (useUdp) {
track.onReceiveRtp.subscribe(rtp => {
if (!gotAudio) {
gotAudio = true;
console.log('received first audio packet');
}
rtspServer.sendAudio(rtp.serialize(), false);
});
track.onReceiveRtcp.subscribe(rtp => rtspServer.sendAudio(rtp.serialize(), true));
}
else {
const jitter = new JitterBuffer({
rtpStream: track.onReceiveRtp,
rtcpStream: track.onReceiveRtcp,
});
// Pipeline sink for the audio track: every packet it is handed is serialized
// and written to the rtsp server's audio channel.
class RtspOutput extends Output {
    // rtcp packets are forwarded with the rtcp flag set.
    pushRtcpPackets(packets: RtcpPacket[]): void {
        packets.forEach(packet => {
            rtspServer.sendAudio(packet.serialize(), true)
        });
    }

    // rtp packets are forwarded with the rtcp flag cleared; the first call is logged once.
    pushRtpPackets(packets: RtpPacket[]): void {
        const firstCall = !gotAudio;
        if (firstCall) {
            gotAudio = true;
            console.log('received first audio packet');
        }
        packets.forEach(packet => {
            rtspServer.sendAudio(packet.serialize(), false);
        });
    }
}
rtspServer.sendAudio(rtp.serialize(), false);
});
// track.onReceiveRtcp.subscribe(rtcp => rtspServer.sendAudio(rtcp.serialize(), true));
jitter.pipe(new RtspOutput())
}
});
const videoTransceiver = pc.addTransceiver("video", setup.video as any);
videoTransceiver.onTrack.subscribe((track) => {
track.onReceiveRtp.subscribe((rtp) => {
if (!gotVideo) {
gotVideo = true;
console.log('received first video packet');
if (useUdp) {
track.onReceiveRtp.subscribe(rtp => {
if (!gotVideo) {
gotVideo = true;
console.log('received first video packet');
}
rtspServer.sendVideo(rtp.serialize(), false);
});
track.onReceiveRtcp.subscribe(rtp => rtspServer.sendVideo(rtp.serialize(), true));
}
else {
const jitter = new JitterBuffer({
rtpStream: track.onReceiveRtp,
rtcpStream: track.onReceiveRtcp,
});
// Pipeline sink for the video track: every packet it is handed is serialized
// and written to the rtsp server's video channel.
class RtspOutput extends Output {
    // rtcp packets are forwarded with the rtcp flag set.
    pushRtcpPackets(packets: RtcpPacket[]): void {
        packets.forEach(packet => {
            rtspServer.sendVideo(packet.serialize(), true)
        });
    }

    // rtp packets are forwarded with the rtcp flag cleared; the first call is logged once.
    pushRtpPackets(packets: RtpPacket[]): void {
        const firstCall = !gotVideo;
        if (firstCall) {
            gotVideo = true;
            console.log('received first video packet');
        }
        packets.forEach(packet => {
            rtspServer.sendVideo(packet.serialize(), false);
        });
    }
}
rtspServer.sendVideo(rtp.serialize(), false);
});
// track.onReceiveRtcp.subscribe(rtcp => rtspServer.sendVideo(rtcp.serialize(), true));
jitter.pipe(new RtspOutput())
}
// what is this for? it was in the example code, but as far as i can tell, it doesn't
// actually do anything?
// track.onReceiveRtp.once(() => {
@@ -338,9 +344,106 @@ export async function createRTCPeerConnectionSource(options: {
// unclear what this does in tcp. out of order packets in a tcp
// stream probably breaks things.
// should possibly use the werift jitter buffer in tcp mode to accommodate.
"-max_delay", "0",
// "-max_delay", "0",
'-i', url,
]
};
}
}
// An RtpPacket tagged with the time at which the jitter buffer received it.
interface ReceivedRtpPacket extends RtpPacket {
// process.uptime() reading (seconds) taken when the packet was buffered;
// optional because the packet is stamped after being cast from a plain RtpPacket.
uptime?: number;
}
/**
 * Pipeline stage that re-sequences RTP packets before passing them to the
 * downstream stage (`children`).
 *
 * Incoming packets are kept in a buffer ordered by RTP sequence number.
 * After each arrival:
 * - a gap-free run of consecutive sequence numbers at the head of the buffer
 *   is forwarded, except for the last packet of the run, which is retained so
 *   the next arrival can be checked against it;
 * - packets at the head that have waited longer than `maxDelay` are forwarded
 *   even if a gap precedes or follows them.
 *
 * RTCP packets are not buffered and pass straight through.
 */
export class JitterBuffer extends Pipeline {
    private buffer: ReceivedRtpPacket[] = [];
    // how long a buffered packet may wait for missing predecessors before it
    // is forwarded anyway, in seconds (same unit as process.uptime()).
    // 1/10th of a second.
    maxDelay = .1

    /**
     * Orders packets by sequence number using a signed 16 bit difference, so
     * that ordering stays correct across the 65535 -> 0 wraparound.
     *
     * Sequence numbers are used rather than timestamps because several
     * packets (e.g. the fragments of one video frame) can share a timestamp,
     * which would leave their relative order unresolved.
     */
    private static compareSequenceNumbers(a: RtpPacket, b: RtpPacket): number {
        return ((a.header.sequenceNumber - b.header.sequenceNumber) << 16) >> 16;
    }

    /** Buffers each packet and forwards whatever has become deliverable. */
    pushRtpPackets(packets: RtpPacket[]) {
        for (const packet of packets) {
            this.onRtp(packet);
        }
    }

    /** RTCP bypasses the buffer entirely. */
    pushRtcpPackets(packets: RtcpPacket[]) {
        this.children?.pushRtcpPackets?.(packets);
    }

    private onRtp = (p: RtpPacket) => {
        const now = process.uptime();
        const received = p as ReceivedRtpPacket;
        received.uptime = now;

        this.buffer.push(received);
        this.buffer.sort(JitterBuffer.compareSequenceNumbers);

        // removes the first `count` packets from the buffer and hands them downstream.
        const forward = (count: number) => {
            if (!count)
                return;
            const packets = this.buffer.splice(0, count);
            this.children?.pushRtpPackets?.(packets);
        };

        // find sequenced packets: count how many head packets are directly
        // followed by their successor.
        let sequenced = 0;
        while (this.buffer.length > sequenced + 1
            && uint16Add(this.buffer[sequenced].header.sequenceNumber, 1) === this.buffer[sequenced + 1].header.sequenceNumber) {
            sequenced++;
        }
        forward(sequenced);

        // find dated packets: anything at the head that has waited past maxDelay.
        // a packet without an arrival stamp is treated as already overdue.
        let dated = 0;
        while (this.buffer.length > dated
            && (this.buffer[dated].uptime ?? 0) + this.maxDelay < now) {
            dated++;
        }
        forward(dated);
    };
}
/**
 * Rewrites an SDP received from WebRTC into a media-only SDP that points at
 * local ports.
 *
 * h264 baseline and opus are required codecs that all webrtc implementations
 * must provide, so the media sections are passed through otherwise untouched.
 *
 * Transformations applied:
 * - every `c=IN ...` connection line is rewritten to `c=IN IP4 127.0.0.1`
 *   (each media section carries its own connection line, so all of them
 *   must be rewritten, not just the first);
 * - the first audio/video `m=` lines get the supplied ports;
 * - `a=rtcp-mux`, `a=candidate` and `a=ice*` attributes are removed;
 * - `a=control:trackID=video|audio` is inserted right after the matching
 *   `m=` line, when that media section exists;
 * - everything before the first `m=` (the session-level header) is dropped.
 *
 * @param audioPort port written into the `m=audio` line.
 * @param videoPort port written into the `m=video` line.
 * @param sdp the source SDP text (LF or CRLF line endings).
 * @returns the rewritten SDP joined with CRLF, or an empty string when the
 * input contains no `m=` at all.
 */
function createSdpInput(audioPort: number, videoPort: number, sdp: string) {
    const lines = sdp
        // multiline + global: rewrite the connection line of every media section.
        .replace(/^c=IN .*$/gm, `c=IN IP4 127.0.0.1`)
        .replace(/m=audio \d+/, `m=audio ${audioPort}`)
        .replace(/m=video \d+/, `m=video ${videoPort}`)
        .split('\n')
        .map(line => line.trim())
        .filter(line => !line.includes('a=rtcp-mux')
            && !line.includes('a=candidate')
            && !line.includes('a=ice'));

    // video first, then audio, matching the original insertion order.
    for (const kind of ['video', 'audio']) {
        const mediaIndex = lines.findIndex(line => line.startsWith(`m=${kind}`));
        // a missing section must not produce a stray control line at index 0.
        if (mediaIndex !== -1)
            lines.splice(mediaIndex + 1, 0, `a=control:trackID=${kind}`);
    }

    // keep only the media descriptions: drop everything before the first "m=".
    const joined = lines.join('\r\n');
    const firstMedia = joined.indexOf('m=');
    return firstMedia === -1 ? '' : joined.slice(firstMedia);
}
/**
 * Builds the media stream description advertised for a WebRTC-backed source.
 *
 * @param id stream id, set by the consumer.
 * @param name stream name, set by the consumer.
 * @param useUdp when true the container is 'sdp' and no parser tool is named;
 * otherwise the container is 'rtsp' and the 'scrypted' tool is used.
 * @returns options describing h264 video and opus audio in that container.
 */
export function getRTCMediaStreamOptions(id: string, name: string, useUdp: boolean): MediaStreamOptions {
    // not compatible with scrypted parser currently due to jitter issues
    const tool = useUdp ? undefined : 'scrypted';
    const video = { codec: 'h264' };
    const audio = { codec: 'opus' };
    return {
        // id and name are set by consumer
        id,
        name,
        tool,
        container: useUdp ? 'sdp' : 'rtsp',
        video,
        audio,
    };
}