From eae580708ba2b090b1b6da8889171616ded60d4e Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Thu, 10 Mar 2022 09:47:47 -0800 Subject: [PATCH] ring: stream import options, direct sdp usage --- common/src/listen-cluster.ts | 12 +++- common/src/sdp-utils.ts | 8 +++ plugins/ring/package-lock.json | 4 +- plugins/ring/package.json | 2 +- plugins/ring/src/main.ts | 108 +++++++++++++++++++++++++-------- 5 files changed, 106 insertions(+), 28 deletions(-) diff --git a/common/src/listen-cluster.ts b/common/src/listen-cluster.ts index 89bfd19ff..58e2ea356 100644 --- a/common/src/listen-cluster.ts +++ b/common/src/listen-cluster.ts @@ -1,4 +1,4 @@ -import net, { AddressInfo } from 'net'; +import net from 'net'; import { once } from 'events'; import dgram from 'dgram'; @@ -8,6 +8,16 @@ export async function listenZero(server: net.Server) { return (server.address() as net.AddressInfo).port; } +export function closeQuiet(socket: dgram.Socket) { + if (!socket) + return; + try { + socket.close() + } + catch (e) { + } +} + export async function bindUdp(server: dgram.Socket, usePort: number) { server.bind(usePort); await once(server, 'listening'); diff --git a/common/src/sdp-utils.ts b/common/src/sdp-utils.ts index 53f965b23..e8fc3b50b 100644 --- a/common/src/sdp-utils.ts +++ b/common/src/sdp-utils.ts @@ -1,3 +1,11 @@ +export function replacePorts(sdp: string, audioPort: number, videoPort: number) { + let outputSdp = sdp + .replace(/c=IN .*/, `c=IN IP4 127.0.0.1`) + .replace(/m=audio \d+/, `m=audio ${audioPort}`) + .replace(/m=video \d+/, `m=video ${videoPort}`); + return outputSdp; +} + export function addTrackControls(sdp: string) { let lines = sdp.split('\n').map(line => line.trim()); lines = lines.filter(line => !line.includes('a=control:')); diff --git a/plugins/ring/package-lock.json b/plugins/ring/package-lock.json index 7526d9d53..fe9eacddf 100644 --- a/plugins/ring/package-lock.json +++ b/plugins/ring/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/ring", - "version": "0.0.60", + "version": "0.0.65", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/ring", - "version": "0.0.60", + "version": "0.0.65", "dependencies": { "@koush/ring-client-api": "file:../../external/ring-client-api", "@types/node": "^16.6.1", diff --git a/plugins/ring/package.json b/plugins/ring/package.json index 43a74d7f8..6783fffa4 100644 --- a/plugins/ring/package.json +++ b/plugins/ring/package.json @@ -37,5 +37,5 @@ "@scrypted/sdk": "file:../../sdk", "typescript": "^4.6.2" }, - "version": "0.0.60" + "version": "0.0.65" } diff --git a/plugins/ring/src/main.ts b/plugins/ring/src/main.ts index b0e832260..954d423d5 100644 --- a/plugins/ring/src/main.ts +++ b/plugins/ring/src/main.ts @@ -3,11 +3,19 @@ import { SipSession, isStunMessage, LiveCallNegotiation, clientApi, generateUuid import { StorageSettings } from '@scrypted/common/src/settings'; import { startRTCSignalingSession } from '@scrypted/common/src/rtc-signaling'; import { RefreshPromise } from "@scrypted/common/src/promise-utils" -import { ChildProcess } from 'child_process'; -import { createBindZero, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; +import { closeQuiet, createBindZero, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster'; +import { replacePorts } from '@scrypted/common/src/sdp-utils'; import { RtspServer } from '@scrypted/common/src/rtsp-server' import dgram from 'dgram'; -import { createCryptoLine, getPayloadType, isRtpMessagePayloadType } from './srtp-utils'; +import { createCryptoLine, encodeSrtpOptions, getPayloadType, isRtpMessagePayloadType } from './srtp-utils'; +import child_process, { ChildProcess } from 'child_process'; + +enum CaptureModes { + Default = 'Default', + UDP = 'RTSP+UDP', + TCP = 'RTSP+TCP', + FFmpeg = 'FFmpeg Direct Capture', +} const STREAM_TIMEOUT = 120000; @@ -42,10 +50,11 @@ class RingCameraLight extends ScryptedDeviceBase implements OnOff { class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, DeviceProvider, Camera, MotionSensor, BinarySensor, RTCSignalingChannel, VideoCamera { storageSettings = new StorageSettings(this, { - ffmpegDirectCapture: { - title: 'SIP FFmpeg Direct Capture', - description: 'Experimental: May be faster. May not work.', - type: 'boolean', + captureMode: { + title: 'SIP Gateway', + description: 'The gateway used to import the stream.', + choices: Object.values(CaptureModes), + value: CaptureModes.Default, } }); buttonTimeout: NodeJS.Timeout; @@ -67,10 +76,56 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, } async startIntercom(media: MediaObject): Promise { - } - async stopIntercom(): Promise { + if (!this.session) + throw new Error("not in call"); + + this.stopIntercom(); + + const ffmpegInput: FFMpegInput = JSON.parse((await mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput)).toString()); + + const ringRtpOptions = this.rtpDescription; + let cameraSpeakerActive = false; + const audioOutForwarder = await createBindZero(); + this.audioOutForwarder = audioOutForwarder.server; + audioOutForwarder.server.on('message', message => { + if (!cameraSpeakerActive) { + cameraSpeakerActive = true; + this.session.activateCameraSpeaker().catch(e => this.console.error('camera speaker activation error', e)) + } + + this.session.audioSplitter.send(message, ringRtpOptions.audio.port, ringRtpOptions.address); + return null; + }); + + + const args = ffmpegInput.inputArguments.slice(); + args.push( + '-vn', '-dn', '-sn', + '-acodec', 'pcm_mulaw', + '-flags', '+global_header', + '-ac', '1', + '-ar', '8k', + '-f', 'rtp', + '-srtp_out_suite', 'AES_CM_128_HMAC_SHA1_80', + '-srtp_out_params', encodeSrtpOptions(this.session.rtpOptions.audio), + `srtp://127.0.0.1:${audioOutForwarder.port}?pkt_size=188`, + ); + + const cp = child_process.spawn(await mediaManager.getFFmpegPath(), args); + this.audioOutProcess = cp; + cp.on('exit', () => this.console.log('two way audio ended')); + this.session.onCallEnded.subscribe(() => { + closeQuiet(audioOutForwarder.server); + cp.kill('SIGKILL'); + }); } + async stopIntercom(): Promise { + closeQuiet(this.audioOutForwarder); + this.audioOutProcess?.kill('SIGKILL'); + this.audioOutProcess = undefined; + this.audioOutForwarder = undefined; + } resetStreamTimeout() { this.console.log('starting/refreshing stream'); @@ -103,7 +158,8 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, const { clientPromise: playbackPromise, port: playbackPort, url: clientUrl } = await listenZeroSingleClient(); - const useRtsp = !this.storageSettings.values.ffmpegDirectCapture; + const useRtsp = this.storageSettings.values.captureMode !== CaptureModes.FFmpeg; + const useRtspTcp = this.storageSettings.values.captureMode === CaptureModes.TCP; const playbackUrl = useRtsp ? `rtsp://127.0.0.1:${playbackPort}` : clientUrl; @@ -122,11 +178,7 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, } catch (e) { } - try { - udp.close(); - } - catch (e) { - } + closeQuiet(udp); } client.on('close', cleanup); @@ -135,6 +187,7 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, sip = await camera.createSipSession(undefined); sip.onCallEnded.subscribe(cleanup); this.rtpDescription = await sip.start(); + this.console.log('ring sdp', this.rtpDescription.sdp) const videoPort = useRtsp ? 0 : sip.videoSplitter.address().port; const audioPort = useRtsp ? 0 : sip.audioSplitter.address().port; @@ -159,7 +212,10 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, 'a=rtcp-mux' ]; - const sdp = inputSdpLines.filter((x) => Boolean(x)).join('\n'); + const proposedSdp = inputSdpLines.filter((x) => Boolean(x)).join('\n'); + const sdp = replacePorts(this.rtpDescription.sdp, audioPort, videoPort); + this.console.log('proposed sdp', sdp); + if (useRtsp) { const rtsp = new RtspServer(client, sdp, udp); rtsp.console = this.console; @@ -170,21 +226,23 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, sip.videoSplitter.on('message', message => { if (!isStunMessage(message)) { const isRtpMessage = isRtpMessagePayloadType(getPayloadType(message)); - if (!isRtpMessage) - this.console.log('rtcp') + if (!isRtpMessage) { + this.console.log('rtcp'); + } rtsp.sendVideo(message, !isRtpMessage); } }); sip.audioSplitter.on('message', message => { if (!isStunMessage(message)) { const isRtpMessage = isRtpMessagePayloadType(getPayloadType(message)); - if (!isRtpMessage) - this.console.log('rtcp') + if (!isRtpMessage) { + this.console.log('rtcp'); + } rtsp.sendAudio(message, !isRtpMessage); } }); - // sip.requestKeyFrame(); + sip.requestKeyFrame(); this.session = sip; try { @@ -233,7 +291,7 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, } }); - // sip.requestKeyFrame(); + sip.requestKeyFrame(); this.session = sip; await packetWaiter; @@ -260,7 +318,7 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, }), inputArguments: [ ...(useRtsp - ? ['-rtsp_transport', 'udp'] + ? ['-rtsp_transport', useRtspTcp ? 'tcp' : 'udp'] : ['-f', 'sdp']), '-i', playbackUrl, ], @@ -272,13 +330,15 @@ class RingCameraDevice extends ScryptedDeviceBase implements Intercom, Settings, } getSipMediaStreamOptions(): MediaStreamOptions { + const useRtsp = this.storageSettings.values.captureMode !== CaptureModes.FFmpeg; + return { id: 'sip', name: 'SIP', // note that the rtsp stream comes from scrypted, // can bypass ffmpeg parsing. // tool: "scrypted", - container: 'rtsp', + container: useRtsp ? 'rtsp' : 'sdp', video: { codec: 'h264', },