rebroadcast: improve cleanups

This commit is contained in:
Koushik Dutta
2025-04-11 21:38:43 -07:00
parent b02c17e185
commit dd3e7fe238
2 changed files with 25 additions and 29 deletions

View File

@@ -1,24 +1,27 @@
import net from 'net';
import tls from 'tls';
import { Deferred } from "@scrypted/common/src/deferred";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { sleep } from "@scrypted/common/src/sleep";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { AVFormatContext, createAVFormatContext } from '@scrypted/libav';
import { ResponseMediaStreamOptions } from "@scrypted/sdk";
import { EventEmitter } from "stream";
import { ParserSession, setupActivityTimer } from "./ffmpeg-session";
import { negotiateMediaStream } from "./rfc4571";
import { installLibavAddon } from "./libav-setup";
import { RTSP_FRAME_MAGIC } from "../../../common/src/rtsp-server";
import { sleep } from "@scrypted/common/src/sleep";
import { once } from 'events';
import net from 'net';
import { EventEmitter } from "stream";
import tls from 'tls';
import { RTSP_FRAME_MAGIC } from "../../../common/src/rtsp-server";
import { ParserSession, setupActivityTimer } from "./ffmpeg-session";
import { installLibavAddon } from "./libav-setup";
import { negotiateMediaStream } from "./rfc4571";
let installPromise: Promise<void>;
export async function startLibavSession(console: Console, url: string, mediaStreamOptions: ResponseMediaStreamOptions, options: {
useUdp: boolean,
audioSoftMuted: boolean,
activityTimeout: number,
}): Promise<ParserSession<"rtsp">> {
await installLibavAddon();
installPromise ||= installLibavAddon();
await installPromise;
const formatContext = createAVFormatContext();
try {
@@ -109,14 +112,14 @@ export async function startLibavSessionWrapped(formatContext: AVFormatContext, c
const { resetActivityTimer } = setupActivityTimer('rtsp', kill, events, options?.activityTimeout);
(async () => {
const indexToContext = new Map<number, {
rtp: AVFormatContext,
index: number,
}>();
const pipelines: {
streamIndex: number,
writeFormatContext: AVFormatContext,
}[] = [];
try {
await startDeferred.promise;
const pipelines = formatContext.streams.map(stream => {
formatContext.streams.forEach(stream => {
if (options.audioSoftMuted && stream.type === 'audio')
return;
if (stream.type !== 'video' && stream.type !== 'audio')
@@ -137,20 +140,16 @@ export async function startLibavSessionWrapped(formatContext: AVFormatContext, c
events.emit('rtsp', chunk);
});
const index = rtp.newStream({
rtp.newStream({
formatContext,
streamIndex: stream.index,
});
indexToContext.set(stream.index, {
rtp,
index,
});
return {
pipelines.push({
streamIndex: stream.index,
writeFormatContext: rtp,
}
}).filter(Boolean);
});
});
while (!killDeferred.finished) {
using result = await formatContext.receiveFrame(pipelines);
@@ -167,11 +166,8 @@ export async function startLibavSessionWrapped(formatContext: AVFormatContext, c
kill(new Error('rtsp read loop exited'));
await sleep(1000);
for (const context of indexToContext.values()) {
await context.rtp.close();
}
indexToContext.clear();
await Promise.allSettled(pipelines.map(pipeline => pipeline.writeFormatContext.close()));
await sleep(1000);
await formatContext.close();
}
})();

View File

@@ -16,13 +16,13 @@ import path from 'path';
import { Duplex } from 'stream';
import { ParserOptions, ParserSession, startParserSession } from './ffmpeg-session';
import { FileRtspServer } from './file-rtsp-server';
import { startLibavSession } from './libav-parser';
import { getUrlLocalAdresses } from './local-addresses';
import { REBROADCAST_MIXIN_INTERFACE_TOKEN } from './rebroadcast-mixin-token';
import { connectRFC4571Parser, startRFC4571Parser } from './rfc4571';
import { RtspSessionParserSpecific, startRtspSession } from './rtsp-session';
import { getSpsResolution } from './sps-resolution';
import { createStreamSettings } from './stream-settings';
import { startLibavSession } from './libav-parser';
const { mediaManager, log, systemManager, deviceManager } = sdk;