diff --git a/common/src/stream-parser.ts b/common/src/stream-parser.ts index 913c02d4c..aa6fa54c1 100644 --- a/common/src/stream-parser.ts +++ b/common/src/stream-parser.ts @@ -22,82 +22,47 @@ export interface StreamChunk { height?: number; } -// function checkTsPacket(pkt: Buffer) { -// const pid = ((pkt[1] & 0x1F) << 8) | pkt[2]; -// if (pid == 256) { -// // found video stream -// if ((pkt[3] & 0x20) && (pkt[4] > 0)) { -// // have AF -// if (pkt[5] & 0x40) { -// // found keyframe -// console.log('keyframe'); -// } -// } -// } -// } - -function createLengthParser(length: number, verify?: (concat: Buffer) => void) { - async function* parse(socket: Socket): AsyncGenerator { - let pending: Buffer[] = []; - let pendingSize = 0; - while (true) { - const data: Buffer = socket.read(); - if (!data) { - await once(socket, 'readable'); - continue; - } - pending.push(data); - pendingSize += data.length; - if (pendingSize < length) - continue; - - const concat = Buffer.concat(pending); - - verify?.(concat); - - const remaining = concat.length % length; - const left = concat.slice(0, concat.length - remaining); - const right = concat.slice(concat.length - remaining); - pending = [right]; - pendingSize = right.length; - - yield { - chunks: [left], - }; - } - } - - return parse; -} - -// -ac num channels? cameras are always mono? -export function createPCMParser(): StreamParser { - return { - container: 's16le', - outputArguments: [ - '-vn', - '-acodec', 'pcm_s16le', - '-f', 's16le', - ], - parse: createLengthParser(64), - } -} - export function createMpegTsParser(options?: StreamParserOptions): StreamParser { return { container: 'mpegts', outputArguments: [ + '-f', 'mpegts', ...(options?.vcodec || []), ...(options?.acodec || []), - '-f', 'mpegts', ], - parse: createLengthParser(188, concat => { - if (concat[0] != 0x47) { - throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.') + async *parse(socket: Socket): AsyncGenerator { + let pending: Buffer[] = []; + let pendingSize = 0; + while (true) { + const data: Buffer = socket.read(); + if (!data) { + await once(socket, 'readable'); + continue; + } + pending.push(data); + pendingSize += data.length; + if (pendingSize < 188) + continue; + + const concat = Buffer.concat(pending); + + if (concat[0] != 0x47) { + throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.') + } + + const remaining = concat.length % 188; + const left = concat.slice(0, concat.length - remaining); + const right = concat.slice(concat.length - remaining); + pending = [right]; + pendingSize = right.length; + yield { + chunks: [left], + }; } - }), + } } } + export interface MP4Atom { header: Buffer; length: number; @@ -125,10 +90,10 @@ export function createFragmentedMp4Parser(options?: StreamParserOptions): Stream return { container: 'mp4', outputArguments: [ + '-f', 'mp4', ...(options?.vcodec || []), ...(options?.acodec || []), '-movflags', 'frag_keyframe+empty_moov+default_base_moof', - '-f', 'mp4', ], async *parse(socket: Socket): AsyncGenerator { const parser = parseFragmentedMP4(socket); @@ -143,7 +108,7 @@ export function createFragmentedMp4Parser(options?: StreamParserOptions): Stream moov = atom; } - yield { + yield{ startStream, chunks: [atom.header, atom.data], type: atom.type, @@ -191,7 +156,7 @@ export function createRawVideoParser(options?: RawVideoParserOptions): StreamPar ], async *parse(socket: Socket, width: number, height: number): AsyncGenerator { if (!width || !height) - throw new Error("error parsing rawvideo, unknown width and height"); + throw new Error("error parsing rawvideo, unknown width and height"); width = size?.width || width; height = size?.height || height diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 3d8a7cc01..5670c1ccb 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -5,9 +5,9 @@ import { Server } from 'net'; import { listenZeroCluster } from '@scrypted/common/src/listen-cluster'; import EventEmitter from 'events'; import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin"; -import { FFMpegRebroadcastOptions, FFMpegRebroadcastSession, startRebroadcastSession } from '@scrypted/common/src/ffmpeg-rebroadcast'; +import { FFMpegRebroadcastSession, startRebroadcastSession } from '@scrypted/common/src/ffmpeg-rebroadcast'; import { probeVideoCamera } from '@scrypted/common/src/media-helpers'; -import { createMpegTsParser, createFragmentedMp4Parser, MP4Atom, StreamChunk, createPCMParser } from '@scrypted/common/src/stream-parser'; +import { createMpegTsParser, createFragmentedMp4Parser, MP4Atom, StreamChunk } from '@scrypted/common/src/stream-parser'; import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider'; const { mediaManager, log, systemManager, deviceManager } = sdk; @@ -15,10 +15,8 @@ const { mediaManager, log, systemManager, deviceManager } = sdk; const defaultPrebufferDuration = 15000; const PREBUFFER_DURATION_MS = 'prebufferDuration'; const SEND_KEYFRAME = 'sendKeyframe'; -const AUDIO_CONFIGURATION = 'audioConfiguration'; -const COMPATIBLE_AUDIO = 'MPEG-TS/MP4 Compatible'; -const PCM_AUDIO = 'PCM Audio'; -const OTHER_AUDIO = 'Other Audio (reencode)'; +const REENCODE_AUDIO = 'reencodeAudio'; +const REENCODE_VIDEO = 'reencodeVideo'; interface PrebufferStreamChunk { chunk: StreamChunk; @@ -30,7 +28,6 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid prebuffers = { mp4: [], mpegts: [], - s16le: [], }; events = new EventEmitter(); released = false; @@ -39,7 +36,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid session: FFMpegRebroadcastSession; detectedIdrInterval = 0; prevIdr = 0; - expectingPCM = false; + unexpectedPCM = false; constructor(mixinDevice: VideoCamera & Settings, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any }, providerNativeId: string) { super(mixinDevice, mixinDeviceState, { @@ -73,16 +70,11 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid value: (this.storage.getItem(SEND_KEYFRAME) !== 'false').toString(), }, { - title: 'Audio Configuration', - description: 'Override the Audio Configuration for the rebroadcast stream.', - type: 'string', - key: AUDIO_CONFIGURATION, - value: this.storage.getItem(AUDIO_CONFIGURATION), - choices: [ - COMPATIBLE_AUDIO, - PCM_AUDIO, - OTHER_AUDIO, - ], + title: 'Reencode Audio', + description: 'Reencode the audio (necessary if camera outputs PCM).', + type: 'boolean', + key: REENCODE_AUDIO, + value: (this.storage.getItem(REENCODE_AUDIO) === 'true').toString(), }, { group: 'Media Information', @@ -125,30 +117,20 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid async startPrebufferSession() { this.prebuffers.mp4 = []; this.prebuffers.mpegts = []; - this.prebuffers.s16le = []; const prebufferDurationMs = parseInt(this.storage.getItem(PREBUFFER_DURATION_MS)) || defaultPrebufferDuration; const ffmpegInput = JSON.parse((await mediaManager.convertMediaObjectToBuffer(await this.mixinDevice.getVideoStream(), ScryptedMimeTypes.FFmpegInput)).toString()) as FFMpegInput; - const audioConfig = this.storage.getItem(AUDIO_CONFIGURATION); - const reencodeAudio = audioConfig === OTHER_AUDIO; + const reencodeAudio = this.storage.getItem(REENCODE_AUDIO) === 'true' || this.unexpectedPCM; const probe = await probeVideoCamera(this.mixinDevice); - const probeAudioCodec = probe?.options?.[0].audio?.codec; - this.expectingPCM = probeAudioCodec && probeAudioCodec.indexOf('pcm') !== -1; - const pcmAudio = audioConfig === PCM_AUDIO || this.expectingPCM; let acodec: string[]; - if (probe.noAudio || pcmAudio) { - // no audio? explicitly disable it. + // no audio? explicitly disable it. + if (probe.noAudio) { acodec = ['-an']; } - else if (reencodeAudio) { - // setting no audio codec will allow ffmpeg to do an implicit conversion. - acodec = []; - } else { - // NOTE: if there is no audio track, this will still work fine. - acodec = [ + acodec = reencodeAudio ? [] : [ '-acodec', 'copy', ]; @@ -159,7 +141,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid 'copy', ]; - const rbo: FFMpegRebroadcastOptions = { + const session = await startRebroadcastSession(ffmpegInput, { console: this.console, parsers: { mp4: createFragmentedMp4Parser({ @@ -171,13 +153,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid acodec, }), } - }; - - if (pcmAudio) { - rbo.parsers.s16le = createPCMParser(); - } - - const session = await startRebroadcastSession(ffmpegInput, rbo); + }); this.session = session; @@ -186,11 +162,9 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid } else if (this.session.inputAudioCodec !== 'aac') { this.console.error('Detected audio codec was not AAC.'); - if (!probe.noAudio && session.inputAudioCodec && session.inputAudioCodec.indexOf('pcm') !== -1 && !pcmAudio) { + if (!probe.noAudio && session.inputAudioCodec && session.inputAudioCodec.indexOf('pcm') !== -1 && !reencodeAudio) { log.a(`${this.name} is using PCM audio and will be reencoded. Enable Reencode Audio in Rebroadcast Settings to disable this alert.`); - this.expectingPCM = true; - // this will probably crash ffmpeg due to mp4/mpegts not being a valid container for pcm, - // and then it will automatically restart with pcm handling. + this.unexpectedPCM = true; } } @@ -202,8 +176,7 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid this.prebufferSession = undefined; }); - // s16le will be a no-op if there's no pcm, no harm. - for (const container of ['mpegts', 'mp4', 's16le']) { + for (const container of ['mpegts', 'mp4']) { const eventName = container + '-data'; let prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container]; let shifts = 0; @@ -270,69 +243,63 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid this.console.log('prebuffer request started'); - const createContainerServer = async (container: string) => { - const eventName = container + '-data'; - const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container]; - - const server = new Server(socket => { - server.close(); - const requestedPrebuffer = options?.prebuffer || (sendKeyframe ? Math.max(4000, (this.detectedIdrInterval || 4000)) * 1.5 : 0); - - const now = Date.now(); - - let cleanup: () => void; - - let first = true; - const writeData = (data: StreamChunk) => { - if (first) { - first = false; - if (data.startStream) { - socket.write(data.startStream) - } - } - for (const chunk of data.chunks) { - socket.write(chunk); - } - }; - - for (const prebuffer of prebufferContainer) { - if (prebuffer.time < now - requestedPrebuffer) - continue; - - writeData(prebuffer.chunk); - } - - this.events.on(eventName, writeData); - cleanup = () => { - this.console.log('prebuffer request ended'); - this.events.removeListener(eventName, writeData); - this.events.removeListener('killed', cleanup); - socket.removeAllListeners(); - socket.destroy(); - } - - this.events.once('killed', cleanup); - socket.once('end', cleanup); - socket.once('close', cleanup); - socket.once('error', cleanup); - }); - - setTimeout(() => server.close(), 30000); - - return listenZeroCluster(server); - } - const container = options?.container || 'mpegts'; + const eventName = container + '-data'; + const prebufferContainer: PrebufferStreamChunk[] = this.prebuffers[container]; + + const server = new Server(socket => { + server.close(); + const requestedPrebuffer = options?.prebuffer || (sendKeyframe ? Math.max(4000, (this.detectedIdrInterval || 4000)) * 1.5 : 0); + + const now = Date.now(); + + let cleanup: () => void; + + let first = true; + const writeData = (data: StreamChunk) => { + if (first) { + first = false; + if (data.startStream) { + socket.write(data.startStream) + } + } + for (const chunk of data.chunks) { + socket.write(chunk); + } + }; + + for (const prebuffer of prebufferContainer) { + if (prebuffer.time < now - requestedPrebuffer) + continue; + + writeData(prebuffer.chunk); + } + + this.events.on(eventName, writeData); + cleanup = () => { + this.console.log('prebuffer request ended'); + this.events.removeListener(eventName, writeData); + this.events.removeListener('killed', cleanup); + socket.removeAllListeners(); + socket.destroy(); + } + + this.events.once('killed', cleanup); + socket.once('end', cleanup); + socket.once('close', cleanup); + socket.once('error', cleanup); + }); + + setTimeout(() => server.close(), 30000); + + const port = await listenZeroCluster(server); const mediaStreamOptions = session.ffmpegInputs[container].mediaStreamOptions ? Object.assign({}, session.ffmpegInputs[container].mediaStreamOptions) : undefined; - const audioConfig = this.storage.getItem(AUDIO_CONFIGURATION); - const reencodeAudio = audioConfig === OTHER_AUDIO; - if (mediaStreamOptions && mediaStreamOptions.audio) { - const reencodeAudio = audioConfig === OTHER_AUDIO; + const reencodeAudio = this.storage.getItem(REENCODE_AUDIO) === 'true'; if (reencodeAudio) mediaStreamOptions.audio = { codec: 'aac', @@ -349,19 +316,11 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid const ffmpegInput: FFMpegInput = { inputArguments: [ '-f', container, - '-i', `tcp://127.0.0.1:${await createContainerServer(container)}`, + '-i', `tcp://127.0.0.1:${port}`, ], mediaStreamOptions, } - const pcmAudio = audioConfig === PCM_AUDIO || this.expectingPCM; - if (pcmAudio) { - ffmpegInput.inputArguments.push( - '-f', 's16le', - '-i', `tcp://127.0.0.1:${await createContainerServer('s16le')}`, - ) - } - this.console.log('prebuffer ffmpeg input', ffmpegInput.inputArguments[3]); const mo = mediaManager.createFFmpegMediaObject(ffmpegInput); return mo;