rebroadcast: rtp mode that may handle audio better.

This commit is contained in:
Koushik Dutta
2022-01-29 23:32:52 -08:00
parent 294072cbd5
commit 70260d2bb4
7 changed files with 190 additions and 34 deletions

View File

@@ -2,11 +2,12 @@ import { createServer, Server } from 'net';
import child_process, { StdioOptions } from 'child_process';
import { ChildProcess } from 'child_process';
import { FFMpegInput, MediaStreamOptions } from '@scrypted/sdk/types';
import { listenZero } from './listen-cluster';
import { bindZero, listenZero } from './listen-cluster';
import { EventEmitter } from 'events';
import sdk from "@scrypted/sdk";
import { ffmpegLogInitialOutput, safePrintFFmpegArguments } from './media-helpers';
import { StreamChunk, StreamParser } from './stream-parser';
import dgram from 'dgram';
const { mediaManager } = sdk;
@@ -18,6 +19,7 @@ export interface MP4Atom {
}
export interface ParserSession<T extends string> {
sdp: Buffer[];
mediaStreamOptions: MediaStreamOptions;
inputAudioCodec?: string;
inputVideoCodec?: string;
@@ -142,15 +144,35 @@ export async function startParserSession<T extends string>(ffmpegInput: FFMpegIn
const stdio: StdioOptions = ['pipe', 'pipe', 'pipe']
let pipeCount = 3;
for (const container of Object.keys(options.parsers)) {
const parser = options.parsers[container];
args.push(
...parser.outputArguments,
`pipe:${pipeCount}`,
);
stdio.push('pipe');
pipeCount++;
const parser: StreamParser = options.parsers[container];
if (parser.parseDatagram) {
const socket = dgram.createSocket('udp4')
const udp = await bindZero(socket);
args.push(
...parser.outputArguments,
udp.url,
);
(async () => {
for await (const chunk of parser.parseDatagram(socket, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) {
ffmpegStartedResolve?.(undefined);
events.emit(container, chunk);
resetActivityTimer();
}
})();
}
else {
args.push(
...parser.outputArguments,
`pipe:${pipeCount++}`,
);
stdio.push('pipe');
}
}
args.push('-sdp_file', `pipe:${pipeCount++}`);
stdio.push('pipe');
// start ffmpeg process with child process pipes
args.unshift('-hide_banner');
safePrintFFmpegArguments(console, args);
@@ -160,13 +182,20 @@ export async function startParserSession<T extends string>(ffmpegInput: FFMpegIn
ffmpegLogInitialOutput(console, cp);
cp.on('exit', kill);
const sdp: Buffer[] = [];
(cp.stdio[pipeCount - 1]).on('data', buffer => sdp.push(buffer));
// now parse the created pipes
Object.keys(options.parsers).forEach(async (container, index) => {
const pipe = cp.stdio[3 + index];
const parser = options.parsers[container];
let pipeIndex = 0;
Object.keys(options.parsers).forEach(async (container) => {
const parser: StreamParser = options.parsers[container];
if (!parser.parse)
return;
const pipe = cp.stdio[3 + pipeIndex];
pipeIndex++;
try {
for await (const chunk of parser.parse(pipe, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) {
for await (const chunk of parser.parse(pipe as any, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) {
ffmpegStartedResolve?.(undefined);
events.emit(container, chunk);
resetActivityTimer();
@@ -189,6 +218,7 @@ export async function startParserSession<T extends string>(ffmpegInput: FFMpegIn
clearTimeout(ffmpegIncomingConnectionTimeout);
return {
sdp,
inputAudioCodec,
inputVideoCodec,
inputVideoResolution,
@@ -225,7 +255,7 @@ export interface RebroadcastSessionCleanup {
}
export interface RebroadcasterOptions {
connect?: (writeData: (data: StreamChunk) => number, cleanup: () => void) => RebroadcastSessionCleanup|undefined;
connect?: (writeData: (data: StreamChunk) => number, cleanup: () => void) => RebroadcastSessionCleanup | undefined;
console?: Console;
}

View File

@@ -11,7 +11,11 @@ export async function listenZero(server: net.Server) {
export async function bindZero(server: dgram.Socket) {
server.bind(0);
await once(server, 'listening');
return (server.address() as net.AddressInfo).port;
const { port } = server.address() as net.AddressInfo;
return {
port,
url: `udp://127.0.0.1:${port}`,
}
}
export async function listenZeroSingleClient() {

View File

@@ -1,12 +1,14 @@
import { once } from "events";
import { Socket } from "net";
import { Socket as DatagramSocket } from "dgram";
import { Readable } from "stream";
import { readLength } from "./read-length";
export interface StreamParser {
container: string;
outputArguments: string[];
parse: (socket: Socket, width: number, height: number) => AsyncGenerator<StreamChunk>;
parse?: (socket: Socket, width: number, height: number) => AsyncGenerator<StreamChunk>;
parseDatagram?: (socket: DatagramSocket, width: number, height: number) => AsyncGenerator<StreamChunk>;
findSyncFrame(streamChunks: StreamChunk[]): StreamChunk[];
}
@@ -89,6 +91,30 @@ export function createPCMParser(): StreamParser {
}
}
export function createDgramParser() {
async function* parse(socket: DatagramSocket) {
while (true) {
const [buffer] = await once(socket, 'message');
yield {
chunks: [buffer],
}
}
};
return parse;
}
export function createRtpParser(...codec: string[]): StreamParser {
return {
container: 'sdp',
outputArguments: [
...codec,
'-f', 'rtp',
],
parseDatagram: createDgramParser(),
findSyncFrame,
}
}
export function createMpegTsParser(options?: StreamParserOptions): StreamParser {
return {
container: 'mpegts',

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.1.142",
"version": "0.1.143",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/prebuffer-mixin",
"version": "0.1.142",
"version": "0.1.143",
"license": "Apache-2.0",
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.1.142",
"version": "0.1.143",
"description": "Rebroadcast and Prebuffer for VideoCameras.",
"author": "Scrypted",
"license": "Apache-2.0",

View File

@@ -4,8 +4,10 @@ import sdk from '@scrypted/sdk';
import { once } from 'events';
import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin";
import { createRebroadcaster, ParserOptions, ParserSession, startParserSession } from '@scrypted/common/src/ffmpeg-rebroadcast';
import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, createPCMParser, StreamParser } from '@scrypted/common/src/stream-parser';
import { createMpegTsParser, createFragmentedMp4Parser, StreamChunk, createPCMParser, StreamParser, createRtpParser } from '@scrypted/common/src/stream-parser';
import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider';
import dgram from 'dgram';
import { listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
const { mediaManager, log, systemManager, deviceManager } = sdk;
@@ -14,6 +16,7 @@ const PREBUFFER_DURATION_MS = 'prebufferDuration';
const SEND_KEYFRAME = 'sendKeyframe';
const AUDIO_CONFIGURATION_KEY_PREFIX = 'audioConfiguration-';
const FFMPEG_INPUT_ARGUMENTS_KEY_PREFIX = 'ffmpegInputArguments-';
const REBROADCAST_MODE_KEY_PREFIX = 'rebroadcastMode-';
const DEFAULT_AUDIO = 'Default';
const AAC_AUDIO = 'AAC or No Audio';
const AAC_AUDIO_DESCRIPTION = `${AAC_AUDIO} (Copy)`;
@@ -30,7 +33,7 @@ const VALID_AUDIO_CONFIGS = [
AAC_AUDIO,
COMPATIBLE_AUDIO,
TRANSCODE_AUDIO,
PCM_AUDIO,
// PCM_AUDIO,
];
interface PrebufferStreamChunk {
@@ -42,10 +45,12 @@ interface Prebuffers {
mp4: PrebufferStreamChunk[];
mpegts: PrebufferStreamChunk[];
s16le: PrebufferStreamChunk[];
rtpvideo: PrebufferStreamChunk[];
rtpaudio: PrebufferStreamChunk[];
}
type PrebufferParsers = "mpegts" | "mp4" | "s16le";
const PrebufferParserValues: PrebufferParsers[] = ['mpegts', 'mp4', 's16le'];
type PrebufferParsers = "mpegts" | "mp4" | "s16le" | "rtpvideo" | "rtpaudio";
const PrebufferParserValues: PrebufferParsers[] = ['mpegts', 'mp4', 's16le', 'rtpvideo', 'rtpaudio'];
class PrebufferSession {
@@ -55,6 +60,8 @@ class PrebufferSession {
mp4: [],
mpegts: [],
s16le: [],
rtpvideo: [],
rtpaudio: [],
};
parsers: { [container: string]: StreamParser };
@@ -72,6 +79,7 @@ class PrebufferSession {
inactivityTimeout: NodeJS.Timeout;
audioConfigurationKey: string;
ffmpegInputArgumentsKey: string;
rebroadcastModeKey: string;
constructor(public mixin: PrebufferMixin, public streamName: string, public streamId: string, public stopInactive: boolean) {
this.storage = mixin.storage;
@@ -79,12 +87,15 @@ class PrebufferSession {
this.mixinDevice = mixin.mixinDevice;
this.audioConfigurationKey = AUDIO_CONFIGURATION_KEY_PREFIX + this.streamId;
this.ffmpegInputArgumentsKey = FFMPEG_INPUT_ARGUMENTS_KEY_PREFIX + this.streamId;
this.rebroadcastModeKey = REBROADCAST_MODE_KEY_PREFIX + this.streamId;
}
clearPrebuffers() {
this.prebuffers.mp4 = [];
this.prebuffers.mpegts = [];
this.prebuffers.s16le = [];
this.prebuffers.rtpaudio = [];
this.prebuffers.rtpvideo = [];
}
ensurePrebufferSession() {
@@ -167,6 +178,18 @@ class PrebufferSession {
'-v verbose',
],
combobox: true,
},
{
title: 'Rebroadcast Mode',
group,
description: 'The stream format to use when rebroadcasting. RTP will increase startup time but may resolve PCM audio issues.',
placeholder: 'MPEG-TS',
choices: [
'MPEG-TS',
'RTP',
],
key: this.rebroadcastModeKey,
value: this.storage.getItem(this.rebroadcastModeKey) || 'MPEG-TS',
}
);
@@ -218,6 +241,8 @@ class PrebufferSession {
this.prebuffers.mp4 = [];
this.prebuffers.mpegts = [];
this.prebuffers.s16le = [];
this.prebuffers.rtpvideo = [];
this.prebuffers.rtpaudio = [];
const prebufferDurationMs = parseInt(this.storage.getItem(PREBUFFER_DURATION_MS)) || defaultPrebufferDuration;
let mso: MediaStreamOptions;
@@ -354,13 +379,21 @@ class PrebufferSession {
vcodec,
acodec,
}),
mpegts: createMpegTsParser({
vcodec,
acodec,
}),
},
};
const rtpMode = this.storage.getItem(this.rebroadcastModeKey) === 'RTP';
if (!rtpMode) {
rbo.parsers.mpegts = createMpegTsParser({
vcodec,
acodec,
});
}
else {
rbo.parsers.rtpvideo = createRtpParser('-an', '-vcodec', 'copy');
rbo.parsers.rtpaudio = createRtpParser('-vn', '-acodec', 'copy');
}
// if pcm prebuffer is requested, create the the parser. don't do it if
// the camera wants to mute the audio though, or no audio was detected
// in a prior attempt.
@@ -436,6 +469,9 @@ class PrebufferSession {
// s16le will be a no-op if there's no pcm, no harm.
for (const container of PrebufferParserValues) {
if (this.parsers[container]?.parseDatagram)
continue;
let shifts = 0;
session.on(container, (chunk: StreamChunk) => {
@@ -503,6 +539,65 @@ class PrebufferSession {
const createContainerServer = async (container: PrebufferParsers) => {
const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container];
if (this.parsers[container].parseDatagram) {
let sdp = Buffer.concat(session.sdp).toString();
const audioPort = Math.round(Math.random() * 40000 + 10000);
const videoPort = Math.round(Math.random() * 40000 + 10000);
sdp = sdp.replace('m=audio 0', 'm=audio ' + audioPort);
sdp = sdp.replace('m=video 0', 'm=video ' + videoPort);
const d = dgram.createSocket('udp4');
d.bind();
const safeWriteData = (chunk: StreamChunk, port: number) => {
for (const c of chunk.chunks) {
d.send(c, port);
}
}
const wv = (chunk: StreamChunk) => safeWriteData(chunk, videoPort);
const wa = (chunk: StreamChunk) => safeWriteData(chunk, audioPort);
const cleanup = () => {
d.close();
session.removeListener('rtpvideo', wv);
session.removeListener('rtpaudio', wa);
session.removeListener('killed', cleanup);
}
session.once('killed', cleanup);
const sdpClient = await listenZeroSingleClient();
sdpClient.clientPromise.then(async (c) => {
this.activeClients++;
this.printActiveClients();
c.once('close', () => {
this.activeClients--;
this.inactivityCheck(session);
cleanup();
});
c.write(sdp);
c.end();
// await new Promise(resolve => setTimeout(resolve, 500));
// for (const prebuffer of this.prebuffers.rtpvideo) {
// if (prebuffer.time < now - requestedPrebuffer)
// continue;
// safeWriteData(prebuffer.chunk, videoPort);
// }
// for (const prebuffer of this.prebuffers.rtpaudio) {
// if (prebuffer.time < now - requestedPrebuffer)
// continue;
// safeWriteData(prebuffer.chunk, audioPort);
// }
})
.catch(cleanup);
session.on('rtpvideo', wv)
session.on('rtpaudio', wa);
return sdpClient.url;
}
const { server, port } = await createRebroadcaster({
console: this.console,
connect: (writeData, destroy) => {
@@ -557,10 +652,13 @@ class PrebufferSession {
setTimeout(() => server.close(), 30000);
return port;
return `tcp://127.0.0.1:${port}`;
}
const container: PrebufferParsers = PrebufferParserValues.find(parser => parser === options?.container) || 'mpegts';
const rtpMode = this.storage.getItem(this.rebroadcastModeKey) === 'RTP';
const defaultContainer = rtpMode ? 'rtpvideo' : 'mpegts';
const container: PrebufferParsers = this.parsers[options?.container] ? options?.container as PrebufferParsers : defaultContainer;
const mediaStreamOptions: MediaStreamOptions = Object.assign({}, session.mediaStreamOptions);
@@ -604,13 +702,13 @@ class PrebufferSession {
const length = Math.max(500000, available).toString();
const url = `tcp://127.0.0.1:${await createContainerServer(container)}`;
const url = await createContainerServer(container);
const ffmpegInput: FFMpegInput = {
url,
container,
inputArguments: [
'-analyzeduration', '0', '-probesize', length,
'-f', container,
'-f', this.parsers[container].container,
'-i', url,
],
mediaStreamOptions,
@@ -620,7 +718,7 @@ class PrebufferSession {
ffmpegInput.inputArguments.push(
'-analyzeduration', '0', '-probesize', length,
'-f', 's16le',
'-i', `tcp://127.0.0.1:${await createContainerServer('s16le')}`,
'-i', await createContainerServer('s16le'),
)
}

View File

@@ -1,6 +1,4 @@
import { ChildProcess } from "child_process";
import { MediaStreamOptions, VideoCamera } from "@scrypted/sdk";
const filtered = [
'decode_slice_header error',
@@ -41,7 +39,7 @@ export function safePrintFFmpegArguments(console: Console, args: string[]) {
for (const arg of args) {
try {
const url = new URL(arg);
ret.push(`${url.protocol}:[REDACTED]`)
ret.push(`${url.protocol}[REDACTED]`)
}
catch (e) {
ret.push(arg);