diff --git a/plugins/prebuffer-mixin/src/main.ts b/plugins/prebuffer-mixin/src/main.ts index 7e9cf798a..9867dee75 100644 --- a/plugins/prebuffer-mixin/src/main.ts +++ b/plugins/prebuffer-mixin/src/main.ts @@ -529,8 +529,7 @@ class PrebufferSession { }); } else { - const moBuffer = await mediaManager.convertMediaObjectToBuffer(mo, ScryptedMimeTypes.FFmpegInput); - const ffmpegInput = JSON.parse(moBuffer.toString()) as FFmpegInput; + const ffmpegInput: FFmpegInput = await mediaManager.convertMediaObjectToJSON(mo, ScryptedMimeTypes.FFmpegInput); sessionMso = ffmpegInput.mediaStreamOptions || this.advertisedMediaStreamOptions; let { parser, isDefault } = this.getParser(sessionMso); @@ -723,8 +722,7 @@ class PrebufferSession { if (!session.isActive) return; const mo = await this.mixinDevice.getVideoStream(mso); - const moBuffer = await mediaManager.convertMediaObjectToBuffer(mo, ScryptedMimeTypes.FFmpegInput); - const ffmpegInput = JSON.parse(moBuffer.toString()) as FFmpegInput; + const ffmpegInput: FFmpegInput = await mediaManager.convertMediaObjectToJSON(mo, ScryptedMimeTypes.FFmpegInput); mso = ffmpegInput.mediaStreamOptions; scheduleRefresh(mso); diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index c6913cbd2..075c499de 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -262,8 +262,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } function connectTidPeer(tid: number) { - if (tid === undefined) - debugger; let peerPromise = tidPeers.get(tid); if (peerPromise) return peerPromise; @@ -575,7 +573,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } const result = (async () => { - const threadPeer = new RpcPeer('main', 'thread', (message, reject) => runtimeWorker.send(message, reject)); + const threadPeer = new RpcPeer('main', 'thread', (message, reject, serializationContext) => runtimeWorker.send(message, reject, serializationContext)); threadPeer.params.updateStats = (stats: PluginStats) => { allMemoryStats.set(runtimeWorker, stats.memoryUsage); } diff --git a/server/src/plugin/runtime/node-thread-worker.ts b/server/src/plugin/runtime/node-thread-worker.ts index 61e4f9abb..0848e746e 100644 --- a/server/src/plugin/runtime/node-thread-worker.ts +++ b/server/src/plugin/runtime/node-thread-worker.ts @@ -1,8 +1,41 @@ -import v8 from 'v8'; import worker_threads from "worker_threads"; import { EventEmitter } from "events"; -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer, RpcSerializer } from "../../rpc"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; +import { BufferSerializer } from '../../rpc-buffer-serializer'; + + +class BufferTransfer implements RpcSerializer { + bufferSerializer = new BufferSerializer(); + + serialize(value: Buffer, serializationContext?: any): any { + if (!serializationContext) + return this.bufferSerializer.serialize(value); + + // must create a copy. Buffers as allocated by node are not transferable. + const ab = value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength); + value = Buffer.from(ab); + + serializationContext.transferList ||= []; + const transferList: worker_threads.TransferListItem[] = serializationContext.transferList; + transferList.push(value.buffer); + // can return the value directly, as the buffer is transferred. + return value; + } + + deserialize(serialized: any, serializationContext?: any): any { + if (!serializationContext?.transferList) + return this.bufferSerializer.deserialize(serialized); + // the buffer was transferred, so we can return the value directly. + const u: Uint8Array = serialized; + return Buffer.from(u.buffer, u.byteOffset, u.byteLength); + } +} + +interface PortMessage { + message: any; + serializationContext: any; +} export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { worker: worker_threads.Worker; @@ -78,7 +111,15 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { static send(message: RpcMessage, port: worker_threads.MessagePort, reject?: (e: Error) => void, serializationContext?: any) { try { - port.postMessage(v8.serialize(message)); + const postMessage: PortMessage = { + message, + serializationContext, + }; + const transferList: worker_threads.TransferListItem[] = serializationContext?.transferList; + // delete the transfer list since that is simply transfered. + if (transferList) + serializationContext.transferList = []; + port.postMessage(postMessage, transferList); } catch (e) { reject?.(e); @@ -86,9 +127,12 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { } static setupRpcPeer(peer: RpcPeer, port: worker_threads.MessagePort) { - port.on('message', message => peer.handleMessage(v8.deserialize(message))); - peer.transportSafeArgumentTypes.add(Buffer.name); - peer.transportSafeArgumentTypes.add(Uint8Array.name); + port.on('message', (portMessage: PortMessage) => { + const { message, serializationContext } = portMessage; + peer.handleMessage(message, serializationContext); + }); + peer.addSerializer(Buffer, 'Buffer', new BufferTransfer()); + peer.addSerializer(Uint8Array, 'Uint8Array', new BufferTransfer()); } static createRpcPeer(selfName: string, peerName: string, port: worker_threads.MessagePort): RpcPeer { diff --git a/server/src/rpc-buffer-serializer.ts b/server/src/rpc-buffer-serializer.ts index f7ce1e584..89dec00e3 100644 --- a/server/src/rpc-buffer-serializer.ts +++ b/server/src/rpc-buffer-serializer.ts @@ -3,12 +3,10 @@ import { RpcSerializer } from "./rpc"; export class BufferSerializer implements RpcSerializer { serialize(value: Buffer) { console.warn('Using slow buffer serialization. Ensure the peer supports SidebandBufferSerializer.'); - debugger; return value.toString('base64'); } deserialize(serialized: any) { console.warn('Using slow buffer deserialization. Ensure the peer supports SidebandBufferSerializer.'); - debugger; return Buffer.from(serialized, 'base64'); } } diff --git a/server/src/scrypted-plugin-main.ts b/server/src/scrypted-plugin-main.ts index c20e35ab7..c43396fd1 100644 --- a/server/src/scrypted-plugin-main.ts +++ b/server/src/scrypted-plugin-main.ts @@ -1,5 +1,4 @@ import net from 'net'; -import v8 from 'v8'; import worker_threads from "worker_threads"; import { getPluginNodePath } from "./plugin/plugin-npm-dependencies"; import { startPluginRemote } from "./plugin/plugin-remote-worker";