rebroadcast: fix up messy worker creation/destruciton

This commit is contained in:
Koushik Dutta
2023-02-18 09:33:34 -08:00
parent c718f4da31
commit 269c418e40
4 changed files with 33 additions and 60 deletions

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.71",
"version": "0.9.72",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.71",
"version": "0.9.72",
"license": "Apache-2.0",
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.71",
"version": "0.9.72",
"description": "Video Stream Rebroadcast, Prebuffer, and Management Plugin for Scrypted.",
"author": "Scrypted",
"license": "Apache-2.0",

View File

@@ -8,6 +8,7 @@ import { readLength } from '@scrypted/common/src/read-stream';
import { createRtspParser, findH264NaluType, getNaluTypes, H264_NAL_TYPE_FU_B, H264_NAL_TYPE_IDR, H264_NAL_TYPE_MTAP16, H264_NAL_TYPE_MTAP32, H264_NAL_TYPE_RESERVED0, H264_NAL_TYPE_RESERVED30, H264_NAL_TYPE_RESERVED31, H264_NAL_TYPE_SEI, H264_NAL_TYPE_STAP_B, RtspServer, RtspTrack } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, parseSdp } from '@scrypted/common/src/sdp-utils';
import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin";
import { sleep } from '@scrypted/common/src/sleep';
import { createFragmentedMp4Parser, createMpegTsParser, StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser';
import sdk, { BufferConverter, DeviceProvider, DeviceState, EventListenerRegister, FFmpegInput, H264Info, MediaObject, MediaStreamDestination, MediaStreamOptions, MixinProvider, RequestMediaStreamOptions, ResponseMediaStreamOptions, ScryptedDevice, ScryptedDeviceType, ScryptedInterface, ScryptedMimeTypes, Setting, Settings, SettingValue, VideoCamera, VideoCameraConfiguration } from '@scrypted/sdk';
import { StorageSettings } from '@scrypted/sdk/storage-settings';
@@ -1688,9 +1689,9 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
},
}
});
currentMixins = new Map<string, {
currentMixins = new Map<PrebufferMixin, {
worker: Worker,
mixin: Promise<PrebufferMixin>,
id: string,
}>();
constructor(nativeId?: string) {
@@ -1833,71 +1834,41 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
catch (e) {
}
const { id } = mixinDeviceState;
if (fork && sdk.fork && typeof mixinDeviceState.id === 'string') {
const forked = sdk.fork<RebroadcastPluginFork>();
const { worker } = forked;
const { id } = mixinDeviceState;
const cleanupWorker = () => {
const found = this.currentMixins.get(id);
if (found?.worker === worker) {
worker.terminate();
this.currentMixins.delete(id);
}
}
forked.worker.on('error', e => {
this.console.error('prebuffer worker error', e);
cleanupWorker();
});
forked.worker.on('exit', exitCode => {
if (exitCode)
this.console.error('prebuffer worker error non-zero result:', exitCode);
else
this.console.log('prebuffer worker exited');
cleanupWorker();
});
const ret = forked.result.then(result => result.newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState));
// scrypted should call release on the mixin, but just in case...
const previous = this.currentMixins.get(id);
if (previous?.worker) {
this.console.log('terminating previous worker');
previous.worker.terminate();
try {
const result = await forked.result;
const mixin = await result.newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
this.currentMixins.set(mixin, {
worker,
id,
});
return mixin;
}
catch (e) {
throw e;
}
this.currentMixins.set(id, {
worker,
mixin: ret,
});
return ret;
}
else {
const ret = newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
this.currentMixins.set(mixinDeviceState.id, {
mixin: Promise.resolve(ret),
const ret = await newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
this.currentMixins.set(ret, {
worker: undefined,
id,
});
return ret;
}
}
async releaseMixin(id: string, mixinDevice: PrebufferMixin) {
try {
const current = this.currentMixins.get(id);
const currentMixin = await current?.mixin;
// mixin may have changed during the await.
if (currentMixin === mixinDevice && this.currentMixins.get(id) === current) {
this.currentMixins.delete(id);
// TODO: 1/7/2023 remove this legacy code check, there will always be a worker
if (current.worker) {
this.console.log('terminating worker for mixin release');
current.worker?.terminate();
return;
}
}
}
catch (e) {
}
await mixinDevice.release();
const worker = this.currentMixins.get(mixinDevice)?.worker;
this.currentMixins.delete(mixinDevice);
await mixinDevice.release().catch(() => { });
await sleep(1000);
worker?.terminate();
}
}

View File

@@ -18,7 +18,7 @@ export class TranscodeMixinProvider extends ScryptedDeviceBase implements MixinP
}
getSettings(): Promise<Setting[]> {
return this.plugin.transcodeStorageSettings.getSettings();
return this.plugin.transcodeStorageSettings.getSettings();
}
putSetting(key: string, value: SettingValue): Promise<void> {
@@ -34,9 +34,11 @@ export class TranscodeMixinProvider extends ScryptedDeviceBase implements MixinP
}
invalidateSettings(id: string) {
process.nextTick(async () =>{
const mixin = await this.plugin.currentMixins.get(id)?.mixin;
mixin?.onDeviceEvent(ScryptedInterface.Settings, undefined)
process.nextTick(async () => {
for (const [mixin, v] of this.plugin.currentMixins.entries()) {
if (v.id === id)
mixin?.onDeviceEvent(ScryptedInterface.Settings, undefined)
}
});
}