From 09d1666400712d982fe2a3b67067280965aebda2 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Mon, 11 Apr 2022 11:58:16 -0700 Subject: [PATCH] rebroadcast: rtsp server can handle multiple tracks single stream cameras should be able to disable prebuffer fix rtsp keyframe detection --- common/src/rtsp-server.ts | 166 +++++++++--------- common/src/sdp-utils.ts | 35 ++-- plugins/prebuffer-mixin/src/main.ts | 78 ++++---- plugins/prebuffer-mixin/src/rfc4571.ts | 7 +- .../prebuffer-mixin/src/stream-settings.ts | 21 ++- sdk/gen/types.input.ts | 2 +- sdk/types/index.d.ts | 2 +- sdk/types/index.ts | 2 +- 8 files changed, 151 insertions(+), 162 deletions(-) diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index d45ef7be5..c3968fd70 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -2,7 +2,7 @@ import { readLength, readLine } from './read-stream'; import { Duplex, PassThrough, Readable } from 'stream'; import { randomBytes } from 'crypto'; import { StreamChunk, StreamParser, StreamParserOptions } from './stream-parser'; -import { findTrackByType } from './sdp-utils'; +import { parseSdp } from './sdp-utils'; import dgram from 'dgram'; import net from 'net'; import tls from 'tls'; @@ -33,11 +33,42 @@ export async function readMessage(client: Readable): Promise { // https://yumichan.net/video-processing/video-compression/introduction-to-h264-nal-unit/ -const NAL_TYPE_SPS = 7; +export const H264_NAL_TYPE_IDR = 5; +export const H264_NAL_TYPE_SPS = 7; // aggregate NAL Unit -const NAL_TYPE_STAP_A = 24; +export const H264_NAL_TYPE_STAP_A = 24; // fragmented NAL Unit (need to match against first) -const NAL_TYPE_FU_A = 28; +export const H264_NAL_TYPE_FU_A = 28; + +export function hasH264NaluType(streamChunk: StreamChunk, naluType: number) { + if (streamChunk.type !== 'h264') + return false; + + const nalu = streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12); + const checkNaluType = nalu[0] & 0x1f; + if (checkNaluType === H264_NAL_TYPE_STAP_A) { + let pos = 1; + while (pos < nalu.length) { + const naluLength = nalu.readUInt16BE(pos); + pos += 2; + const stapaType = nalu[pos] & 0x1f; + if (stapaType === naluType) + return true; + pos += naluLength; + } + } + else if (checkNaluType === H264_NAL_TYPE_FU_A) { + const fuaType = nalu[1] & 0x1f; + const isFuStart = !!(nalu[1] & 0x80); + + if (fuaType === naluType && isFuStart) + return true; + } + else if (checkNaluType === naluType) { + return true; + } + return false; +} export function createRtspParser(options?: StreamParserOptions): RtspStreamParser { let resolve: any; @@ -61,33 +92,8 @@ export function createRtspParser(options?: StreamParserOptions): RtspStreamParse for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) { const streamChunk = streamChunks[prebufferIndex]; - if (streamChunk.type !== 'h264') - continue; - - // last packet is rtp packet, strip off the rtp header (12 bytes) - const nalu = streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12); - const naluType = nalu[0] & 0x1f; - if (naluType === NAL_TYPE_STAP_A) { - let pos = 1; - while (pos < nalu.length) { - const naluLength = nalu.readUInt16BE(pos); - pos += 2; - const stapaType = nalu[pos] & 0x1f; - if (stapaType === NAL_TYPE_SPS) - foundIndex = prebufferIndex; - pos += naluLength; - } - } - else if (naluType === NAL_TYPE_FU_A) { - const fuaType = nalu[1] & 0x1f; - const isFuStart = !!(nalu[1] & 0x80); - - if (fuaType === NAL_TYPE_SPS && isFuStart) - foundIndex = prebufferIndex; - } - else if (naluType === NAL_TYPE_SPS) { + if (hasH264NaluType(streamChunk, H264_NAL_TYPE_SPS)) foundIndex = prebufferIndex; - } } if (foundIndex !== undefined) @@ -396,15 +402,19 @@ export class RtspClient extends RtspBase { } } +export interface RtspTrack { + protocol: 'tcp' | 'udp'; + destination: number; + codec: string; + control: string; +} + export class RtspServer { - videoChannel: number; - audioChannel: number; session: string; console: Console; - udpPorts = { - video: 0, - audio: 0, - }; + setupTracks: { + [trackId: string]: RtspTrack; + } = {}; constructor(public client: Duplex, public sdp?: string, public udp?: dgram.Socket, public checkRequest?: (method: string, url: string, headers: Headers, rawMessage: string[]) => Promise) { this.session = randomBytes(4).toString('hex'); @@ -436,7 +446,7 @@ export class RtspServer { } async *handleRecord(): AsyncGenerator<{ - type: 'audio' | 'video', + type: string, rtcp: boolean, header: Buffer, packet: Buffer, @@ -450,8 +460,13 @@ export class RtspServer { const length = header.readUInt16BE(2); const packet = await readLength(this.client, length); const id = header.readUInt8(1); + const destination = id - (id % 2); + const track = Object.values(this.setupTracks).find(track => track.destination === destination); + if (!track) + throw new Error('RSTP Server received unknown channel: ' + id); + yield { - type: id - (id % 2) === this.videoChannel ? 'video' : 'audio', + type: track.codec, rtcp: id % 2 === 1, header, packet, @@ -474,26 +489,22 @@ export class RtspServer { this.udp.send(packet, rtcp ? port + 1 : port, '127.0.0.1'); } - sendVideo(packet: Buffer, rtcp: boolean) { - if (this.udp && this.udpPorts.video) { - this.sendUdp(this.udpPorts.video, packet, rtcp) + sendTrack(trackId: string, packet: Buffer, rtcp: boolean) { + const track = this.setupTracks[trackId]; + if (!track) { + this.console?.warn('RTSP Server track not found:', trackId); + return; } - else { - if (this.videoChannel == null) - throw new Error('rtsp videoChannel not set up'); - this.send(packet, rtcp ? this.videoChannel + 1 : this.videoChannel); - } - } - sendAudio(packet: Buffer, rtcp: boolean) { - if (this.udp && this.udpPorts.audio) { - this.sendUdp(this.udpPorts.audio, packet, rtcp) - } - else { - if (this.audioChannel == null) - throw new Error('rtsp audioChannel not set up'); - this.send(packet, rtcp ? this.audioChannel + 1 : this.audioChannel); + if (track.protocol === 'udp') { + if (!this.udp) + this.console?.warn('RTSP Server UDP socket not available.'); + else + this.sendUdp(track.destination, packet, rtcp); + return; } + + this.send(packet, rtcp ? track.destination + 1 : track.destination); } options(url: string, requestHeaders: Headers) { @@ -517,8 +528,13 @@ export class RtspServer { const transport = requestHeaders['transport']; headers['Transport'] = transport; headers['Session'] = this.session; - let audioTrack = findTrackByType(this.sdp, 'audio'); - let videoTrack = findTrackByType(this.sdp, 'video'); + const parsedSdp = parseSdp(this.sdp); + const msection = parsedSdp.msections.find(msection => url.endsWith(msection.control)); + if (!msection) { + this.respond(404, 'Not Found', requestHeaders, headers); + return; + } + if (transport.includes('UDP')) { if (!this.udp) { this.respond(461, 'Unsupported Transport', requestHeaders, {}); @@ -526,25 +542,24 @@ export class RtspServer { } const match = transport.match(/.*?client_port=([0-9]+)-([0-9]+)/); const [_, rtp, rtcp] = match; - if (audioTrack && url.includes(audioTrack.trackId)) - this.udpPorts.audio = parseInt(rtp); - else if (videoTrack && url.includes(videoTrack.trackId)) - this.udpPorts.video = parseInt(rtp); - else - this.console?.warn('unknown track id', url); + this.setupTracks[msection.control] = { + control: msection.control, + protocol: 'udp', + destination: parseInt(rtp), + codec: msection.codec, + } } else if (transport.includes('TCP')) { const match = transport.match(/.*?interleaved=([0-9]+)-([0-9]+)/); if (match) { const low = parseInt(match[1]); const high = parseInt(match[2]); - - if (audioTrack && url.includes(audioTrack.trackId)) - this.audioChannel = low; - else if (videoTrack && url.includes(videoTrack.trackId)) - this.videoChannel = low; - else - this.console?.warn('unknown track id', url); + this.setupTracks[msection.control] = { + control: msection.control, + protocol: 'tcp', + destination: low, + codec: msection.codec, + } } } this.respond(200, 'OK', requestHeaders, headers) @@ -552,15 +567,8 @@ export class RtspServer { play(url: string, requestHeaders: Headers) { const headers: Headers = {}; - let audioTrack = findTrackByType(this.sdp, 'audio'); - let videoTrack = findTrackByType(this.sdp, 'video'); - let rtpInfo = ''; - if (audioTrack) - rtpInfo = `url=${url}/trackID=${audioTrack.trackId};seq=0;rtptime=0` - if (audioTrack && videoTrack) - rtpInfo += ','; - if (videoTrack) - rtpInfo += `url=${url}/trackID=${videoTrack.trackId};seq=0;rtptime=0`; + const rtpInfos = Object.values(this.setupTracks).map(track => `url=${url}/trackID=${track.control};seq=0;rtptime=0`); + const rtpInfo = rtpInfos.join(','); headers['RTP-Info'] = rtpInfo; headers['Range'] = 'npt=now-'; headers['Session'] = this.session; diff --git a/common/src/sdp-utils.ts b/common/src/sdp-utils.ts index 62b3a6208..382e22465 100644 --- a/common/src/sdp-utils.ts +++ b/common/src/sdp-utils.ts @@ -1,3 +1,4 @@ +// todo: move this to ring. export function replacePorts(sdp: string, audioPort: number, videoPort: number) { let outputSdp = sdp .replace(/c=IN .*/, `c=IN IP4 127.0.0.1`) @@ -9,15 +10,18 @@ export function replacePorts(sdp: string, audioPort: number, videoPort: number) export function addTrackControls(sdp: string) { let lines = sdp.split('\n').map(line => line.trim()); lines = lines.filter(line => !line.includes('a=control:')); - const vindex = lines.findIndex(line => line.startsWith('m=video')); - if (vindex !== -1) - lines.splice(vindex + 1, 0, 'a=control:trackID=video'); - const aindex = lines.findIndex(line => line.startsWith('m=audio')); - if (aindex !== -1) - lines.splice(aindex + 1, 0, 'a=control:trackID=audio'); + let trackCount = 0; + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + if (!line.startsWith('m=')) + continue; + lines.splice(i + 1, 0, 'a=control:trackID=' + trackCount); + trackCount++; + } return lines.join('\r\n') } +// todo: move this to webrtc // this is an sdp corresponding to what is requested from webrtc. // h264 baseline and opus are required codecs that all webrtc implementations must provide. export function createSdpInput(audioPort: number, videoPort: number, sdp: string) { @@ -46,6 +50,7 @@ export function createSdpInput(audioPort: number, videoPort: number, sdp: string return outputSdp; } +// todo: move this to webrtc export function findFmtp(sdp: string, codec: string) { let lines = sdp.split('\n').map(line => line.trim()); @@ -62,24 +67,6 @@ export function findFmtp(sdp: string, codec: string) { }) } -export function parsePayloadTypes(sdp: string) { - const audioPayloadTypes = new Set(); - const videoPayloadTypes = new Set(); - const addPts = (set: Set, pts: string[]) => { - for (const pt of pts || []) { - set.add(parseInt(pt)); - } - }; - const audioPts = sdp.match(/m=audio.*/)?.[0]; - addPts(audioPayloadTypes, audioPts?.split(' ').slice(3)); - const videoPts = sdp.match(/m=video.*/)?.[0]; - addPts(videoPayloadTypes, videoPts?.split(' ').slice(3)); - return { - audioPayloadTypes, - videoPayloadTypes, - } -} - function getSections(sdp: string) { const sections = ('\n' + sdp).split('\nm='); return sections; diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 65c76b599..659b1f59d 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -5,19 +5,19 @@ import { handleRebroadcasterClient, ParserOptions, ParserSession, setupActivityT import { closeQuiet, createBindZero, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; import { safeKillFFmpeg } from '@scrypted/common/src/media-helpers'; import { readLength } from '@scrypted/common/src/read-stream'; -import { createRtspParser, RtspClient, RtspServer, RTSP_FRAME_MAGIC } from '@scrypted/common/src/rtsp-server'; -import { addTrackControls, parsePayloadTypes, parseSdp } from '@scrypted/common/src/sdp-utils'; +import { createRtspParser, H264_NAL_TYPE_IDR, hasH264NaluType, RtspClient, RtspServer, RTSP_FRAME_MAGIC } from '@scrypted/common/src/rtsp-server'; +import { addTrackControls, parseSdp } from '@scrypted/common/src/sdp-utils'; import { StorageSettings } from '@scrypted/common/src/settings'; import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin"; import { sleep } from '@scrypted/common/src/sleep'; import { createFragmentedMp4Parser, createMpegTsParser, parseMp4StreamChunks, StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser'; import sdk, { BufferConverter, FFMpegInput, MediaObject, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera, VideoCameraConfiguration } from '@scrypted/sdk'; import crypto from 'crypto'; +import dgram from 'dgram'; import net from 'net'; import { Duplex } from 'stream'; import { connectRFC4571Parser, RtspChannelCodecMapping, startRFC4571Parser } from './rfc4571'; import { createStreamSettings, getPrebufferedStreams } from './stream-settings'; -import dgram from 'dgram'; const { mediaManager, log, systemManager, deviceManager } = sdk; @@ -837,14 +837,8 @@ class PrebufferSession { if (chunk.type === 'mdat') { updateIdr(); } - if (chunk.type === 'rtp-video') { - const fragmentType = chunk.chunks[1].readUInt8(12) & 0x1f; - const second = chunk.chunks[1].readUInt8(13); - const nalType = second & 0x1f; - const startBit = second & 0x80; - if (((fragmentType === 28 || fragmentType === 29) && nalType === 5 && startBit == 128) || fragmentType == 5) { - updateIdr(); - } + if (chunk.type === 'h264' && hasH264NaluType(chunk, H264_NAL_TYPE_IDR)) { + updateIdr(); } prebufferContainer.push({ @@ -948,7 +942,7 @@ class PrebufferSession { session.once('killed', cleanup); const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container]; - if (container !== 'rtsp') { + if (true || container !== 'rtsp') { for (const prebuffer of prebufferContainer) { if (prebuffer.time < now - requestedPrebuffer) continue; @@ -1008,34 +1002,24 @@ class PrebufferSession { let socketPromise: Promise; let url: string; let filter: (chunk: StreamChunk) => StreamChunk; + const codecMap = new Map(); if (container === 'rtsp') { - - let audioChannel: number; - let videoChannel: number; - const parsedSdp = parseSdp(sdp); if (parsedSdp.msections.length > 2) { parsedSdp.msections = parsedSdp.msections.filter(msection => msection.codec === mediaStreamOptions.video?.codec || msection.codec === mediaStreamOptions.audio?.codec); sdp = parsedSdp.toSdp(); filter = chunk => { - if (chunk.type === mediaStreamOptions.video?.codec && videoChannel !== undefined) { - const chunks = chunk.chunks.slice(); - const header = chunks[0]; - header.writeUInt8(videoChannel, 1); - return { - startStream: chunk.startStream, - chunks, - } - } - if (chunk.type === mediaStreamOptions.audio?.codec && audioChannel !== undefined) { - const chunks = chunk.chunks.slice(); - const header = chunks[0]; - header.writeUInt8(audioChannel, 1); - return { - startStream: chunk.startStream, - chunks, - } + const channel = codecMap.get(chunk.type); + if (channel == undefined) + return; + const chunks = chunk.chunks.slice(); + const header = Buffer.from(chunks[0]); + header.writeUInt8(channel, 1); + chunks[0] = header; + return { + startStream: chunk.startStream, + chunks, } } } @@ -1046,8 +1030,9 @@ class PrebufferSession { const server = new RtspServer(socket, sdp); server.console = this.console; await server.handlePlayback(); - audioChannel = server.audioChannel; - videoChannel = server.videoChannel; + for (const track of Object.values(server.setupTracks)) { + codecMap.set(track.codec, track.destination); + } return socket; }) url = client.url.replace('tcp://', 'rtsp://'); @@ -1155,6 +1140,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase(); + for (const msection of parsedSdp.msections) { + for (const pt of msection.payloadTypes) { + trackLookups.set(pt, msection.control); + } + } const u = new URL(url); if (!u.protocol.startsWith('tcp')) @@ -1499,19 +1491,15 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider const length = header.readInt16BE(0); const data = await readLength(socket, length); const pt = data[1] & 0x7f; - if (audioPayloadTypes.has(pt)) { - rtsp.sendAudio(data, false); - } - else if (videoPayloadTypes.has(pt)) { - rtsp.sendVideo(data, false); - } - else { + const track = trackLookups.get(pt); + if (!track) { client.destroy(); socket.destroy(); throw new Error('unknown payload type ' + pt); } + rtsp.sendTrack(track, data, false); } - }) + }); return Buffer.from(JSON.stringify(ffmpeg)); } @@ -1532,7 +1520,7 @@ class PrebufferProvider extends AutoenableMixinProvider implements MixinProvider mixinDeviceState, mixinProviderNativeId: this.nativeId, mixinDeviceInterfaces, - group: "Stream Selection", + group: "Stream Management", groupKey: "prebuffer", }); this.currentMixins.set(mixinDeviceState.id, ret); diff --git a/plugins/prebuffer-mixin/src/rfc4571.ts b/plugins/prebuffer-mixin/src/rfc4571.ts index 1fb5dfb97..5b4d6f42d 100644 --- a/plugins/prebuffer-mixin/src/rfc4571.ts +++ b/plugins/prebuffer-mixin/src/rfc4571.ts @@ -2,15 +2,12 @@ import { cloneDeep } from "@scrypted/common/src/clone-deep"; import { ParserOptions, ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast"; import { readLength } from "@scrypted/common/src/read-stream"; import { RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server"; -import { findTrackByType, parseSdp } from "@scrypted/common/src/sdp-utils"; +import { parseSdp } from "@scrypted/common/src/sdp-utils"; import { StreamChunk } from "@scrypted/common/src/stream-parser"; -import sdk, { ResponseMediaStreamOptions } from "@scrypted/sdk"; +import { ResponseMediaStreamOptions } from "@scrypted/sdk"; import net from 'net'; import { EventEmitter, Readable } from "stream"; - -const { mediaManager } = sdk; - export function connectRFC4571Parser(url: string) { const u = new URL(url); if (!u.protocol.startsWith('tcp')) diff --git a/plugins/prebuffer-mixin/src/stream-settings.ts b/plugins/prebuffer-mixin/src/stream-settings.ts index 97cb51bb9..5ff9f0771 100644 --- a/plugins/prebuffer-mixin/src/stream-settings.ts +++ b/plugins/prebuffer-mixin/src/stream-settings.ts @@ -94,7 +94,7 @@ export function createStreamSettings(device: MixinDeviceBase) { title: 'Prebuffered Streams', description: 'Prebuffering maintains an active connection to the stream and improves load times. Prebuffer also retains the recent video for capturing motion events with HomeKit Secure video. Enabling Prebuffer is not recommended on Cloud cameras.', multiple: true, - hide: true, + hide: false, }, ...streamTypes, }); @@ -139,16 +139,20 @@ export function createStreamSettings(device: MixinDeviceBase) { storageSettings.options = { onGet: async () => { + let enabledStreams: StorageSetting; + try { const msos = await device.mixinDevice.getVideoStreamOptions(); + enabledStreams = { + defaultValue: getDefaultPrebufferedStreams(msos)?.map(mso => mso.name), + choices: msos.map(mso => mso.name), + hide: false, + }; + if (msos?.length > 1) { return { - enabledStreams: { - defaultValue: getDefaultPrebufferedStreams(msos)?.map(mso => mso.name), - choices: msos.map(mso => mso.name), - hide: false, - }, + enabledStreams, defaultStream: createStreamOptions(streamTypes.defaultStream, msos), remoteStream: createStreamOptions(streamTypes.remoteStream, msos), lowResolutionStream: createStreamOptions(streamTypes.lowResolutionStream, msos), @@ -156,6 +160,11 @@ export function createStreamSettings(device: MixinDeviceBase) { remoteRecordingStream: createStreamOptions(streamTypes.remoteRecordingStream, msos), } } + else { + return { + enabledStreams, + } + } } catch (e) { device.console.error('error retrieving getVideoStreamOptions', e); diff --git a/sdk/gen/types.input.ts b/sdk/gen/types.input.ts index 376df1d1d..5f474c93e 100644 --- a/sdk/gen/types.input.ts +++ b/sdk/gen/types.input.ts @@ -431,7 +431,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions { sdp?: string; } -export type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder"; +export type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder"; export interface RequestMediaStreamOptions extends MediaStreamOptions { /** diff --git a/sdk/types/index.d.ts b/sdk/types/index.d.ts index 2ac671ec7..0002ada9b 100644 --- a/sdk/types/index.d.ts +++ b/sdk/types/index.d.ts @@ -556,7 +556,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions { userConfigurable?: boolean; sdp?: string; } -export declare type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder"; +export declare type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder"; export interface RequestMediaStreamOptions extends MediaStreamOptions { /** * When retrieving media, setting disableMediaProxies=true diff --git a/sdk/types/index.ts b/sdk/types/index.ts index 74e75ac6c..f0a2f146a 100644 --- a/sdk/types/index.ts +++ b/sdk/types/index.ts @@ -1132,7 +1132,7 @@ export interface ResponseMediaStreamOptions extends MediaStreamOptions { sdp?: string; } -export type MediaStreamDestination = "local" | "remote" | "low-resolution" | "local-recorder" | "remote-recorder"; +export type MediaStreamDestination = "local" | "remote" | "medium-resolution" | "low-resolution" | "local-recorder" | "remote-recorder"; export interface RequestMediaStreamOptions extends MediaStreamOptions { /**