From 1c72d53eb57add900bb23157c4976b9940ebcee2 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 25 Oct 2022 09:41:21 -0700 Subject: [PATCH] pam-diff: watch for ffmpeg death --- plugins/pam-diff/package-lock.json | 4 +- plugins/pam-diff/package.json | 2 +- plugins/pam-diff/src/main.ts | 76 ++++++++++++++++++++---------- 3 files changed, 54 insertions(+), 28 deletions(-) diff --git a/plugins/pam-diff/package-lock.json b/plugins/pam-diff/package-lock.json index aecde41c4..aa3f487e4 100644 --- a/plugins/pam-diff/package-lock.json +++ b/plugins/pam-diff/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/pam-diff", - "version": "0.0.13", + "version": "0.0.14", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/pam-diff", - "version": "0.0.13", + "version": "0.0.14", "hasInstallScript": true, "dependencies": { "@types/node": "^16.6.1", diff --git a/plugins/pam-diff/package.json b/plugins/pam-diff/package.json index 86563cf7b..be23de93a 100644 --- a/plugins/pam-diff/package.json +++ b/plugins/pam-diff/package.json @@ -43,5 +43,5 @@ "devDependencies": { "@scrypted/sdk": "file:../../sdk" }, - "version": "0.0.13" + "version": "0.0.14" } diff --git a/plugins/pam-diff/src/main.ts b/plugins/pam-diff/src/main.ts index 15ba0743e..488c9976f 100644 --- a/plugins/pam-diff/src/main.ts +++ b/plugins/pam-diff/src/main.ts @@ -1,6 +1,6 @@ import { FFmpegInput, MediaObject, ObjectDetection, ObjectDetectionCallbacks, ObjectDetectionModel, ObjectDetectionSession, ObjectsDetected, ScryptedDeviceBase, ScryptedInterface, ScryptedMimeTypes } from '@scrypted/sdk'; import sdk from '@scrypted/sdk'; -import { ffmpegLogInitialOutput, safePrintFFmpegArguments } from "../../../common/src/media-helpers"; +import { ffmpegLogInitialOutput, safeKillFFmpeg, safePrintFFmpegArguments } from "../../../common/src/media-helpers"; import child_process, { ChildProcess } from 'child_process'; @@ -17,12 +17,16 @@ interface PamDiffSession { timeout?: NodeJS.Timeout; cp?: ChildProcess; pamDiff?: any; + callbacks: ObjectDetectionCallbacks; } class PamDiff extends ScryptedDeviceBase implements ObjectDetection { sessions = new Map(); - endSession(pds: PamDiffSession, callbacks: ObjectDetectionCallbacks) { + endSession(id: string) { + const pds = this.sessions.get(id); + if (!pds) + return; this.sessions.delete(pds.id); const event: ObjectsDetected = { timestamp: Date.now(), @@ -30,18 +34,21 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { detectionId: pds.id, } clearTimeout(pds.timeout); - pds.cp.kill('SIGKILL'); - if (callbacks) { - callbacks.onDetectionEnded(event); + safeKillFFmpeg(pds.cp); + if (pds.callbacks) { + pds.callbacks.onDetectionEnded(event); } else { this.onDeviceEvent(ScryptedInterface.ObjectDetection, event); } } - reschedule(pds: PamDiffSession, duration: number, callbacks: ObjectDetectionCallbacks) { + reschedule(id: string, duration: number,) { + const pds = this.sessions.get(id); + if (!pds) + return; clearTimeout(pds.timeout); - pds.timeout = setTimeout(() => this.endSession(pds, callbacks), duration); + pds.timeout = setTimeout(() => this.endSession(id), duration); } async detectObjects(mediaObject: MediaObject, session?: ObjectDetectionSession, callbacks?: ObjectDetectionCallbacks): Promise { @@ -50,17 +57,11 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { let { detectionId } = session; let pds = this.sessions.get(detectionId); - if (!session?.duration) { - if (pds) - this.endSession(pds, callbacks); - return { - detectionId, - running: false, - timestamp: Date.now(), - } - } + if (pds) + pds.callbacks = callbacks; - if (!mediaObject) { + if (!session?.duration) { + this.endSession(detectionId); return { detectionId, running: false, @@ -69,7 +70,7 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { } if (pds) { - this.reschedule(pds, session.duration, callbacks); + this.reschedule(detectionId, session.duration); pds.pamDiff.setDifference(session.settings?.difference || defaultDifference).setPercent(session.settings?.percent || defaultPercentage); return { detectionId, @@ -78,6 +79,16 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { }; } + // unable to start/extend this session. + if (!mediaObject) { + this.endSession(detectionId); + return { + detectionId, + running: false, + timestamp: Date.now(), + } + } + const ffmpeg = await mediaManager.getFFmpegPath(); const ffmpegInput: FFmpegInput = JSON.parse((await mediaManager.convertMediaObjectToBuffer( mediaObject, @@ -86,8 +97,9 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { pds = { id: detectionId, + callbacks, } - this.reschedule(pds, session.duration, callbacks); + this.reschedule(detectionId, session.duration); const args = ffmpegInput.inputArguments.slice(); args.unshift( @@ -114,11 +126,10 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { response: 'percent', }); - // eslint-disable-next-line no-unused-vars - pamDiff.on('diff', async (data) => { + pamDiff.on('diff', async (data: any) => { const event: ObjectsDetected = { timestamp: Date.now(), - running: false, + running: true, detectionId: pds.id, detections: [ { @@ -127,15 +138,14 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { } ] } - if (callbacks) { - callbacks.onDetection(event); + if (pds.callbacks) { + pds.callbacks.onDetection(event); } else { this.onDeviceEvent(ScryptedInterface.ObjectDetection, event); } }); - const console = sdk.deviceManager.getMixinConsole(mediaObject.sourceId, this.nativeId); pds.pamDiff = pamDiff; @@ -144,7 +154,23 @@ class PamDiff extends ScryptedDeviceBase implements ObjectDetection { pds.cp = child_process.spawn(ffmpeg, args, { stdio:[ 'inherit', 'pipe', 'pipe', 'pipe'] }); + let pamTimeout: NodeJS.Timeout; + const resetTimeout = () => { + clearTimeout(pamTimeout); + pamTimeout = setTimeout(() => { + const check = this.sessions.get(detectionId); + if (check !== pds) + return; + console.error('PAM image stream timed out. Ending session.'); + this.endSession(detectionId); + }, 60000); + } + p2p.on('data', () => { + resetTimeout(); + }) + resetTimeout(); pds.cp.stdio[3].pipe(p2p as any).pipe(pamDiff as any); + pds.cp.on('exit', () => this.endSession(detectionId)); ffmpegLogInitialOutput(console, pds.cp); this.sessions.set(detectionId, pds);