From 2cd6864495460fe0c286f41e0a55ceaab5259b58 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sun, 26 Dec 2021 19:03:12 -0800 Subject: [PATCH] rebroadcast: performance improvements --- common/src/stream-parser.ts | 21 +++++++++++++++- plugins/prebuffer-mixin/package-lock.json | 4 +-- plugins/prebuffer-mixin/package.json | 2 +- plugins/prebuffer-mixin/src/main.ts | 30 ++++++++++++++--------- 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/common/src/stream-parser.ts b/common/src/stream-parser.ts index e3c7dd8b8..86a660a58 100644 --- a/common/src/stream-parser.ts +++ b/common/src/stream-parser.ts @@ -90,6 +90,8 @@ export function createPCMParser(): StreamParser { } export function createMpegTsParser(options?: StreamParserOptions): StreamParser { + let pat: Buffer; + let pmt: Buffer; return { container: 'mpegts', outputArguments: [ @@ -101,6 +103,20 @@ export function createMpegTsParser(options?: StreamParserOptions): StreamParser if (concat[0] != 0x47) { throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.') } + + if (pat && pmt) + return; + + const pid = ((concat[1] & 0x1F) << 8) | concat[2]; + if (pid === 0) { + const tableId = concat[5]; + if (tableId === 0) { + pat = concat.slice(0, 188); + } + else if (tableId === 2) { + pmt = concat.slice(0, 188); + } + } }), findSyncFrame(streamChunks): StreamChunk[] { for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) { @@ -118,12 +134,15 @@ export function createMpegTsParser(options?: StreamParserOptions): StreamParser if ((pkt[3] & 0x20) && (pkt[4] > 0)) { // have AF if (pkt[5] & 0x40) { + // we found the sync frame, but also need to send the pat and pmt + // which might be at the start of this chunk before the keyframe. + // yolo! return streamChunks.slice(prebufferIndex); // const chunks = streamChunk.chunks.slice(chunkIndex + 1); // const take = chunk.subarray(offset); // chunks.unshift(take); - // const remainingChunks = findSyncFrame(streamChunks.slice(prebufferIndex + 1)); + // const remainingChunks = streamChunks.slice(prebufferIndex + 1); // const ret = Object.assign({}, streamChunk); // ret.chunks = chunks; // return [ diff --git a/plugins/prebuffer-mixin/package-lock.json b/plugins/prebuffer-mixin/package-lock.json index e3b687283..e6f962439 100644 --- a/plugins/prebuffer-mixin/package-lock.json +++ b/plugins/prebuffer-mixin/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.103", + "version": "0.1.106", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.103", + "version": "0.1.106", "license": "Apache-2.0", "dependencies": { "@scrypted/common": "file:../../common", diff --git a/plugins/prebuffer-mixin/package.json b/plugins/prebuffer-mixin/package.json index 227f3fca6..7f0a055de 100644 --- a/plugins/prebuffer-mixin/package.json +++ b/plugins/prebuffer-mixin/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.1.103", + "version": "0.1.106", "description": "Rebroadcast and Prebuffer for VideoCameras.", "author": "Scrypted", "license": "Apache-2.0", diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index a00c29561..dc909b402 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -152,6 +152,13 @@ class PrebufferSession { readonly: true, value: ((this.detectedIdrInterval || 0) / 1000).toString() || 'none', }, + { + group, + key: 'rebroadcastUrl', + title: 'Rebroadcast Url', + readonly: true, + value: this.parserSession?.ffmpegInputs?.mpegts.url, + } ); return settings; } @@ -228,7 +235,6 @@ class PrebufferSession { acodec, }), }, - parseOnly: true, }; // if pcm prebuffer is requested, create the the parser. don't do it if @@ -370,19 +376,19 @@ class PrebufferSession { this.events.on(eventName, safeWriteData); session.events.once('killed', cleanup); - for (const prebuffer of prebufferContainer) { - if (prebuffer.time < now - requestedPrebuffer) - continue; + // for (const prebuffer of prebufferContainer) { + // if (prebuffer.time < now - requestedPrebuffer) + // continue; - safeWriteData(prebuffer.chunk); - } + // safeWriteData(prebuffer.chunk); + // } // for some reason this doesn't work as well as simply guessing and dumping. - // const parser = this.parsers[container]; - // const availablePrebuffers = parser.findSyncFrame(prebufferContainer.filter(pb => pb.time >= now - requestedPrebuffer).map(pb => pb.chunk)); - // for (const prebuffer of availablePrebuffers) { - // safeWriteData(prebuffer); - // } + const parser = this.parsers[container]; + const availablePrebuffers = parser.findSyncFrame(prebufferContainer.filter(pb => pb.time >= now - requestedPrebuffer).map(pb => pb.chunk)); + for (const prebuffer of availablePrebuffers) { + safeWriteData(prebuffer); + } return cleanup; } }) @@ -428,6 +434,7 @@ class PrebufferSession { url, container, inputArguments: [ + '-analyzeduration', '0', '-probesize', '100000', '-f', container, '-i', url, ], @@ -436,6 +443,7 @@ class PrebufferSession { if (pcmAudio) { ffmpegInput.inputArguments.push( + '-analyzeduration', '0', '-probesize', '100000', '-f', 's16le', '-i', `tcp://127.0.0.1:${await createContainerServer('s16le')}`, )