diff --git a/common/src/ffmpeg-rebroadcast.ts b/common/src/ffmpeg-rebroadcast.ts index 8d360eff5..b97ad0f55 100644 --- a/common/src/ffmpeg-rebroadcast.ts +++ b/common/src/ffmpeg-rebroadcast.ts @@ -2,11 +2,12 @@ import { createServer, Server } from 'net'; import child_process, { StdioOptions } from 'child_process'; import { ChildProcess } from 'child_process'; import { FFMpegInput, MediaStreamOptions } from '@scrypted/sdk/types'; -import { listenZero } from './listen-cluster'; +import { bindZero, listenZero } from './listen-cluster'; import { EventEmitter } from 'events'; import sdk from "@scrypted/sdk"; import { ffmpegLogInitialOutput, safePrintFFmpegArguments } from './media-helpers'; import { StreamChunk, StreamParser } from './stream-parser'; +import dgram from 'dgram'; const { mediaManager } = sdk; @@ -18,6 +19,7 @@ export interface MP4Atom { } export interface ParserSession { + sdp: Buffer[]; mediaStreamOptions: MediaStreamOptions; inputAudioCodec?: string; inputVideoCodec?: string; @@ -142,15 +144,35 @@ export async function startParserSession(ffmpegInput: FFMpegIn const stdio: StdioOptions = ['pipe', 'pipe', 'pipe'] let pipeCount = 3; for (const container of Object.keys(options.parsers)) { - const parser = options.parsers[container]; - args.push( - ...parser.outputArguments, - `pipe:${pipeCount}`, - ); - stdio.push('pipe'); - pipeCount++; + const parser: StreamParser = options.parsers[container]; + if (parser.parseDatagram) { + const socket = dgram.createSocket('udp4') + const udp = await bindZero(socket); + args.push( + ...parser.outputArguments, + udp.url, + ); + + (async () => { + for await (const chunk of parser.parseDatagram(socket, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) { + ffmpegStartedResolve?.(undefined); + events.emit(container, chunk); + resetActivityTimer(); + } + })(); + } + else { + args.push( + ...parser.outputArguments, + `pipe:${pipeCount++}`, + ); + stdio.push('pipe'); + } } + args.push('-sdp_file', `pipe:${pipeCount++}`); + stdio.push('pipe'); + // start ffmpeg process with child process pipes args.unshift('-hide_banner'); safePrintFFmpegArguments(console, args); @@ -160,13 +182,20 @@ export async function startParserSession(ffmpegInput: FFMpegIn ffmpegLogInitialOutput(console, cp); cp.on('exit', kill); + const sdp: Buffer[] = []; + (cp.stdio[pipeCount - 1]).on('data', buffer => sdp.push(buffer)); + // now parse the created pipes - Object.keys(options.parsers).forEach(async (container, index) => { - const pipe = cp.stdio[3 + index]; - const parser = options.parsers[container]; + let pipeIndex = 0; + Object.keys(options.parsers).forEach(async (container) => { + const parser: StreamParser = options.parsers[container]; + if (!parser.parse) + return; + const pipe = cp.stdio[3 + pipeIndex]; + pipeIndex++; try { - for await (const chunk of parser.parse(pipe, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) { + for await (const chunk of parser.parse(pipe as any, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) { ffmpegStartedResolve?.(undefined); events.emit(container, chunk); resetActivityTimer(); @@ -189,6 +218,7 @@ export async function startParserSession(ffmpegInput: FFMpegIn clearTimeout(ffmpegIncomingConnectionTimeout); return { + sdp, inputAudioCodec, inputVideoCodec, inputVideoResolution, @@ -225,7 +255,7 @@ export interface RebroadcastSessionCleanup { } export interface RebroadcasterOptions { - connect?: (writeData: (data: StreamChunk) => number, cleanup: () => void) => RebroadcastSessionCleanup|undefined; + connect?: (writeData: (data: StreamChunk) => number, cleanup: () => void) => RebroadcastSessionCleanup | undefined; console?: Console; } diff --git a/common/src/listen-cluster.ts b/common/src/listen-cluster.ts index 82c5d6e22..971ead63b 100644 --- a/common/src/listen-cluster.ts +++ b/common/src/listen-cluster.ts @@ -11,7 +11,11 @@ export async function listenZero(server: net.Server) { export async function bindZero(server: dgram.Socket) { server.bind(0); await once(server, 'listening'); - return (server.address() as net.AddressInfo).port; + const { port } = server.address() as net.AddressInfo; + return { + port, + url: `udp://127.0.0.1:${port}`, + } } export async function listenZeroSingleClient() { diff --git a/common/src/stream-parser.ts b/common/src/stream-parser.ts index cb0bdc70f..aad1f304e 100644 --- a/common/src/stream-parser.ts +++ b/common/src/stream-parser.ts @@ -1,12 +1,14 @@ import { once } from "events"; import { Socket } from "net"; +import { Socket as DatagramSocket } from "dgram"; import { Readable } from "stream"; import { readLength } from "./read-length"; export interface StreamParser { container: string; outputArguments: string[]; - parse: (socket: Socket, width: number, height: number) => AsyncGenerator; + parse?: (socket: Socket, width: number, height: number) => AsyncGenerator; + parseDatagram?: (socket: DatagramSocket, width: number, height: number) => AsyncGenerator; findSyncFrame(streamChunks: StreamChunk[]): StreamChunk[]; } @@ -89,6 +91,30 @@ export function createPCMParser(): StreamParser { } } +export function createDgramParser() { + async function* parse(socket: DatagramSocket) { + while (true) { + const [buffer] = await once(socket, 'message'); + yield { + chunks: [buffer], + } + } + }; + return parse; +} + +export function createRtpParser(...codec: string[]): StreamParser { + return { + container: 'sdp', + outputArguments: [ + ...codec, + '-f', 'rtp', + ], + parseDatagram: createDgramParser(), + findSyncFrame, + } +} + export function createMpegTsParser(options?: StreamParserOptions): StreamParser { return { container: 'mpegts', diff --git a/plugins/prebuffer-mixin/package-lock.json b/plugins/prebuffer-mixin/package-lock.json index b29e86f3f..180e4e03b 100644 --- a/plugins/prebuffer-mixin/package-lock.json +++ b/plugins/prebuffer-mixin/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.142", + "version": "0.1.143", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.142", + "version": "0.1.143", "license": "Apache-2.0", "dependencies": { "@scrypted/common": "file:../../common", diff --git a/plugins/prebuffer-mixin/package.json b/plugins/prebuffer-mixin/package.json index c5b6c7f4d..edcbf82e9 100644 --- a/plugins/prebuffer-mixin/package.json +++ b/plugins/prebuffer-mixin/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.142", + "version": "0.1.143", "description": "Rebroadcast and Prebuffer for VideoCameras.", "author": "Scrypted", "license": "Apache-2.0", diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 54c4433cc..d3b27e1b6 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -4,8 +4,10 @@ import sdk from '@scrypted/sdk'; import { once } from 'events'; import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin"; import { createRebroadcaster, ParserOptions, ParserSession, startParserSession } from '@scrypted/common/src/ffmpeg-rebroadcast'; -import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, createPCMParser, StreamParser } from '@scrypted/common/src/stream-parser'; +import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, createPCMParser, StreamParser, createRtpParser } from '@scrypted/common/src/stream-parser'; import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider'; +import dgram from 'dgram'; +import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; const { mediaManager, log, systemManager, deviceManager } = sdk; @@ -14,6 +16,7 @@ const PREBUFFER_DURATION_MS = 'prebufferDuration'; const SEND_KEYFRAME = 'sendKeyframe'; const AUDIO_CONFIGURATION_KEY_PREFIX = 'audioConfiguration-'; const FFMPEG_INPUT_ARGUMENTS_KEY_PREFIX = 'ffmpegInputArguments-'; +const REBROADCAST_MODE_KEY_PREFIX = 'rebroadcastMode-'; const DEFAULT_AUDIO = 'Default'; const AAC_AUDIO = 'AAC or No Audio'; const AAC_AUDIO_DESCRIPTION = `${AAC_AUDIO} (Copy)`; @@ -30,7 +33,7 @@ const VALID_AUDIO_CONFIGS = [ AAC_AUDIO, COMPATIBLE_AUDIO, TRANSCODE_AUDIO, - PCM_AUDIO, + // PCM_AUDIO, ]; interface PrebufferStreamChunk { @@ -42,10 +45,12 @@ interface Prebuffers { mp4: PrebufferStreamChunk[]; mpegts: PrebufferStreamChunk[]; s16le: PrebufferStreamChunk[]; + rtpvideo: PrebufferStreamChunk[]; + rtpaudio: PrebufferStreamChunk[]; } -type PrebufferParsers = "mpegts" | "mp4" | "s16le"; -const PrebufferParserValues: PrebufferParsers[] = ['mpegts', 'mp4', 's16le']; +type PrebufferParsers = "mpegts" | "mp4" | "s16le" | "rtpvideo" | "rtpaudio"; +const PrebufferParserValues: PrebufferParsers[] = ['mpegts', 'mp4', 's16le', 'rtpvideo', 'rtpaudio']; class PrebufferSession { @@ -55,6 +60,8 @@ class PrebufferSession { mp4: [], mpegts: [], s16le: [], + rtpvideo: [], + rtpaudio: [], }; parsers: { [container: string]: StreamParser }; @@ -72,6 +79,7 @@ class PrebufferSession { inactivityTimeout: NodeJS.Timeout; audioConfigurationKey: string; ffmpegInputArgumentsKey: string; + rebroadcastModeKey: string; constructor(public mixin: PrebufferMixin, public streamName: string, public streamId: string, public stopInactive: boolean) { this.storage = mixin.storage; @@ -79,12 +87,15 @@ class PrebufferSession { this.mixinDevice = mixin.mixinDevice; this.audioConfigurationKey = AUDIO_CONFIGURATION_KEY_PREFIX + this.streamId; this.ffmpegInputArgumentsKey = FFMPEG_INPUT_ARGUMENTS_KEY_PREFIX + this.streamId; + this.rebroadcastModeKey = REBROADCAST_MODE_KEY_PREFIX + this.streamId; } clearPrebuffers() { this.prebuffers.mp4 = []; this.prebuffers.mpegts = []; this.prebuffers.s16le = []; + this.prebuffers.rtpaudio = []; + this.prebuffers.rtpvideo = []; } ensurePrebufferSession() { @@ -167,6 +178,18 @@ class PrebufferSession { '-v verbose', ], combobox: true, + }, + { + title: 'Rebroadcast Mode', + group, + description: 'The stream format to use when rebroadcasting. RTP will increase startup time but may resolve PCM audio issues.', + placeholder: 'MPEG-TS', + choices: [ + 'MPEG-TS', + 'RTP', + ], + key: this.rebroadcastModeKey, + value: this.storage.getItem(this.rebroadcastModeKey) || 'MPEG-TS', } ); @@ -218,6 +241,8 @@ class PrebufferSession { this.prebuffers.mp4 = []; this.prebuffers.mpegts = []; this.prebuffers.s16le = []; + this.prebuffers.rtpvideo = []; + this.prebuffers.rtpaudio = []; const prebufferDurationMs = parseInt(this.storage.getItem(PREBUFFER_DURATION_MS)) || defaultPrebufferDuration; let mso: MediaStreamOptions; @@ -354,13 +379,21 @@ class PrebufferSession { vcodec, acodec, }), - mpegts: createMpegTsParser({ - vcodec, - acodec, - }), }, }; + const rtpMode = this.storage.getItem(this.rebroadcastModeKey) === 'RTP'; + if (!rtpMode) { + rbo.parsers.mpegts = createMpegTsParser({ + vcodec, + acodec, + }); + } + else { + rbo.parsers.rtpvideo = createRtpParser('-an', '-vcodec', 'copy'); + rbo.parsers.rtpaudio = createRtpParser('-vn', '-acodec', 'copy'); + } + // if pcm prebuffer is requested, create the the parser. don't do it if // the camera wants to mute the audio though, or no audio was detected // in a prior attempt. @@ -436,6 +469,9 @@ class PrebufferSession { // s16le will be a no-op if there's no pcm, no harm. for (const container of PrebufferParserValues) { + if (this.parsers[container]?.parseDatagram) + continue; + let shifts = 0; session.on(container, (chunk: StreamChunk) => { @@ -503,6 +539,65 @@ class PrebufferSession { const createContainerServer = async (container: PrebufferParsers) => { const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container]; + if (this.parsers[container].parseDatagram) { + let sdp = Buffer.concat(session.sdp).toString(); + const audioPort = Math.round(Math.random() * 40000 + 10000); + const videoPort = Math.round(Math.random() * 40000 + 10000); + sdp = sdp.replace('m=audio 0', 'm=audio ' + audioPort); + sdp = sdp.replace('m=video 0', 'm=video ' + videoPort); + + const d = dgram.createSocket('udp4'); + d.bind(); + + const safeWriteData = (chunk: StreamChunk, port: number) => { + for (const c of chunk.chunks) { + d.send(c, port); + } + } + + const wv = (chunk: StreamChunk) => safeWriteData(chunk, videoPort); + const wa = (chunk: StreamChunk) => safeWriteData(chunk, audioPort); + const cleanup = () => { + d.close(); + session.removeListener('rtpvideo', wv); + session.removeListener('rtpaudio', wa); + session.removeListener('killed', cleanup); + } + + session.once('killed', cleanup); + + const sdpClient = await listenZeroSingleClient(); + sdpClient.clientPromise.then(async (c) => { + this.activeClients++; + this.printActiveClients(); + + c.once('close', () => { + this.activeClients--; + this.inactivityCheck(session); + cleanup(); + }); + c.write(sdp); + c.end(); + + // await new Promise(resolve => setTimeout(resolve, 500)); + // for (const prebuffer of this.prebuffers.rtpvideo) { + // if (prebuffer.time < now - requestedPrebuffer) + // continue; + // safeWriteData(prebuffer.chunk, videoPort); + // } + // for (const prebuffer of this.prebuffers.rtpaudio) { + // if (prebuffer.time < now - requestedPrebuffer) + // continue; + // safeWriteData(prebuffer.chunk, audioPort); + // } + }) + .catch(cleanup); + + session.on('rtpvideo', wv) + session.on('rtpaudio', wa); + return sdpClient.url; + } + const { server, port } = await createRebroadcaster({ console: this.console, connect: (writeData, destroy) => { @@ -557,10 +652,13 @@ class PrebufferSession { setTimeout(() => server.close(), 30000); - return port; + return `tcp://127.0.0.1:${port}`; } - const container: PrebufferParsers = PrebufferParserValues.find(parser => parser === options?.container) || 'mpegts'; + const rtpMode = this.storage.getItem(this.rebroadcastModeKey) === 'RTP'; + const defaultContainer = rtpMode ? 'rtpvideo' : 'mpegts'; + + const container: PrebufferParsers = this.parsers[options?.container] ? options?.container as PrebufferParsers : defaultContainer; const mediaStreamOptions: MediaStreamOptions = Object.assign({}, session.mediaStreamOptions); @@ -604,13 +702,13 @@ class PrebufferSession { const length = Math.max(500000, available).toString(); - const url = `tcp://127.0.0.1:${await createContainerServer(container)}`; + const url = await createContainerServer(container); const ffmpegInput: FFMpegInput = { url, container, inputArguments: [ '-analyzeduration', '0', '-probesize', length, - '-f', container, + '-f', this.parsers[container].container, '-i', url, ], mediaStreamOptions, @@ -620,7 +718,7 @@ class PrebufferSession { ffmpegInput.inputArguments.push( '-analyzeduration', '0', '-probesize', length, '-f', 's16le', - '-i', `tcp://127.0.0.1:${await createContainerServer('s16le')}`, + '-i', await createContainerServer('s16le'), ) } diff --git a/server/src/media-helpers.ts b/server/src/media-helpers.ts index c5e47c82a..c478c0c13 100644 --- a/server/src/media-helpers.ts +++ b/server/src/media-helpers.ts @@ -1,6 +1,4 @@ import { ChildProcess } from "child_process"; -import { MediaStreamOptions, VideoCamera } from "@scrypted/sdk"; - const filtered = [ 'decode_slice_header error', @@ -41,7 +39,7 @@ export function safePrintFFmpegArguments(console: Console, args: string[]) { for (const arg of args) { try { const url = new URL(arg); - ret.push(`${url.protocol}:[REDACTED]`) + ret.push(`${url.protocol}[REDACTED]`) } catch (e) { ret.push(arg);