rebroadcast: watch for sudden worker exits putting device into bad state

This commit is contained in:
Koushik Dutta
2023-01-07 10:36:37 -08:00
parent 70f2cb68f4
commit 88c873c98e
3 changed files with 39 additions and 24 deletions

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.61",
"version": "0.9.62",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.61",
"version": "0.9.62",
"license": "Apache-2.0",
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.9.61",
"version": "0.9.62",
"description": "Video Stream Rebroadcast, Prebuffer, and Management Plugin for Scrypted.",
"author": "Scrypted",
"license": "Apache-2.0",

View File

@@ -1666,7 +1666,7 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
}
});
currentMixins = new Map<string, {
terminate(): Promise<number>,
worker: Worker,
mixin: Promise<PrebufferMixin>,
}>();
@@ -1812,21 +1812,31 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
if (fork && sdk.fork && typeof mixinDeviceState.id === 'string') {
const forked = sdk.fork<RebroadcastPluginFork>();
const { worker } = forked;
const { id } = mixinDeviceState;
const cleanupWorker = () => {
const found = this.currentMixins.get(id);
if (found.worker === worker) {
worker.terminate();
this.currentMixins.delete(id);
}
}
forked.worker.on('error', e => {
this.console.error('prebuffer worker error', e);
cleanupWorker();
});
forked.worker.on('exit', exitCode => {
if (exitCode)
this.console.error('prebuffer worker error non-zero result:', exitCode);
cleanupWorker();
});
const result = await forked.result as RebroadcastPluginFork;
const ret = result.newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
// scrypted should call release on the mixin, but just in case...
const previous = this.currentMixins.get(mixinDeviceState.id);
previous?.mixin?.then(async mixinDevice => {
try {
await mixinDevice.release();
}
catch (e) {
}
previous.terminate();
});
previous?.terminate();
this.currentMixins.set(mixinDeviceState.id, {
terminate: () => forked.worker.terminate(),
const previous = this.currentMixins.get(id);
previous?.worker?.terminate();
this.currentMixins.set(id, {
worker,
mixin: ret,
});
return ret;
@@ -1835,7 +1845,7 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
const ret = newPrebufferMixin(async () => this.transcodeStorageSettings.values, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
this.currentMixins.set(mixinDeviceState.id, {
mixin: Promise.resolve(ret),
terminate: undefined,
worker: undefined,
});
return ret;
}
@@ -1843,17 +1853,22 @@ export class RebroadcastPlugin extends AutoenableMixinProvider implements MixinP
async releaseMixin(id: string, mixinDevice: PrebufferMixin) {
try {
await mixinDevice.release();
const current = this.currentMixins.get(id);
const currentMixin = await current?.mixin;
// mixin may have changed during the await.
if (currentMixin === mixinDevice && this.currentMixins.get(id) === current) {
this.currentMixins.delete(id);
// TODO: 1/7/2023 remove this legacy code check, there will always be a worker
if (current.worker) {
current.worker?.terminate();
return;
}
}
}
catch (e) {
}
const current = this.currentMixins.get(id);
const currentMixin = await current?.mixin;
if (currentMixin === mixinDevice && this.currentMixins.get(id) === current) {
this.currentMixins.delete(id);
current?.terminate?.();
}
await mixinDevice.release();
}
}