rebroadcast:

rtsp server can handle multiple tracks
single stream cameras should be able to disable prebuffer
fix rtsp keyframe detection
Koushik Dutta
2022-04-11 11:58:16 -07:00
parent 0fa228cd03
commit 09d1666400
8 changed files with 151 additions and 162 deletions

View File

@@ -2,7 +2,7 @@ import { readLength, readLine } from './read-stream';
import { Duplex, PassThrough, Readable } from 'stream';
import { randomBytes } from 'crypto';
import { StreamChunk, StreamParser, StreamParserOptions } from './stream-parser';
import { findTrackByType } from './sdp-utils';
import { parseSdp } from './sdp-utils';
import dgram from 'dgram';
import net from 'net';
import tls from 'tls';
@@ -33,11 +33,42 @@ export async function readMessage(client: Readable): Promise<string[]> {
// https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/
const NAL_TYPE_SPS = 7;
export const H264_NAL_TYPE_IDR = 5;
export const H264_NAL_TYPE_SPS = 7;
// aggregate NAL Unit
const NAL_TYPE_STAP_A = 24;
export const H264_NAL_TYPE_STAP_A = 24;
// fragmented NAL Unit (need to match against first)
const NAL_TYPE_FU_A = 28;
export const H264_NAL_TYPE_FU_A = 28;
export function hasH264NaluType(streamChunk: StreamChunk, naluType: number) {
if (streamChunk.type !== 'h264')
return false;
const nalu = streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12);
const checkNaluType = nalu[0] & 0x1f;
if (checkNaluType === H264_NAL_TYPE_STAP_A) {
let pos = 1;
while (pos < nalu.length) {
const naluLength = nalu.readUInt16BE(pos);
pos += 2;
const stapaType = nalu[pos] & 0x1f;
if (stapaType === naluType)
return true;
pos += naluLength;
}
}
else if (checkNaluType === H264_NAL_TYPE_FU_A) {
const fuaType = nalu[1] & 0x1f;
const isFuStart = !!(nalu[1] & 0x80);
if (fuaType === naluType && isFuStart)
return true;
}
else if (checkNaluType === naluType) {
return true;
}
return false;
}
export function createRtspParser(options?: StreamParserOptions): RtspStreamParser {
let resolve: any;
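The new hasH264NaluType helper above checks for a NALU type whether the RTP payload carries a single NAL unit, a STAP-A aggregate, or the first fragment of an FU-A sequence. A minimal usage sketch (not part of this diff; it only assumes the StreamChunk shape used above, where the last buffer is an RTP packet with a 12 byte header):

import { hasH264NaluType, H264_NAL_TYPE_IDR, H264_NAL_TYPE_SPS } from '@scrypted/common/src/rtsp-server';
import { StreamChunk } from '@scrypted/common/src/stream-parser';

// true for an IDR carried as a plain NALU, inside a STAP-A aggregate,
// or as the first fragment of an FU-A fragmented NALU
function isKeyframeChunk(chunk: StreamChunk): boolean {
    return hasH264NaluType(chunk, H264_NAL_TYPE_IDR);
}

// the same helper is used below to locate the SPS when trimming the prebuffer
function isSpsChunk(chunk: StreamChunk): boolean {
    return hasH264NaluType(chunk, H264_NAL_TYPE_SPS);
}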
@@ -61,33 +92,8 @@ export function createRtspParser(options?: StreamParserOptions): RtspStreamParse
for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) {
const streamChunk = streamChunks[prebufferIndex];
if (streamChunk.type !== 'h264')
continue;
// last packet is rtp packet, strip off the rtp header (12 bytes)
const nalu = streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12);
const naluType = nalu[0] & 0x1f;
if (naluType === NAL_TYPE_STAP_A) {
let pos = 1;
while (pos < nalu.length) {
const naluLength = nalu.readUInt16BE(pos);
pos += 2;
const stapaType = nalu[pos] & 0x1f;
if (stapaType === NAL_TYPE_SPS)
foundIndex = prebufferIndex;
pos += naluLength;
}
}
else if (naluType === NAL_TYPE_FU_A) {
const fuaType = nalu[1] & 0x1f;
const isFuStart = !!(nalu[1] & 0x80);
if (fuaType === NAL_TYPE_SPS && isFuStart)
foundIndex = prebufferIndex;
}
else if (naluType === NAL_TYPE_SPS) {
if (hasH264NaluType(streamChunk, H264_NAL_TYPE_SPS))
foundIndex = prebufferIndex;
}
}
if (foundIndex !== undefined)
@@ -396,15 +402,19 @@ export class RtspClient extends RtspBase {
}
}
export interface RtspTrack {
protocol: 'tcp' | 'udp';
destination: number;
codec: string;
control: string;
}
export class RtspServer {
videoChannel: number;
audioChannel: number;
session: string;
console: Console;
udpPorts = {
video: 0,
audio: 0,
};
setupTracks: {
[trackId: string]: RtspTrack;
} = {};
constructor(public client: Duplex, public sdp?: string, public udp?: dgram.Socket, public checkRequest?: (method: string, url: string, headers: Headers, rawMessage: string[]) => Promise<boolean>) {
this.session = randomBytes(4).toString('hex');
@@ -436,7 +446,7 @@ export class RtspServer {
}
async *handleRecord(): AsyncGenerator<{
type: 'audio' | 'video',
type: string,
rtcp: boolean,
header: Buffer,
packet: Buffer,
@@ -450,8 +460,13 @@ export class RtspServer {
const length = header.readUInt16BE(2);
const packet = await readLength(this.client, length);
const id = header.readUInt8(1);
const destination = id - (id % 2);
const track = Object.values(this.setupTracks).find(track => track.destination === destination);
if (!track)
throw new Error('RTSP Server received unknown channel: ' + id);
yield {
type: id - (id % 2) === this.videoChannel ? 'video' : 'audio',
type: track.codec,
rtcp: id % 2 === 1,
header,
packet,
@@ -474,26 +489,22 @@ export class RtspServer {
this.udp.send(packet, rtcp ? port + 1 : port, '127.0.0.1');
}
sendVideo(packet: Buffer, rtcp: boolean) {
if (this.udp && this.udpPorts.video) {
this.sendUdp(this.udpPorts.video, packet, rtcp)
sendTrack(trackId: string, packet: Buffer, rtcp: boolean) {
const track = this.setupTracks[trackId];
if (!track) {
this.console?.warn('RTSP Server track not found:', trackId);
return;
}
else {
if (this.videoChannel == null)
throw new Error('rtsp videoChannel not set up');
this.send(packet, rtcp ? this.videoChannel + 1 : this.videoChannel);
}
}
sendAudio(packet: Buffer, rtcp: boolean) {
if (this.udp && this.udpPorts.audio) {
this.sendUdp(this.udpPorts.audio, packet, rtcp)
}
else {
if (this.audioChannel == null)
throw new Error('rtsp audioChannel not set up');
this.send(packet, rtcp ? this.audioChannel + 1 : this.audioChannel);
if (track.protocol === 'udp') {
if (!this.udp)
this.console?.warn('RTSP Server UDP socket not available.');
else
this.sendUdp(track.destination, packet, rtcp);
return;
}
this.send(packet, rtcp ? track.destination + 1 : track.destination);
}
options(url: string, requestHeaders: Headers) {
@@ -517,8 +528,13 @@ export class RtspServer {
const transport = requestHeaders['transport'];
headers['Transport'] = transport;
headers['Session'] = this.session;
let audioTrack = findTrackByType(this.sdp, 'audio');
let videoTrack = findTrackByType(this.sdp, 'video');
const parsedSdp = parseSdp(this.sdp);
const msection = parsedSdp.msections.find(msection => url.endsWith(msection.control));
if (!msection) {
this.respond(404, 'Not Found', requestHeaders, headers);
return;
}
if (transport.includes('UDP')) {
if (!this.udp) {
this.respond(461, 'Unsupported Transport', requestHeaders, {});
@@ -526,25 +542,24 @@ export class RtspServer {
}
const match = transport.match(/.*?client_port=([0-9]+)-([0-9]+)/);
const [_, rtp, rtcp] = match;
if (audioTrack && url.includes(audioTrack.trackId))
this.udpPorts.audio = parseInt(rtp);
else if (videoTrack && url.includes(videoTrack.trackId))
this.udpPorts.video = parseInt(rtp);
else
this.console?.warn('unknown track id', url);
this.setupTracks[msection.control] = {
control: msection.control,
protocol: 'udp',
destination: parseInt(rtp),
codec: msection.codec,
}
}
else if (transport.includes('TCP')) {
const match = transport.match(/.*?interleaved=([0-9]+)-([0-9]+)/);
if (match) {
const low = parseInt(match[1]);
const high = parseInt(match[2]);
if (audioTrack && url.includes(audioTrack.trackId))
this.audioChannel = low;
else if (videoTrack && url.includes(videoTrack.trackId))
this.videoChannel = low;
else
this.console?.warn('unknown track id', url);
this.setupTracks[msection.control] = {
control: msection.control,
protocol: 'tcp',
destination: low,
codec: msection.codec,
}
}
}
this.respond(200, 'OK', requestHeaders, headers)
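Each successful SETUP now records the negotiated transport in setupTracks, keyed by the SDP control attribute, rather than assuming one fixed video and one fixed audio channel. An illustrative mapping (control values, codecs, and ports are made up; the Transport header shapes match the regexes above):

// TCP interleaved:  Transport: RTP/AVP/TCP;unicast;interleaved=0-1
//   -> setupTracks[control] = { control, protocol: 'tcp', destination: 0, codec: 'h264' }
// UDP:               Transport: RTP/AVP/UDP;unicast;client_port=50000-50001
//   -> setupTracks[control] = { control, protocol: 'udp', destination: 50000, codec: 'pcm_mulaw' }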
@@ -552,15 +567,8 @@ export class RtspServer {
play(url: string, requestHeaders: Headers) {
const headers: Headers = {};
let audioTrack = findTrackByType(this.sdp, 'audio');
let videoTrack = findTrackByType(this.sdp, 'video');
let rtpInfo = '';
if (audioTrack)
rtpInfo = `url=${url}/trackID=${audioTrack.trackId};seq=0;rtptime=0`
if (audioTrack && videoTrack)
rtpInfo += ',';
if (videoTrack)
rtpInfo += `url=${url}/trackID=${videoTrack.trackId};seq=0;rtptime=0`;
const rtpInfos = Object.values(this.setupTracks).map(track => `url=${url}/trackID=${track.control};seq=0;rtptime=0`);
const rtpInfo = rtpInfos.join(',');
headers['RTP-Info'] = rtpInfo;
headers['Range'] = 'npt=now-';
headers['Session'] = this.session;
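With tracks keyed by control, callers no longer choose between sendVideo and sendAudio; after PLAY, every packet goes through sendTrack and lands on whichever interleaved channel or UDP port that track negotiated. A minimal sketch (not part of this diff; the socket, sdp, and packet variables and the 'h264' codec string are illustrative):

import { RtspServer } from '@scrypted/common/src/rtsp-server';
import { Duplex } from 'stream';

async function serveVideo(clientSocket: Duplex, sdp: string, nextRtpPacket: () => Buffer) {
    const server = new RtspServer(clientSocket, sdp);
    // resolves once the client has completed SETUP/PLAY
    await server.handlePlayback();
    // setupTracks was populated during SETUP, keyed by the SDP control attribute
    const video = Object.values(server.setupTracks).find(track => track.codec === 'h264');
    if (!video)
        return;
    server.sendTrack(video.control, nextRtpPacket(), false);
}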

View File

@@ -1,3 +1,4 @@
// todo: move this to ring.
export function replacePorts(sdp: string, audioPort: number, videoPort: number) {
let outputSdp = sdp
.replace(/c=IN .*/, `c=IN IP4 127.0.0.1`)
@@ -9,15 +10,18 @@ export function replacePorts(sdp: string, audioPort: number, videoPort: number)
export function addTrackControls(sdp: string) {
let lines = sdp.split('\n').map(line => line.trim());
lines = lines.filter(line => !line.includes('a=control:'));
const vindex = lines.findIndex(line => line.startsWith('m=video'));
if (vindex !== -1)
lines.splice(vindex + 1, 0, 'a=control:trackID=video');
const aindex = lines.findIndex(line => line.startsWith('m=audio'));
if (aindex !== -1)
lines.splice(aindex + 1, 0, 'a=control:trackID=audio');
let trackCount = 0;
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
if (!line.startsWith('m='))
continue;
lines.splice(i + 1, 0, 'a=control:trackID=' + trackCount);
trackCount++;
}
return lines.join('\r\n')
}
// todo: move this to webrtc
// this is an sdp corresponding to what is requested from webrtc.
// h264 baseline and opus are required codecs that all webrtc implementations must provide.
export function createSdpInput(audioPort: number, videoPort: number, sdp: string) {
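addTrackControls now strips any a=control lines the camera supplied and numbers every m= section sequentially, instead of tagging only the first video and audio sections. A small sketch of the effect (made-up SDP fragment, not part of this diff):

import { addTrackControls } from '@scrypted/common/src/sdp-utils';

const sdp = [
    'v=0',
    'm=video 0 RTP/AVP 96',
    'a=rtpmap:96 H264/90000',
    'm=audio 0 RTP/AVP 97',
    'a=rtpmap:97 MPEG4-GENERIC/16000/1',
].join('\r\n');

// each m= section gains a=control:trackID=0, a=control:trackID=1, ... in order,
// so a downstream RTSP server can address any number of tracks
const withControls = addTrackControls(sdp);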
@@ -46,6 +50,7 @@ export function createSdpInput(audioPort: number, videoPort: number, sdp: string
return outputSdp;
}
// todo: move this to webrtc
export function findFmtp(sdp: string, codec: string) {
let lines = sdp.split('\n').map(line => line.trim());
@@ -62,24 +67,6 @@ export function findFmtp(sdp: string, codec: string) {
})
}
export function parsePayloadTypes(sdp: string) {
const audioPayloadTypes = new Set<number>();
const videoPayloadTypes = new Set<number>();
const addPts = (set: Set<number>, pts: string[]) => {
for (const pt of pts || []) {
set.add(parseInt(pt));
}
};
const audioPts = sdp.match(/m=audio.*/)?.[0];
addPts(audioPayloadTypes, audioPts?.split(' ').slice(3));
const videoPts = sdp.match(/m=video.*/)?.[0];
addPts(videoPayloadTypes, videoPts?.split(' ').slice(3));
return {
audioPayloadTypes,
videoPayloadTypes,
}
}
function getSections(sdp: string) {
const sections = ('\n' + sdp).split('\nm=');
return sections;

View File

@@ -5,19 +5,19 @@ import { handleRebroadcasterClient, ParserOptions, ParserSession, setupActivityT
import { closeQuiet, createBindZero, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
import { safeKillFFmpeg } from '@scrypted/common/src/media-helpers';
import { readLength } from '@scrypted/common/src/read-stream';
import { createRtspParser, RtspClient, RtspServer, RTSP_FRAME_MAGIC } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, parsePayloadTypes, parseSdp } from '@scrypted/common/src/sdp-utils';
import { createRtspParser, H264_NAL_TYPE_IDR, hasH264NaluType, RtspClient, RtspServer, RTSP_FRAME_MAGIC } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, parseSdp } from '@scrypted/common/src/sdp-utils';
import { StorageSettings } from '@scrypted/common/src/settings';
import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin";
import { sleep } from '@scrypted/common/src/sleep';
import { createFragmentedMp4Parser, createMpegTsParser, parseMp4StreamChunks, StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser';
import sdk, { BufferConverter, FFMpegInput, MediaObject, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera, VideoCameraConfiguration } from '@scrypted/sdk';
import crypto from 'crypto';
import dgram from 'dgram';
import net from 'net';
import { Duplex } from 'stream';
import { connectRFC4571Parser, RtspChannelCodecMapping, startRFC4571Parser } from './rfc4571';
import { createStreamSettings, getPrebufferedStreams } from './stream-settings';
import dgram from 'dgram';
const { mediaManager, log, systemManager, deviceManager } = sdk;
@@ -837,14 +837,8 @@ class PrebufferSession {
if (chunk.type === 'mdat') {
updateIdr();
}
if (chunk.type === 'rtp-video') {
const fragmentType = chunk.chunks[1].readUInt8(12) & 0x1f;
const second = chunk.chunks[1].readUInt8(13);
const nalType = second & 0x1f;
const startBit = second & 0x80;
if (((fragmentType === 28 || fragmentType === 29) && nalType === 5 && startBit == 128) || fragmentType == 5) {
updateIdr();
}
if (chunk.type === 'h264' && hasH264NaluType(chunk, H264_NAL_TYPE_IDR)) {
updateIdr();
}
prebufferContainer.push({
@@ -948,7 +942,7 @@ class PrebufferSession {
session.once('killed', cleanup);
const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container];
if (container !== 'rtsp') {
if (true || container !== 'rtsp') {
for (const prebuffer of prebufferContainer) {
if (prebuffer.time < now - requestedPrebuffer)
continue;
@@ -1008,34 +1002,24 @@ class PrebufferSession {
let socketPromise: Promise<Duplex>;
let url: string;
let filter: (chunk: StreamChunk) => StreamChunk;
const codecMap = new Map<string, number>();
if (container === 'rtsp') {
let audioChannel: number;
let videoChannel: number;
const parsedSdp = parseSdp(sdp);
if (parsedSdp.msections.length > 2) {
parsedSdp.msections = parsedSdp.msections.filter(msection => msection.codec === mediaStreamOptions.video?.codec || msection.codec === mediaStreamOptions.audio?.codec);
sdp = parsedSdp.toSdp();
filter = chunk => {
if (chunk.type === mediaStreamOptions.video?.codec && videoChannel !== undefined) {
const chunks = chunk.chunks.slice();
const header = chunks[0];
header.writeUInt8(videoChannel, 1);
return {
startStream: chunk.startStream,
chunks,
}
}
if (chunk.type === mediaStreamOptions.audio?.codec && audioChannel !== undefined) {
const chunks = chunk.chunks.slice();
const header = chunks[0];
header.writeUInt8(audioChannel, 1);
return {
startStream: chunk.startStream,
chunks,
}
const channel = codecMap.get(chunk.type);
if (channel == undefined)
return;
const chunks = chunk.chunks.slice();
const header = Buffer.from(chunks[0]);
header.writeUInt8(channel, 1);
chunks[0] = header;
return {
startStream: chunk.startStream,
chunks,
}
}
}
@@ -1046,8 +1030,9 @@ class PrebufferSession {
const server = new RtspServer(socket, sdp);
server.console = this.console;
await server.handlePlayback();
audioChannel = server.audioChannel;
videoChannel = server.videoChannel;
for (const track of Object.values(server.setupTracks)) {
codecMap.set(track.codec, track.destination);
}
return socket;
})
url = client.url.replace('tcp://', 'rtsp://');
@@ -1155,6 +1140,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera & VideoCameraCo
};
switch (options.destination) {
case 'medium-resolution':
case 'remote':
result = this.streamSettings.getRemoteStream(msos);
break;
@@ -1467,7 +1453,13 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider
const json = JSON.parse(data.toString());
const { url, sdp } = json;
const { audioPayloadTypes, videoPayloadTypes } = parsePayloadTypes(sdp);
const parsedSdp = parseSdp(sdp);
const trackLookups = new Map<number, string>();
for (const msection of parsedSdp.msections) {
for (const pt of msection.payloadTypes) {
trackLookups.set(pt, msection.control);
}
}
const u = new URL(url);
if (!u.protocol.startsWith('tcp'))
@@ -1499,19 +1491,15 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider
const length = header.readInt16BE(0);
const data = await readLength(socket, length);
const pt = data[1] & 0x7f;
if (audioPayloadTypes.has(pt)) {
rtsp.sendAudio(data, false);
}
else if (videoPayloadTypes.has(pt)) {
rtsp.sendVideo(data, false);
}
else {
const track = trackLookups.get(pt);
if (!track) {
client.destroy();
socket.destroy();
throw new Error('unknown payload type ' + pt);
}
rtsp.sendTrack(track, data, false);
}
})
});
return Buffer.from(JSON.stringify(ffmpeg));
}
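The removed parsePayloadTypes helper could only sort packets into audio or video; the trackLookups map above routes by RTP payload type to whichever SDP m= section declared it. A worked illustration (made-up payload types and control values):

// m=video ... RTP/AVP 96  with its a=control line   ->  trackLookups: 96 -> video control
// m=audio ... RTP/AVP 97  with its a=control line   ->  trackLookups: 97 -> audio control
// each RFC 4571 frame extracts pt = data[1] & 0x7f and is forwarded with
// rtsp.sendTrack(trackLookups.get(pt), data, false), replacing sendVideo/sendAudio.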
@@ -1532,7 +1520,7 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider
mixinDeviceState,
mixinProviderNativeId: this.nativeId,
mixinDeviceInterfaces,
group: "Stream Selection",
group: "Stream Management",
groupKey: "prebuffer",
});
this.currentMixins.set(mixinDeviceState.id, ret);

View File

@@ -2,15 +2,12 @@ import { cloneDeep } from "@scrypted/common/src/clone-deep";
import { ParserOptions, ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast";
import { readLength } from "@scrypted/common/src/read-stream";
import { RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { findTrackByType, parseSdp } from "@scrypted/common/src/sdp-utils";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import sdk, { ResponseMediaStreamOptions } from "@scrypted/sdk";
import { ResponseMediaStreamOptions } from "@scrypted/sdk";
import net from 'net';
import { EventEmitter, Readable } from "stream";
const { mediaManager } = sdk;
export function connectRFC4571Parser(url: string) {
const u = new URL(url);
if (!u.protocol.startsWith('tcp'))

View File

@@ -94,7 +94,7 @@ export function createStreamSettings(device: MixinDeviceBase<VideoCamera>) {
title: 'Prebuffered Streams',
description: 'Prebuffering maintains an active connection to the stream and improves load times. Prebuffer also retains the recent video for capturing motion events with HomeKit Secure Video. Enabling Prebuffer is not recommended on Cloud cameras.',
multiple: true,
hide: true,
hide: false,
},
...streamTypes,
});
@@ -139,16 +139,20 @@ export function createStreamSettings(device: MixinDeviceBase<VideoCamera>) {
storageSettings.options = {
onGet: async () => {
let enabledStreams: StorageSetting;
try {
const msos = await device.mixinDevice.getVideoStreamOptions();
enabledStreams = {
defaultValue: getDefaultPrebufferedStreams(msos)?.map(mso => mso.name),
choices: msos.map(mso => mso.name),
hide: false,
};
if (msos?.length > 1) {
return {
enabledStreams: {
defaultValue: getDefaultPrebufferedStreams(msos)?.map(mso => mso.name),
choices: msos.map(mso => mso.name),
hide: false,
},
enabledStreams,
defaultStream: createStreamOptions(streamTypes.defaultStream, msos),
remoteStream: createStreamOptions(streamTypes.remoteStream, msos),
lowResolutionStream: createStreamOptions(streamTypes.lowResolutionStream, msos),
@@ -156,6 +160,11 @@ export function createStreamSettings(device: MixinDeviceBase<VideoCamera>) {
remoteRecordingStream: createStreamOptions(streamTypes.remoteRecordingStream, msos),
}
}
else {
return {
enabledStreams,
}
}
}
catch (e) {
device.console.error('error retrieving getVideoStreamOptions', e);

View File

@@ -431,7 +431,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions {
sdp?: string;
}
export type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder";
export type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder";
export interface RequestMediaStreamOptions extends MediaStreamOptions {
/**

View File

@@ -556,7 +556,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions {
userConfigurable?: boolean;
sdp?: string;
}
export declare type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder";
export declare type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder";
export interface RequestMediaStreamOptions extends MediaStreamOptions {
/**
* When retrieving media, setting disableMediaProxies=true

View File

@@ -1132,7 +1132,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions {
sdp?: string;
}
export type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder";
export type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder";
export interface RequestMediaStreamOptions extends MediaStreamOptions {
/**