From 8fc8a17d254745142a3509018f7e62e5d11cadce Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 2 Mar 2022 20:28:13 -0800 Subject: [PATCH] common: add auth and timeout support to rtsp client. --- common/package-lock.json | 35 +++++++++- common/package.json | 1 + common/src/ffmpeg-rebroadcast.ts | 1 + common/src/rtsp-server.ts | 113 +++++++++++++++++++++++++------ common/src/sdp-utils.ts | 30 ++++---- 5 files changed, 147 insertions(+), 33 deletions(-) diff --git a/common/package-lock.json b/common/package-lock.json index e5d2fe0c0..a58a28cc4 100644 --- a/common/package-lock.json +++ b/common/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@koush/werift": "file:../external/werift/packages/webrtc", "@scrypted/sdk": "file:../sdk", + "http-auth-utils": "^3.0.2", "node-fetch-commonjs": "^3.1.1", "typescript": "^4.4.3" }, @@ -72,7 +73,7 @@ }, "../sdk": { "name": "@scrypted/sdk", - "version": "0.0.173", + "version": "0.0.174", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.16.7", @@ -152,6 +153,17 @@ "node": ">=12.20.0" } }, + "node_modules/http-auth-utils": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/http-auth-utils/-/http-auth-utils-3.0.2.tgz", + "integrity": "sha512-cQ8957aiUX0lgV1620uIGKGJc0sEuD/QK4ueZ0hb60MGbO0f6ahcuIgPjamAD98D/AUGizKVm+dNvUVHs0f4Ow==", + "dependencies": { + "yerror": "^6.0.0" + }, + "engines": { + "node": ">=12.19.0" + } + }, "node_modules/node-domexception": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", @@ -205,6 +217,14 @@ "engines": { "node": ">= 8" } + }, + "node_modules/yerror": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/yerror/-/yerror-6.0.1.tgz", + "integrity": "sha512-0Bxo+NyeucjxhmPB5z3lmI/N/cOu8L1Q8JVta6/I5G6J/JhCSSPwk8qt9N4yOFSjwkvhDwzUSQglfBIAllvi1Q==", + "engines": { + "node": ">=12.19.0" + } } }, "dependencies": { @@ -299,6 +319,14 @@ "fetch-blob": "^3.1.2" } }, + "http-auth-utils": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/http-auth-utils/-/http-auth-utils-3.0.2.tgz", + "integrity": "sha512-cQ8957aiUX0lgV1620uIGKGJc0sEuD/QK4ueZ0hb60MGbO0f6ahcuIgPjamAD98D/AUGizKVm+dNvUVHs0f4Ow==", + "requires": { + "yerror": "^6.0.0" + } + }, "node-domexception": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", @@ -322,6 +350,11 @@ "version": "3.2.0", "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.2.0.tgz", "integrity": "sha512-EqPmREeOzttaLRm5HS7io98goBgZ7IVz79aDvqjD0kYXLtFZTc0T/U6wHTPKyIjb+MdN7DFIIX6hgdBEpWmfPA==" + }, + "yerror": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/yerror/-/yerror-6.0.1.tgz", + "integrity": "sha512-0Bxo+NyeucjxhmPB5z3lmI/N/cOu8L1Q8JVta6/I5G6J/JhCSSPwk8qt9N4yOFSjwkvhDwzUSQglfBIAllvi1Q==" } } } diff --git a/common/package.json b/common/package.json index 9eab72edd..ba2666192 100644 --- a/common/package.json +++ b/common/package.json @@ -11,6 +11,7 @@ "dependencies": { "@koush/werift": "file:../external/werift/packages/webrtc", "@scrypted/sdk": "file:../sdk", + "http-auth-utils": "^3.0.2", "node-fetch-commonjs": "^3.1.1", "typescript": "^4.4.3" }, diff --git a/common/src/ffmpeg-rebroadcast.ts b/common/src/ffmpeg-rebroadcast.ts index f7f3c9b93..493fe42d7 100644 --- a/common/src/ffmpeg-rebroadcast.ts +++ b/common/src/ffmpeg-rebroadcast.ts @@ -95,6 +95,7 @@ export async function startParserSession(ffmpegInput: FFMpegIn let ffmpegIncomingConnectionTimeout: NodeJS.Timeout; let isActive = true; const events = new EventEmitter(); + // need this to prevent kill from throwing due to uncaught Error during cleanup events.on('error', e => console.error('rebroadcast error', e)); let inputAudioCodec: string; diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index af3b70bcb..f930aa374 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -1,10 +1,12 @@ import { readLength, readLine } from './read-stream'; -import { Duplex } from 'stream'; +import { Duplex, Readable } from 'stream'; import { randomBytes } from 'crypto'; import { StreamChunk, StreamParser } from './stream-parser'; import dgram from 'dgram'; import net from 'net'; import tls from 'tls'; +import { DIGEST } from 'http-auth-utils/src/index'; +import crypto from 'crypto'; export const RTSP_FRAME_MAGIC = 36; @@ -20,6 +22,17 @@ export interface RtspStreamParser extends StreamParser { sdp: Promise; } +export async function readMessage(client: Readable): Promise { + let currentHeaders: string[] = []; + while (true) { + let line = await readLine(client); + line = line.trim(); + if (!line) + return currentHeaders; + currentHeaders.push(line); + } +} + export function createRtspParser(): RtspStreamParser { let resolve: any; @@ -55,7 +68,7 @@ export function createRtspParser(): RtspStreamParser { } } -function parseHeaders(headers: string[]): Headers { +export function parseHeaders(headers: string[]): Headers { const ret: any = {}; for (const header of headers.slice(1)) { const index = header.indexOf(':'); @@ -85,21 +98,17 @@ export class RtspBase { } async readMessage(): Promise { - let currentHeaders: string[] = []; - while (true) { - let line = await readLine(this.client); - line = line.trim(); - if (!line) - return currentHeaders; - currentHeaders.push(line); - } + return readMessage(this.client); } } +const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`; + // probably only works with scrypted rtsp server. export class RtspClient extends RtspBase { cseq = 0; session: string; + authorization: string; constructor(public url: string) { super(); @@ -117,7 +126,7 @@ export class RtspClient extends RtspBase { } } - async request(method: string, headers?: Headers, path?: string, body?: Buffer) { + writeRequest(method: string, headers?: Headers, path?: string, body?: Buffer, authenticating?: boolean) { headers = headers || {}; let fullUrl: string; @@ -126,11 +135,56 @@ export class RtspClient extends RtspBase { else fullUrl = this.url; + const sanitized = new URL(fullUrl); + sanitized.username = ''; + sanitized.password = ''; + fullUrl = sanitized.toString(); + const line = `${method} ${fullUrl} RTSP/1.0`; - headers['CSeq'] = (this.cseq++).toString(); + const cseq = this.cseq++; + headers['CSeq'] = cseq.toString(); + + if (this.authorization) + headers['Authorization'] = this.authorization; + + if (this.session) + headers['Session'] = this.session; + this.write(line, headers, body); + } + + async request(method: string, headers?: Headers, path?: string, body?: Buffer, authenticating?: boolean): Promise<{ + headers: Headers, + body: Buffer + }> { + this.writeRequest(method, headers, path, body, authenticating); const response = parseHeaders(await this.readMessage()); + if (response['www-authenticate']) { + if (authenticating) + throw new Error('auth failed'); + + const parsedUrl = new URL(this.url); + + const wwwAuth = DIGEST.parseWWWAuthenticateRest(response['www-authenticate']); + + const ha1 = crypto.createHash('md5').update(`${parsedUrl.username}:${wwwAuth.realm}:${parsedUrl.password}`).digest('hex'); + const ha2 = crypto.createHash('md5').update(`${method}:${parsedUrl.pathname}`).digest('hex'); + const hash = crypto.createHash('md5').update(`${ha1}:${wwwAuth.nonce}:${ha2}`).digest('hex'); + + const params = { + username: parsedUrl.username, + realm: wwwAuth.realm, + nonce: wwwAuth.nonce, + uri: parsedUrl.pathname, + algorithm: 'MD5', + response: hash, + }; + + const paramsString = Object.entries(params).map(([key, value]) => `${key}=${value && quote(value)}`).join(', '); + this.authorization = `Digest ${paramsString}`; + return this.request(method, headers, path, body, true); + } const cl = parseInt(response['content-length']); if (cl) return { headers: response, body: await readLength(this.client, cl) }; @@ -138,11 +192,18 @@ export class RtspClient extends RtspBase { } async options() { - return this.request('OPTIONS'); + const headers: Headers = {}; + return this.request('OPTIONS', headers); } - async describe() { + async writeGetParameter() { + const headers: Headers = {}; + return this.writeRequest('GET_PARAMETER', headers); + } + + async describe(headers?: Headers) { return this.request('DESCRIBE', { + ...(headers || {}), Accept: 'application/sdp', }); } @@ -151,11 +212,25 @@ export class RtspClient extends RtspBase { const headers: any = { Transport: `RTP/AVP/TCP;unicast;interleaved=${channel}-${channel + 1}`, }; - if (this.session) - headers['Session'] = this.session; const response = await this.request('SETUP', headers, path) - if (response.headers.session) - this.session = response.headers.session; + if (response.headers.session) { + const sessionDict: { [key: string]: string } = {}; + for (const part of response.headers.session.split(';')) { + const [key, value] = part.split('=', 2); + sessionDict[key] = value; + } + let timeout = parseInt(sessionDict['timeout']); + if (timeout) { + // if a timeout is requested, need to keep the session alive with periodic refresh. + // one suggestion is calling OPTIONS, but apparently GET_PARAMETER is more reliable. + // https://stackoverflow.com/a/39818378 + let interval = (timeout - 5) * 1000; + let timer = setInterval(() => this.writeGetParameter(), interval); + this.client.once('close', () => clearInterval(timer)); + } + + this.session = response.headers.session.split(';')[0]; + } return response; } @@ -163,8 +238,6 @@ export class RtspClient extends RtspBase { const headers: any = { Range: 'npt=0.000-', }; - if (this.session) - headers['Session'] = this.session; await this.request('PLAY', headers); return this.client; } diff --git a/common/src/sdp-utils.ts b/common/src/sdp-utils.ts index 84c036cbc..61bc4725b 100644 --- a/common/src/sdp-utils.ts +++ b/common/src/sdp-utils.ts @@ -16,21 +16,27 @@ export function parsePayloadTypes(sdp: string) { } } -function getTrackId(track: string) { - if (!track) - return; - const lines = track.split('\n').map(line => line.trim()); - const control = lines.find(line => line.startsWith('a=control:')); - return control?.split('a=control:')?.[1]; +export function findTrack(sdp: string, type: string, directions: TrackDirection[] = ['recvonly']) { + const tracks = sdp.split('m=').filter(track => track.startsWith(type)); + for (const track of tracks) { + for (const dir of directions) { + if (track.includes(`a=${dir}`)) { + const lines = track.split('\n').map(line => line.trim()); + const control = lines.find(line => line.startsWith('a=control:')); + return { + section: 'm=' + track, + trackId: control?.split('a=control:')?.[1], + }; + } + } + } } -export function parseTrackIds(sdp: string) { - const tracks = sdp.split('m='); +type TrackDirection = 'sendonly' | 'sendrecv' | 'recvonly'; - const audioTrack = tracks.find(track => track.startsWith('audio')); - const videoTrack = tracks.find(track => track.startsWith('video')); +export function parseTrackIds(sdp: string, directions: TrackDirection[] = ['recvonly', 'sendrecv']) { return { - audio: getTrackId(audioTrack), - video: getTrackId(videoTrack), + audio: findTrack(sdp, 'audio', directions)?.trackId, + video: findTrack(sdp, 'video', directions)?.trackId, }; }