common: add auth and timeout support to rtsp client.

This commit is contained in:
Koushik Dutta
2022-03-02 20:28:13 -08:00
parent 75fefbb2cf
commit 8fc8a17d25
5 changed files with 147 additions and 33 deletions

View File

@@ -95,6 +95,7 @@ export async function startParserSession<T extends string>(ffmpegInput: FFMpegIn
let ffmpegIncomingConnectionTimeout: NodeJS.Timeout;
let isActive = true;
const events = new EventEmitter();
// need this to prevent kill from throwing due to uncaught Error during cleanup
events.on('error', e => console.error('rebroadcast error', e));
let inputAudioCodec: string;

View File

@@ -1,10 +1,12 @@
import { readLength, readLine } from './read-stream';
import { Duplex } from 'stream';
import { Duplex, Readable } from 'stream';
import { randomBytes } from 'crypto';
import { StreamChunk, StreamParser } from './stream-parser';
import dgram from 'dgram';
import net from 'net';
import tls from 'tls';
import { DIGEST } from 'http-auth-utils/src/index';
import crypto from 'crypto';
export const RTSP_FRAME_MAGIC = 36;
@@ -20,6 +22,17 @@ export interface RtspStreamParser extends StreamParser {
sdp: Promise<string>;
}
export async function readMessage(client: Readable): Promise<string[]> {
let currentHeaders: string[] = [];
while (true) {
let line = await readLine(client);
line = line.trim();
if (!line)
return currentHeaders;
currentHeaders.push(line);
}
}
export function createRtspParser(): RtspStreamParser {
let resolve: any;
@@ -55,7 +68,7 @@ export function createRtspParser(): RtspStreamParser {
}
}
function parseHeaders(headers: string[]): Headers {
export function parseHeaders(headers: string[]): Headers {
const ret: any = {};
for (const header of headers.slice(1)) {
const index = header.indexOf(':');
@@ -85,21 +98,17 @@ export class RtspBase {
}
async readMessage(): Promise<string[]> {
let currentHeaders: string[] = [];
while (true) {
let line = await readLine(this.client);
line = line.trim();
if (!line)
return currentHeaders;
currentHeaders.push(line);
}
return readMessage(this.client);
}
}
const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`;
// probably only works with scrypted rtsp server.
export class RtspClient extends RtspBase {
cseq = 0;
session: string;
authorization: string;
constructor(public url: string) {
super();
@@ -117,7 +126,7 @@ export class RtspClient extends RtspBase {
}
}
async request(method: string, headers?: Headers, path?: string, body?: Buffer) {
writeRequest(method: string, headers?: Headers, path?: string, body?: Buffer, authenticating?: boolean) {
headers = headers || {};
let fullUrl: string;
@@ -126,11 +135,56 @@ export class RtspClient extends RtspBase {
else
fullUrl = this.url;
const sanitized = new URL(fullUrl);
sanitized.username = '';
sanitized.password = '';
fullUrl = sanitized.toString();
const line = `${method} ${fullUrl} RTSP/1.0`;
headers['CSeq'] = (this.cseq++).toString();
const cseq = this.cseq++;
headers['CSeq'] = cseq.toString();
if (this.authorization)
headers['Authorization'] = this.authorization;
if (this.session)
headers['Session'] = this.session;
this.write(line, headers, body);
}
async request(method: string, headers?: Headers, path?: string, body?: Buffer, authenticating?: boolean): Promise<{
headers: Headers,
body: Buffer
}> {
this.writeRequest(method, headers, path, body, authenticating);
const response = parseHeaders(await this.readMessage());
if (response['www-authenticate']) {
if (authenticating)
throw new Error('auth failed');
const parsedUrl = new URL(this.url);
const wwwAuth = DIGEST.parseWWWAuthenticateRest(response['www-authenticate']);
const ha1 = crypto.createHash('md5').update(`${parsedUrl.username}:${wwwAuth.realm}:${parsedUrl.password}`).digest('hex');
const ha2 = crypto.createHash('md5').update(`${method}:${parsedUrl.pathname}`).digest('hex');
const hash = crypto.createHash('md5').update(`${ha1}:${wwwAuth.nonce}:${ha2}`).digest('hex');
const params = {
username: parsedUrl.username,
realm: wwwAuth.realm,
nonce: wwwAuth.nonce,
uri: parsedUrl.pathname,
algorithm: 'MD5',
response: hash,
};
const paramsString = Object.entries(params).map(([key, value]) => `${key}=${value && quote(value)}`).join(', ');
this.authorization = `Digest ${paramsString}`;
return this.request(method, headers, path, body, true);
}
const cl = parseInt(response['content-length']);
if (cl)
return { headers: response, body: await readLength(this.client, cl) };
@@ -138,11 +192,18 @@ export class RtspClient extends RtspBase {
}
async options() {
return this.request('OPTIONS');
const headers: Headers = {};
return this.request('OPTIONS', headers);
}
async describe() {
async writeGetParameter() {
const headers: Headers = {};
return this.writeRequest('GET_PARAMETER', headers);
}
async describe(headers?: Headers) {
return this.request('DESCRIBE', {
...(headers || {}),
Accept: 'application/sdp',
});
}
@@ -151,11 +212,25 @@ export class RtspClient extends RtspBase {
const headers: any = {
Transport: `RTP/AVP/TCP;unicast;interleaved=${channel}-${channel + 1}`,
};
if (this.session)
headers['Session'] = this.session;
const response = await this.request('SETUP', headers, path)
if (response.headers.session)
this.session = response.headers.session;
if (response.headers.session) {
const sessionDict: { [key: string]: string } = {};
for (const part of response.headers.session.split(';')) {
const [key, value] = part.split('=', 2);
sessionDict[key] = value;
}
let timeout = parseInt(sessionDict['timeout']);
if (timeout) {
// if a timeout is requested, need to keep the session alive with periodic refresh.
// one suggestion is calling OPTIONS, but apparently GET_PARAMETER is more reliable.
// https://stackoverflow.com/a/39818378
let interval = (timeout - 5) * 1000;
let timer = setInterval(() => this.writeGetParameter(), interval);
this.client.once('close', () => clearInterval(timer));
}
this.session = response.headers.session.split(';')[0];
}
return response;
}
@@ -163,8 +238,6 @@ export class RtspClient extends RtspBase {
const headers: any = {
Range: 'npt=0.000-',
};
if (this.session)
headers['Session'] = this.session;
await this.request('PLAY', headers);
return this.client;
}

View File

@@ -16,21 +16,27 @@ export function parsePayloadTypes(sdp: string) {
}
}
function getTrackId(track: string) {
if (!track)
return;
const lines = track.split('\n').map(line => line.trim());
const control = lines.find(line => line.startsWith('a=control:'));
return control?.split('a=control:')?.[1];
export function findTrack(sdp: string, type: string, directions: TrackDirection[] = ['recvonly']) {
const tracks = sdp.split('m=').filter(track => track.startsWith(type));
for (const track of tracks) {
for (const dir of directions) {
if (track.includes(`a=${dir}`)) {
const lines = track.split('\n').map(line => line.trim());
const control = lines.find(line => line.startsWith('a=control:'));
return {
section: 'm=' + track,
trackId: control?.split('a=control:')?.[1],
};
}
}
}
}
export function parseTrackIds(sdp: string) {
const tracks = sdp.split('m=');
type TrackDirection = 'sendonly' | 'sendrecv' | 'recvonly';
const audioTrack = tracks.find(track => track.startsWith('audio'));
const videoTrack = tracks.find(track => track.startsWith('video'));
export function parseTrackIds(sdp: string, directions: TrackDirection[] = ['recvonly', 'sendrecv']) {
return {
audio: getTrackId(audioTrack),
video: getTrackId(videoTrack),
audio: findTrack(sdp, 'audio', directions)?.trackId,
video: findTrack(sdp, 'video', directions)?.trackId,
};
}