diff --git a/common/src/stream-parser.ts b/common/src/stream-parser.ts index 24c7bd2f3..d4ec38db1 100644 --- a/common/src/stream-parser.ts +++ b/common/src/stream-parser.ts @@ -2,7 +2,6 @@ import { Socket as DatagramSocket } from "dgram"; import { once } from "events"; import { Duplex } from "stream"; import { FFMPEG_FRAGMENTED_MP4_OUTPUT_ARGS, MP4Atom, parseFragmentedMP4 } from "./ffmpeg-mp4-parser-session"; -import { readLength } from "./read-stream"; export interface StreamParser { container: string; @@ -25,59 +24,11 @@ export interface StreamParserOptions { export interface StreamChunk { startStream?: Buffer; chunks: Buffer[]; - type?: string; + type: string; width?: number; height?: number; } -// function checkTsPacket(pkt: Buffer) { -// const pid = ((pkt[1] & 0x1F) << 8) | pkt[2]; -// if (pid == 256) { -// // found video stream -// if ((pkt[3] & 0x20) && (pkt[4] > 0)) { -// // have AF -// if (pkt[5] & 0x40) { -// // found keyframe -// console.log('keyframe'); -// } -// } -// } -// } - -function createLengthParser(length: number, verify?: (concat: Buffer) => void) { - async function* parse(socket: Duplex): AsyncGenerator { - let pending: Buffer[] = []; - let pendingSize = 0; - while (true) { - const data: Buffer = socket.read(); - if (!data) { - await once(socket, 'readable'); - continue; - } - pending.push(data); - pendingSize += data.length; - if (pendingSize < length) - continue; - - const concat = Buffer.concat(pending); - - verify?.(concat); - - const remaining = concat.length % length; - const left = concat.slice(0, concat.length - remaining); - const right = concat.slice(concat.length - remaining); - pending = [right]; - pendingSize = right.length; - - yield { - chunks: [left], - }; - } - } - - return parse; -} - export function createDgramParser() { async function* parse(socket: DatagramSocket, width: number, height: number, type: string) { while (true) { @@ -91,65 +42,6 @@ export function createDgramParser() { return parse; } -export function createMpegTsParser(options?: StreamParserOptions): StreamParser { - return { - container: 'mpegts', - outputArguments: [ - ...(options?.vcodec || []), - ...(options?.acodec || []), - '-f', 'mpegts', - ], - parse: createLengthParser(188, concat => { - if (concat[0] != 0x47) { - throw new Error('Invalid sync byte in mpeg-ts packet. Terminating stream.') - } - }), - findSyncFrame(streamChunks): StreamChunk[] { - for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) { - const streamChunk = streamChunks[prebufferIndex]; - - for (let chunkIndex = 0; chunkIndex < streamChunk.chunks.length; chunkIndex++) { - const chunk = streamChunk.chunks[chunkIndex]; - - let offset = 0; - while (offset + 188 < chunk.length) { - const pkt = chunk.subarray(offset, offset + 188); - const pid = ((pkt[1] & 0x1F) << 8) | pkt[2]; - if (pid == 256) { - // found video stream - if ((pkt[3] & 0x20) && (pkt[4] > 0)) { - // have AF - if (pkt[5] & 0x40) { - // we found the sync frame, but also need to send the pat and pmt - // which might be at the start of this chunk before the keyframe. - // yolo! - return streamChunks.slice(prebufferIndex); - // const chunks = streamChunk.chunks.slice(chunkIndex + 1); - // const take = chunk.subarray(offset); - // chunks.unshift(take); - - // const remainingChunks = streamChunks.slice(prebufferIndex + 1); - // const ret = Object.assign({}, streamChunk); - // ret.chunks = chunks; - // return [ - // ret, - // ...remainingChunks - // ]; - } - } - } - - offset += 188; - } - - } - } - - return findSyncFrame(streamChunks); - } - } -} - export async function* parseMp4StreamChunks(parser: AsyncGenerator) { let ftyp: MP4Atom; let moov: MP4Atom; @@ -213,54 +105,3 @@ export const PIXEL_FORMAT_RGB24: RawVideoPixelFormat = { name: 'rgb24', computeLength: (width, height) => width * height * 3, } - -export function createRawVideoParser(options: RawVideoParserOptions): StreamParser { - const pixelFormat = options?.pixelFormat || PIXEL_FORMAT_YUV420P; - let filter: string; - const { size, everyNFrames } = options; - if (size) { - filter = `scale=${size.width}:${size.height}`; - } - if (everyNFrames && everyNFrames > 1) { - if (filter) - filter += ','; - else - filter = ''; - filter = filter + `select=not(mod(n\\,${everyNFrames}))` - } - - const inputArguments: string[] = []; - if (options.size) - inputArguments.push('-s', `${options.size.width}x${options.size.height}`); - - inputArguments.push('-pix_fmt', pixelFormat.name); - return { - inputArguments, - container: 'rawvideo', - outputArguments: [ - '-s', `${options.size.width}x${options.size.height}`, - '-an', - '-vcodec', 'rawvideo', - '-pix_fmt', pixelFormat.name, - '-f', 'rawvideo', - ], - async *parse(socket: Duplex, width: number, height: number): AsyncGenerator { - width = size?.width || width; - height = size?.height || height - - if (!width || !height) - throw new Error("error parsing rawvideo, unknown width and height"); - - const toRead = pixelFormat.computeLength(width, height); - while (true) { - const buffer = await readLength(socket, toRead); - yield { - chunks: [buffer], - width, - height, - } - } - }, - findSyncFrame, - } -} \ No newline at end of file diff --git a/plugins/prebuffer-mixin/package-lock.json b/plugins/prebuffer-mixin/package-lock.json index 629f4f44e..9f909a2f4 100644 --- a/plugins/prebuffer-mixin/package-lock.json +++ b/plugins/prebuffer-mixin/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.10.60", + "version": "0.10.61", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/prebuffer-mixin", - "version": "0.10.60", + "version": "0.10.61", "license": "Apache-2.0", "dependencies": { "@scrypted/common": "file:../../common", diff --git a/plugins/prebuffer-mixin/package.json b/plugins/prebuffer-mixin/package.json index bc9671ddd..93e44b025 100644 --- a/plugins/prebuffer-mixin/package.json +++ b/plugins/prebuffer-mixin/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/prebuffer-mixin", - "version": "0.10.60", + "version": "0.10.61", "description": "Video Stream Rebroadcast, Prebuffer, and Management Plugin for Scrypted.", "author": "Scrypted", "license": "Apache-2.0", diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index c570d6ba6..d097a59f5 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -1,12 +1,12 @@ import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider'; import { ListenZeroSingleClientTimeoutError, closeQuiet, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; import { readLength } from '@scrypted/common/src/read-stream'; -import { H264_NAL_TYPE_FU_B, H264_NAL_TYPE_IDR, H264_NAL_TYPE_MTAP16, H264_NAL_TYPE_MTAP32, H264_NAL_TYPE_RESERVED0, H264_NAL_TYPE_RESERVED30, H264_NAL_TYPE_RESERVED31, H264_NAL_TYPE_SEI, H264_NAL_TYPE_SPS, H264_NAL_TYPE_STAP_B, RtspServer, RtspTrack, createRtspParser, findH264NaluType, getStartedH264NaluTypes, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server'; +import { H264_NAL_TYPE_IDR, H264_NAL_TYPE_SPS, RtspServer, RtspTrack, createRtspParser, findH264NaluType, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server'; import { addTrackControls, getSpsPps, parseSdp } from '@scrypted/common/src/sdp-utils'; import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin"; import { sleep } from '@scrypted/common/src/sleep'; import { StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser'; -import sdk, { BufferConverter, ChargeState, EventListenerRegister, FFmpegInput, ForkWorker, H264Info, MediaObject, MediaStreamDestination, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDevice, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, SettingValue, Settings, VideoCamera, VideoCameraConfiguration, WritableDeviceState } from '@scrypted/sdk'; +import sdk, { BufferConverter, ChargeState, EventListenerRegister, FFmpegInput, ForkWorker, MediaObject, MediaStreamDestination, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDevice, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, SettingValue, Settings, VideoCamera, VideoCameraConfiguration, WritableDeviceState } from '@scrypted/sdk'; import { StorageSettings } from '@scrypted/sdk/storage-settings'; import crypto from 'crypto'; import { once } from 'events'; @@ -38,18 +38,6 @@ interface PrebufferStreamChunk extends StreamChunk { time?: number; } -function hasOddities(h264Info: H264Info) { - const h264Oddities = h264Info.fuab - || h264Info.mtap16 - || h264Info.mtap32 - || h264Info.sei - || h264Info.stapb - || h264Info.reserved0 - || h264Info.reserved30 - || h264Info.reserved31; - return h264Oddities; -} - type PrebufferParsers = 'rtsp'; class PrebufferSession { @@ -72,7 +60,6 @@ class PrebufferSession { ffmpegInputArgumentsKey: string; ffmpegOutputArgumentsKey: string; lastDetectedAudioCodecKey: string; - lastH264ProbeKey: string; rtspParserKey: string; rtspServerPath: string; rtspServerMutedPath: string; @@ -88,7 +75,6 @@ class PrebufferSession { this.ffmpegInputArgumentsKey = 'ffmpegInputArguments-' + this.streamId; this.ffmpegOutputArgumentsKey = 'ffmpegOutputArguments-' + this.streamId; this.lastDetectedAudioCodecKey = 'lastDetectedAudioCodec-' + this.streamId; - this.lastH264ProbeKey = 'lastH264Probe-' + this.streamId; this.rtspParserKey = 'rtspParser-' + this.streamId; const rtspServerPathKey = 'rtspServerPathKey-' + this.streamId; const rtspServerMutedPathKey = 'rtspServerMutedPathKey-' + this.streamId; @@ -112,24 +98,6 @@ class PrebufferSession { return !this.enabled || this.shouldDisableBatteryPrebuffer(); } - getLastH264Probe(): H264Info { - const str = this.storage.getItem(this.lastH264ProbeKey); - if (!str) { - return {}; - } - - try { - return JSON.parse(str); - } - catch (e) { - return {}; - } - } - - getLastH264Oddities() { - return hasOddities(this.getLastH264Probe()); - } - getDetectedIdrInterval() { const durations: number[] = []; if (this.rtspPrebuffer.length) { @@ -403,20 +371,6 @@ class PrebufferSession { addFFmpegInputSettings(); } - const addOddities = () => { - settings.push( - { - key: 'detectedOddities', - group, - subgroup, - title: 'Detected H264 Oddities', - readonly: true, - value: JSON.stringify(this.getLastH264Probe()), - description: 'Cameras with oddities in the H264 video stream may not function correctly with Scrypted RTSP Parsers or Senders.', - } - ) - }; - if (session) { const codecInfo = await this.parseCodecs(); const resolution = codecInfo.inputVideoResolution?.width && codecInfo.inputVideoResolution?.height @@ -453,7 +407,6 @@ class PrebufferSession { value: (idrInterval || 0) / 1000 || 'unknown', }, ); - addOddities(); } else { settings.push( @@ -467,7 +420,6 @@ class PrebufferSession { readonly: true, }, ); - addOddities(); } settings.push({ @@ -544,8 +496,6 @@ class PrebufferSession { this.storage.removeItem(this.lastDetectedAudioCodecKey); this.usingScryptedParser = false; - const h264Oddities = this.getLastH264Oddities(); - if (isRfc4571) { this.usingScryptedParser = true; this.console.log('bypassing ffmpeg: using scrypted rfc4571 parser') @@ -635,47 +585,6 @@ class PrebufferSession { console.error('rebroadcast error', e) }); - if (this.usingScryptedParser && !isRfc4571) { - // watch the stream for 10 seconds to see if an weird nalu is encountered. - // if one is found and using scrypted parser as default, will need to restart rebroadcast to prevent - // downstream issues. - const h264Probe: H264Info = {}; - let reportedOddity = false; - const oddityProbe = (chunk: StreamChunk) => { - if (chunk.type !== 'h264') - return; - - const types = getStartedH264NaluTypes(chunk); - h264Probe.fuab ||= types.has(H264_NAL_TYPE_FU_B); - h264Probe.stapb ||= types.has(H264_NAL_TYPE_STAP_B); - h264Probe.mtap16 ||= types.has(H264_NAL_TYPE_MTAP16); - h264Probe.mtap32 ||= types.has(H264_NAL_TYPE_MTAP32); - h264Probe.sei ||= types.has(H264_NAL_TYPE_SEI); - h264Probe.reserved0 ||= types.has(H264_NAL_TYPE_RESERVED0); - h264Probe.reserved30 ||= types.has(H264_NAL_TYPE_RESERVED30); - h264Probe.reserved31 ||= types.has(H264_NAL_TYPE_RESERVED31); - const oddity = hasOddities(h264Probe); - if (oddity && !reportedOddity) { - reportedOddity = true; - let { isDefault } = this.getParser(sessionMso); - this.console.warn('H264 oddity detected.'); - if (!isDefault) { - this.console.warn('If there are issues streaming, consider using the Default parser.'); - return; - } - - // this.console.warn('Oddity in non prebuffered stream. Next restart will use FFmpeg instead.'); - } - } - const removeOddityProbe = () => session.removeListener('rtsp', oddityProbe); - session.killed.finally(() => clearTimeout(oddityTimeout)); - session.on('rtsp', oddityProbe); - const oddityTimeout = setTimeout(() => { - removeOddityProbe(); - this.storage.setItem(this.lastH264ProbeKey, JSON.stringify(h264Probe)); - }, h264Oddities ? 60000 : 10000); - } - await session.sdp; this.parserSession = session; session.killed.finally(() => { @@ -927,8 +836,7 @@ class PrebufferSession { // if starting on a sync frame, ffmpeg will skip the first segment while initializing // on live sources like rtsp. the buffer before the sync frame stream will be enough // for ffmpeg to analyze and start up in time for the sync frame. - // If h264 oddities are detected, assume ffmpeg will be used. - if (!options.findSyncFrame || this.getLastH264Oddities()) { + if (!options.findSyncFrame) { for (const chunk of prebufferContainer) { if (chunk.time < now - requestedPrebuffer) continue; @@ -978,10 +886,6 @@ class PrebufferSession { const codecInfo = await this.parseCodecs(true); const mediaStreamOptions: ResponseMediaStreamOptions = session.negotiateMediaStream(options, codecInfo.inputVideoCodec, codecInfo.inputAudioCodec); let sdp = await this.sdp; - if (!mediaStreamOptions.video?.h264Info && this.usingScryptedParser) { - mediaStreamOptions.video ||= {}; - mediaStreamOptions.video.h264Info = this.getLastH264Probe(); - } if (this.mixin.streamSettings.storageSettings.values.noAudio) mediaStreamOptions.audio = null; @@ -1025,6 +929,7 @@ class PrebufferSession { header.writeUInt8(channel, 1); chunks[0] = header; chunk = { + type: chunk.type, startStream: chunk.startStream, chunks, } @@ -1264,8 +1169,14 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid requestedPrebuffer, filter: (chunk, prebuffer) => { const track = map.get(chunk.type); - if (track) + if (track) { server.sendTrack(track, chunk.chunks[1], false); + const buffered = server.client.writableLength; + if (buffered > 100000000) { + this.console.log('more than 100MB has been buffered to RTSP Client, did downstream die? killing connection.'); + client.destroy(); + } + } return undefined; } }); @@ -1542,10 +1453,6 @@ class PrebufferMixin extends SettingsMixinDeviceBase implements Vid const session = this.sessions.get(mso.id); if (session?.parserSession || enabledStreams.includes(mso)) mso.prebuffer = prebufferDurationMs; - if (session && !mso.video?.h264Info) { - mso.video ||= {}; - mso.video.h264Info = session.getLastH264Probe(); - } if (!mso.destinations) { mso.destinations = []; for (const [k, v] of map.entries()) {