client: fix connection race for webrtc

This commit is contained in:
Koushik Dutta
2023-01-27 15:00:22 -08:00
parent 6ce6daf832
commit ae8d9bbdd3

View File

@@ -226,7 +226,6 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const explicitBaseUrl = baseUrl || `${globalThis.location.protocol}//${globalThis.location.host}`;
let rpcPeer: RpcPeer;
// underlying webrtc rpc transport may queue up messages before its ready to be to be handled.
// watch for this flush.
const flush = new Deferred<void>();
@@ -251,7 +250,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
localEioOptions.extraHeaders['Authorization'] ||= authorization;
let sockets: IOClientSocket[] = [];
type EIOResult = { ready: IOClientSocket, connectionType: ScryptedClientConnectionType, address?: string };
type EIOResult = { ready: IOClientSocket, connectionType: ScryptedClientConnectionType, address?: string, rpcPeer?: RpcPeer };
const promises: Promise<EIOResult>[] = [];
if (tryLocalAddressess) {
@@ -284,13 +283,124 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
rejectUnauthorized: false,
transports: options?.transports,
};
const check = new eio.Socket(explicitBaseUrl, webrtcEioOptions);
const check = new eio.Socket(explicitBaseUrl, webrtcEioOptions) as IOClientSocket;
sockets.push(check);
promises.push((async () => {
await once(check, 'open');
const connectionManagementId = `connectionManagement-${Math.random()}`;
const updateSessionId = `updateSessionId-${Math.random()}`;
check.send(JSON.stringify({
pluginId,
updateSessionId,
connectionManagementId,
}));
const dcDeferred = new Deferred<RTCDataChannel>();
const session = new BrowserSignalingSession();
const droppedMessages: any[] = [];
session.onPeerConnection = async pc => {
pc.ondatachannel = e => {
e.channel.onmessage = message => droppedMessages.push(message);
e.channel.binaryType = 'arraybuffer';
dcDeferred.resolve(e.channel)
};
}
const pcPromise = session.pcDeferred.promise;
const serializer = createRpcSerializer({
sendMessageBuffer: buffer => check.send(buffer),
sendMessageFinish: message => check.send(JSON.stringify(message)),
});
const upgradingPeer = new RpcPeer(clientName || 'webrtc-upgrade', "api", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e);
}
});
check.on('message', data => {
if (data.constructor === Buffer || data.constructor === ArrayBuffer) {
serializer.onMessageBuffer(Buffer.from(data));
}
else {
serializer.onMessageFinish(JSON.parse(data as string));
}
});
serializer.setupRpcPeer(upgradingPeer);
const readyClose = new Promise<RpcPeer>((resolve, reject) => {
check.on('close', () => reject(new Error('closed')))
})
upgradingPeer.params['session'] = session;
const pc = await pcPromise;
console.log('peer connection received');
await waitPeerConnectionIceConnected(pc);
console.log('waiting for data channel');
const dc = await dcDeferred.promise;
console.log('datachannel received', Date.now() - start);
const debouncer = new DataChannelDebouncer(dc, e => {
console.error('datachannel send error', e);
rpcPeer.kill('datachannel send error');
});
const dcSerializer = createRpcDuplexSerializer({
write: (data) => debouncer.send(data),
});
while (droppedMessages.length) {
const message = droppedMessages.shift();
dc.dispatchEvent(message);
}
const rpcPeer = new RpcPeer('webrtc-client', "api", (message, reject, serializationContext) => {
try {
dcSerializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e);
pc.close();
}
});
dcSerializer.setupRpcPeer(rpcPeer);
rpcPeer.params['connectionManagementId'] = connectionManagementId;
rpcPeer.params['updateSessionId'] = updateSessionId;
rpcPeer.params['browserSignalingSession'] = session;
waitPeerIceConnectionClosed(pc).then(() => check.close());
check.on('close', () => {
console.log('datachannel upgrade cancelled/closed');
pc.close()
});
await new Promise(resolve => {
let buffers: Buffer[] = [];
dc.onmessage = message => {
buffers.push(Buffer.from(message.data));
resolve(undefined);
flush.promise.finally(() => {
if (buffers) {
for (const buffer of buffers) {
dcSerializer.onData(Buffer.from(buffer));
}
buffers = undefined;
}
dc.onmessage = message => dcSerializer.onData(Buffer.from(message.data));
});
};
});
return {
ready: check,
connectionType: 'webrtc',
rpcPeer,
};
})());
}
@@ -298,7 +408,8 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const p2pPromises = [...promises];
promises.push((async () => {
const waitDuration = tryWebrtc ? 10000 : (tryLocalAddressess ? 1000 : 0);
const waitDuration = tryWebrtc ? 3000 : (tryLocalAddressess ? 1000 : 0);
console.log('waiting', waitDuration);
if (waitDuration) {
// give the peer to peers a second, but then try connecting directly.
try {
@@ -322,134 +433,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
})());
const any = Promise.any(promises);
const { ready, connectionType, address } = await any;
let { ready, connectionType, address, rpcPeer } = await any;
console.log('connected', connectionType, address)
if (connectionType === 'webrtc') {
console.log('using peer to peer', Date.now() - start);
const connectionManagementId = `connectionManagement-${Math.random()}`;
const updateSessionId = `updateSessionId-${Math.random()}`;
ready.send(JSON.stringify({
pluginId,
updateSessionId,
connectionManagementId,
}));
const dcDeferred = new Deferred<RTCDataChannel>();
const session = new BrowserSignalingSession();
const droppedMessages: any[] = [];
session.onPeerConnection = async pc => {
pc.ondatachannel = e => {
e.channel.onmessage = message => droppedMessages.push(message);
e.channel.binaryType = 'arraybuffer';
dcDeferred.resolve(e.channel)
};
}
const pcPromise = session.pcDeferred.promise;
const serializer = createRpcSerializer({
sendMessageBuffer: buffer => ready.send(buffer),
sendMessageFinish: message => ready.send(JSON.stringify(message)),
});
const upgradingPeer = new RpcPeer(clientName || 'webrtc-upgrade', "api", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e);
}
});
ready.on('message', data => {
if (data.constructor === Buffer || data.constructor === ArrayBuffer) {
serializer.onMessageBuffer(Buffer.from(data));
}
else {
serializer.onMessageFinish(JSON.parse(data as string));
}
});
serializer.setupRpcPeer(upgradingPeer);
const readyClose = new Promise<RpcPeer>((resolve, reject) => {
ready.on('close', () => reject(new Error('closed')))
})
upgradingPeer.params['session'] = session;
rpcPeer = await Promise.race([readyClose, timeoutFunction(10000, async (isTimedOut) => {
const pc = await pcPromise;
console.log('peer connection received');
await waitPeerConnectionIceConnected(pc);
console.log('waiting for data channel');
const dc = await dcDeferred.promise;
console.log('datachannel received', Date.now() - start);
const debouncer = new DataChannelDebouncer(dc, e => {
console.error('datachannel send error', e);
ret.kill('datachannel send error');
});
const serializer = createRpcDuplexSerializer({
write: (data) => debouncer.send(data),
});
while (droppedMessages.length) {
const message = droppedMessages.shift();
dc.dispatchEvent(message);
}
const ret = new RpcPeer('webrtc-client', "api", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e);
pc.close();
}
});
serializer.setupRpcPeer(ret);
ret.params['connectionManagementId'] = connectionManagementId;
ret.params['updateSessionId'] = updateSessionId;
ret.params['browserSignalingSession'] = session;
waitPeerIceConnectionClosed(pc).then(() => ready.close());
ready.on('close', () => {
console.log('datachannel upgrade cancelled/closed');
pc.close()
});
await new Promise(resolve => {
let buffers: Buffer[] = [];
dc.onmessage = message => {
buffers.push(Buffer.from(message.data));
resolve(undefined);
flush.promise.finally(() => {
if (buffers) {
for (const buffer of buffers) {
serializer.onData(Buffer.from(buffer));
}
buffers = undefined;
}
dc.onmessage = message => serializer.onData(Buffer.from(message.data));
});
};
});
if (isTimedOut()) {
console.log('peer connection established too late. closing.', Date.now() - start);
ready.close();
}
else {
console.log('peer connection api connected', Date.now() - start);
}
return ret;
})]);
}
socket = ready;
sockets = sockets.filter(s => s !== ready);