Revert "rebroadcast: handle pcm audio"

This reverts commit 66a2259242.
This commit is contained in:
Koushik Dutta
2021-10-12 13:20:53 -07:00
parent 119af4093e
commit 2fb11368f1
2 changed files with 104 additions and 180 deletions

View File

@@ -22,82 +22,47 @@ export interface StreamChunk {
height?: number;
}
// function checkTsPacket(pkt: Buffer) {
// const pid = ((pkt[1] & 0x1F) << 8) | pkt[2];
// if (pid == 256) {
// // found video stream
// if ((pkt[3] & 0x20) && (pkt[4] > 0)) {
// // have AF
// if (pkt[5] & 0x40) {
// // found keyframe
// console.log('keyframe');
// }
// }
// }
// }
function createLengthParser(length: number, verify?: (concat: Buffer) => void) {
async function* parse(socket: Socket): AsyncGenerator<StreamChunk> {
let pending: Buffer[] = [];
let pendingSize = 0;
while (true) {
const data: Buffer = socket.read();
if (!data) {
await once(socket, 'readable');
continue;
}
pending.push(data);
pendingSize += data.length;
if (pendingSize < length)
continue;
const concat = Buffer.concat(pending);
verify?.(concat);
const remaining = concat.length % length;
const left = concat.slice(0, concat.length - remaining);
const right = concat.slice(concat.length - remaining);
pending = [right];
pendingSize = right.length;
yield {
chunks: [left],
};
}
}
return parse;
}
// -ac num channels? cameras are always mono?
export function createPCMParser(): StreamParser {
return {
container: 's16le',
outputArguments: [
'-vn',
'-acodec', 'pcm_s16le',
'-f', 's16le',
],
parse: createLengthParser(64),
}
}
export function createMpegTsParser(options?: StreamParserOptions): StreamParser {
return {
container: 'mpegts',
outputArguments: [
'-f', 'mpegts',
...(options?.vcodec || []),
...(options?.acodec || []),
'-f', 'mpegts',
],
parse: createLengthParser(188, concat => {
if (concat[0] != 0x47) {
throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.')
async *parse(socket: Socket): AsyncGenerator<StreamChunk> {
let pending: Buffer[] = [];
let pendingSize = 0;
while (true) {
const data: Buffer = socket.read();
if (!data) {
await once(socket, 'readable');
continue;
}
pending.push(data);
pendingSize += data.length;
if (pendingSize < 188)
continue;
const concat = Buffer.concat(pending);
if (concat[0] != 0x47) {
throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.')
}
const remaining = concat.length % 188;
const left = concat.slice(0, concat.length - remaining);
const right = concat.slice(concat.length - remaining);
pending = [right];
pendingSize = right.length;
yield {
chunks: [left],
};
}
}),
}
}
}
export interface MP4Atom {
header: Buffer;
length: number;
@@ -125,10 +90,10 @@ export function createFragmentedMp4Parser(options?: StreamParserOptions): Stream
return {
container: 'mp4',
outputArguments: [
'-f', 'mp4',
...(options?.vcodec || []),
...(options?.acodec || []),
'-movflags', 'frag_keyframe+empty_moov+default_base_moof',
'-f', 'mp4',
],
async *parse(socket: Socket): AsyncGenerator<StreamChunk> {
const parser = parseFragmentedMP4(socket);
@@ -143,7 +108,7 @@ export function createFragmentedMp4Parser(options?: StreamParserOptions): Stream
moov = atom;
}
yield {
yield{
startStream,
chunks: [atom.header, atom.data],
type: atom.type,
@@ -191,7 +156,7 @@ export function createRawVideoParser(options?: RawVideoParserOptions): StreamPar
],
async *parse(socket: Socket, width: number, height: number): AsyncGenerator<StreamChunk> {
if (!width || !height)
throw new Error("error parsing rawvideo, unknown width and height");
throw new Error("error parsing rawvideo, unknown width and height");
width = size?.width || width;
height = size?.height || height

View File

@@ -5,9 +5,9 @@ import { Server } from 'net';
import { listenZeroCluster } from '@scrypted/common/src/listen-cluster';
import EventEmitter from 'events';
import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin";
import { FFMpegRebroadcastOptions, FFMpegRebroadcastSession, startRebroadcastSession } from '@scrypted/common/src/ffmpeg-rebroadcast';
import { FFMpegRebroadcastSession, startRebroadcastSession } from '@scrypted/common/src/ffmpeg-rebroadcast';
import { probeVideoCamera } from '@scrypted/common/src/media-helpers';
import { createMpegTsParser, createFragmentedMp4Parser, MP4Atom, StreamChunk, createPCMParser } from '@scrypted/common/src/stream-parser';
import { createMpegTsParser, createFragmentedMp4Parser, MP4Atom, StreamChunk } from '@scrypted/common/src/stream-parser';
import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider';
const { mediaManager, log, systemManager, deviceManager } = sdk;
@@ -15,10 +15,8 @@ const { mediaManager, log, systemManager, deviceManager } = sdk;
const defaultPrebufferDuration = 15000;
const PREBUFFER_DURATION_MS = 'prebufferDuration';
const SEND_KEYFRAME = 'sendKeyframe';
const AUDIO_CONFIGURATION = 'audioConfiguration';
const COMPATIBLE_AUDIO = 'MPEG-TS/MP4 Compatible';
const PCM_AUDIO = 'PCM Audio';
const OTHER_AUDIO = 'Other Audio (reencode)';
const REENCODE_AUDIO = 'reencodeAudio';
const REENCODE_VIDEO = 'reencodeVideo';
interface PrebufferStreamChunk {
chunk: StreamChunk;
@@ -30,7 +28,6 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
prebuffers = {
mp4: [],
mpegts: [],
s16le: [],
};
events = new EventEmitter();
released = false;
@@ -39,7 +36,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
session: FFMpegRebroadcastSession;
detectedIdrInterval = 0;
prevIdr = 0;
expectingPCM = false;
unexpectedPCM = false;
constructor(mixinDevice: VideoCamera & Settings, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any }, providerNativeId: string) {
super(mixinDevice, mixinDeviceState, {
@@ -73,16 +70,11 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
value: (this.storage.getItem(SEND_KEYFRAME) !== 'false').toString(),
},
{
title: 'Audio Configuration',
description: 'Override the Audio Configuration for the rebroadcast stream.',
type: 'string',
key: AUDIO_CONFIGURATION,
value: this.storage.getItem(AUDIO_CONFIGURATION),
choices: [
COMPATIBLE_AUDIO,
PCM_AUDIO,
OTHER_AUDIO,
],
title: 'Reencode Audio',
description: 'Reencode the audio (necessary if camera outputs PCM).',
type: 'boolean',
key: REENCODE_AUDIO,
value: (this.storage.getItem(REENCODE_AUDIO) === 'true').toString(),
},
{
group: 'Media Information',
@@ -125,30 +117,20 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
async startPrebufferSession() {
this.prebuffers.mp4 = [];
this.prebuffers.mpegts = [];
this.prebuffers.s16le = [];
const prebufferDurationMs = parseInt(this.storage.getItem(PREBUFFER_DURATION_MS)) || defaultPrebufferDuration;
const ffmpegInput = JSON.parse((await mediaManager.convertMediaObjectToBuffer(await this.mixinDevice.getVideoStream(), ScryptedMimeTypes.FFmpegInput)).toString()) as FFMpegInput;
const audioConfig = this.storage.getItem(AUDIO_CONFIGURATION);
const reencodeAudio = audioConfig === OTHER_AUDIO;
const reencodeAudio = this.storage.getItem(REENCODE_AUDIO) === 'true' || this.unexpectedPCM;
const probe = await probeVideoCamera(this.mixinDevice);
const probeAudioCodec = probe?.options?.[0].audio?.codec;
this.expectingPCM = probeAudioCodec && probeAudioCodec.indexOf('pcm') !== -1;
const pcmAudio = audioConfig === PCM_AUDIO || this.expectingPCM;
let acodec: string[];
if (probe.noAudio || pcmAudio) {
// no audio? explicitly disable it.
// no audio? explicitly disable it.
if (probe.noAudio) {
acodec = ['-an'];
}
else if (reencodeAudio) {
// setting no audio codec will allow ffmpeg to do an implicit conversion.
acodec = [];
}
else {
// NOTE: if there is no audio track, this will still work fine.
acodec = [
acodec = reencodeAudio ? [] : [
'-acodec',
'copy',
];
@@ -159,7 +141,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
'copy',
];
const rbo: FFMpegRebroadcastOptions = {
const session = await startRebroadcastSession(ffmpegInput, {
console: this.console,
parsers: {
mp4: createFragmentedMp4Parser({
@@ -171,13 +153,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
acodec,
}),
}
};
if (pcmAudio) {
rbo.parsers.s16le = createPCMParser();
}
const session = await startRebroadcastSession(ffmpegInput, rbo);
});
this.session = session;
@@ -186,11 +162,9 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
}
else if (this.session.inputAudioCodec !== 'aac') {
this.console.error('Detected audio codec was not AAC.');
if (!probe.noAudio && session.inputAudioCodec && session.inputAudioCodec.indexOf('pcm') !== -1 && !pcmAudio) {
if (!probe.noAudio && session.inputAudioCodec && session.inputAudioCodec.indexOf('pcm') !== -1 && !reencodeAudio) {
log.a(`${this.name} is using PCM audio and will be reencoded. Enable Reencode Audio in Rebroadcast Settings to disable this alert.`);
this.expectingPCM = true;
// this will probably crash ffmpeg due to mp4/mpegts not being a valid container for pcm,
// and then it will automatically restart with pcm handling.
this.unexpectedPCM = true;
}
}
@@ -202,8 +176,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
this.prebufferSession = undefined;
});
// s16le will be a no-op if there's no pcm, no harm.
for (const container of ['mpegts', 'mp4', 's16le']) {
for (const container of ['mpegts', 'mp4']) {
const eventName = container + '-data';
let prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container];
let shifts = 0;
@@ -270,69 +243,63 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
this.console.log('prebuffer request started');
const createContainerServer = async (container: string) => {
const eventName = container + '-data';
const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container];
const server = new Server(socket => {
server.close();
const requestedPrebuffer = options?.prebuffer || (sendKeyframe ? Math.max(4000, (this.detectedIdrInterval || 4000)) * 1.5 : 0);
const now = Date.now();
let cleanup: () => void;
let first = true;
const writeData = (data: StreamChunk) => {
if (first) {
first = false;
if (data.startStream) {
socket.write(data.startStream)
}
}
for (const chunk of data.chunks) {
socket.write(chunk);
}
};
for (const prebuffer of prebufferContainer) {
if (prebuffer.time < now - requestedPrebuffer)
continue;
writeData(prebuffer.chunk);
}
this.events.on(eventName, writeData);
cleanup = () => {
this.console.log('prebuffer request ended');
this.events.removeListener(eventName, writeData);
this.events.removeListener('killed', cleanup);
socket.removeAllListeners();
socket.destroy();
}
this.events.once('killed', cleanup);
socket.once('end', cleanup);
socket.once('close', cleanup);
socket.once('error', cleanup);
});
setTimeout(() => server.close(), 30000);
return listenZeroCluster(server);
}
const container = options?.container || 'mpegts';
const eventName = container + '-data';
const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container];
const server = new Server(socket => {
server.close();
const requestedPrebuffer = options?.prebuffer || (sendKeyframe ? Math.max(4000, (this.detectedIdrInterval || 4000)) * 1.5 : 0);
const now = Date.now();
let cleanup: () => void;
let first = true;
const writeData = (data: StreamChunk) => {
if (first) {
first = false;
if (data.startStream) {
socket.write(data.startStream)
}
}
for (const chunk of data.chunks) {
socket.write(chunk);
}
};
for (const prebuffer of prebufferContainer) {
if (prebuffer.time < now - requestedPrebuffer)
continue;
writeData(prebuffer.chunk);
}
this.events.on(eventName, writeData);
cleanup = () => {
this.console.log('prebuffer request ended');
this.events.removeListener(eventName, writeData);
this.events.removeListener('killed', cleanup);
socket.removeAllListeners();
socket.destroy();
}
this.events.once('killed', cleanup);
socket.once('end', cleanup);
socket.once('close', cleanup);
socket.once('error', cleanup);
});
setTimeout(() => server.close(), 30000);
const port = await listenZeroCluster(server);
const mediaStreamOptions = session.ffmpegInputs[container].mediaStreamOptions
? Object.assign({}, session.ffmpegInputs[container].mediaStreamOptions)
: undefined;
const audioConfig = this.storage.getItem(AUDIO_CONFIGURATION);
const reencodeAudio = audioConfig === OTHER_AUDIO;
if (mediaStreamOptions && mediaStreamOptions.audio) {
const reencodeAudio = audioConfig === OTHER_AUDIO;
const reencodeAudio = this.storage.getItem(REENCODE_AUDIO) === 'true';
if (reencodeAudio)
mediaStreamOptions.audio = {
codec: 'aac',
@@ -349,19 +316,11 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
const ffmpegInput: FFMpegInput = {
inputArguments: [
'-f', container,
'-i', `tcp://127.0.0.1:${await createContainerServer(container)}`,
'-i', `tcp://127.0.0.1:${port}`,
],
mediaStreamOptions,
}
const pcmAudio = audioConfig === PCM_AUDIO || this.expectingPCM;
if (pcmAudio) {
ffmpegInput.inputArguments.push(
'-f', 's16le',
'-i', `tcp://127.0.0.1:${await createContainerServer('s16le')}`,
)
}
this.console.log('prebuffer ffmpeg input', ffmpegInput.inputArguments[3]);
const mo = mediaManager.createFFmpegMediaObject(ffmpegInput);
return mo;