server: working transferible buffers

This commit is contained in:
Koushik Dutta
2024-08-19 13:20:21 -07:00
parent bfbc6ba6ce
commit 2da762dfc2
5 changed files with 53 additions and 16 deletions

View File

@@ -1,8 +1,41 @@
import v8 from 'v8';
import worker_threads from "worker_threads";
import { EventEmitter } from "events";
import { RpcMessage, RpcPeer } from "../../rpc";
import { RpcMessage, RpcPeer, RpcSerializer } from "../../rpc";
import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";
import { BufferSerializer } from '../../rpc-buffer-serializer';
class BufferTransfer implements RpcSerializer {
bufferSerializer = new BufferSerializer();
serialize(value: Buffer, serializationContext?: any): any {
if (!serializationContext)
return this.bufferSerializer.serialize(value);
// must create a copy. Buffers as allocated by node are not transferable.
const ab = value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength);
value = Buffer.from(ab);
serializationContext.transferList ||= [];
const transferList: worker_threads.TransferListItem[] = serializationContext.transferList;
transferList.push(value.buffer);
// can return the value directly, as the buffer is transferred.
return value;
}
deserialize(serialized: any, serializationContext?: any): any {
if (!serializationContext?.transferList)
return this.bufferSerializer.deserialize(serialized);
// the buffer was transferred, so we can return the value directly.
const u: Uint8Array = serialized;
return Buffer.from(u.buffer, u.byteOffset, u.byteLength);
}
}
interface PortMessage {
message: any;
serializationContext: any;
}
export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
worker: worker_threads.Worker;
@@ -78,7 +111,15 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
static send(message: RpcMessage, port: worker_threads.MessagePort, reject?: (e: Error) => void, serializationContext?: any) {
try {
port.postMessage(v8.serialize(message));
const postMessage: PortMessage = {
message,
serializationContext,
};
const transferList: worker_threads.TransferListItem[] = serializationContext?.transferList;
// delete the transfer list since that is simply transfered.
if (transferList)
serializationContext.transferList = [];
port.postMessage(postMessage, transferList);
}
catch (e) {
reject?.(e);
@@ -86,9 +127,12 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
}
static setupRpcPeer(peer: RpcPeer, port: worker_threads.MessagePort) {
port.on('message', message => peer.handleMessage(v8.deserialize(message)));
peer.transportSafeArgumentTypes.add(Buffer.name);
peer.transportSafeArgumentTypes.add(Uint8Array.name);
port.on('message', (portMessage: PortMessage) => {
const { message, serializationContext } = portMessage;
peer.handleMessage(message, serializationContext);
});
peer.addSerializer(Buffer, 'Buffer', new BufferTransfer());
peer.addSerializer(Uint8Array, 'Uint8Array', new BufferTransfer());
}
static createRpcPeer(selfName: string, peerName: string, port: worker_threads.MessagePort): RpcPeer {