From 19da68884be17a1a38f54f67eed680e82f1debb9 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 3 Mar 2023 23:17:43 -0800 Subject: [PATCH] server: implement python connectRPCObject --- sdk/package-lock.json | 4 +- sdk/package.json | 2 +- sdk/types/package-lock.json | 4 +- sdk/types/package.json | 2 +- .../scrypted_python/scrypted_sdk/__init__.py | 2 +- sdk/types/src/types.input.ts | 8 +- server/package-lock.json | 14 +-- server/package.json | 2 +- server/python/plugin_remote.py | 97 +++++++++++++++++-- server/python/rpc-iterator-test.py | 5 + server/python/rpc.py | 18 +++- server/python/rpc_reader.py | 72 +++++++------- server/src/plugin/plugin-remote-worker.ts | 23 ++--- server/src/rpc.ts | 17 ++-- server/test/rpc-python-test.ts | 6 ++ 15 files changed, 192 insertions(+), 84 deletions(-) diff --git a/sdk/package-lock.json b/sdk/package-lock.json index a071efd71..56be9a54a 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.2.73", + "version": "0.2.78", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.2.73", + "version": "0.2.78", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.18.6", diff --git a/sdk/package.json b/sdk/package.json index 762f10361..0ff34c8a8 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.2.73", + "version": "0.2.78", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index 7a83e91e1..ee8bc80a6 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.2.67", + "version": "0.2.71", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.2.67", + "version": "0.2.71", "license": "ISC", "devDependencies": { "@types/rimraf": "^3.0.2", diff --git a/sdk/types/package.json b/sdk/types/package.json index 6f65d88b8..451de88ad 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.2.67", + "version": "0.2.71", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/scrypted_python/scrypted_sdk/__init__.py b/sdk/types/scrypted_python/scrypted_sdk/__init__.py index 5ffb54310..d8315c649 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/__init__.py +++ b/sdk/types/scrypted_python/scrypted_sdk/__init__.py @@ -2,7 +2,6 @@ from __future__ import annotations from .types import * from typing import Optional from zipfile import ZipFile -from asyncio import Future from multiprocessing import Process from typing import Callable import asyncio @@ -31,6 +30,7 @@ class ScryptedStatic: self.remote: Any = None self.api: Any = None self.fork: Callable[[], PluginFork] + self.connectRPCObject: Callable[[Any], asyncio.Task[Any]] def sdk_init(z: ZipFile, r, sm: DeviceManager, dm: SystemManager, mm: MediaManager): global zip diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index 5e9dceab9..e01ea7e29 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -2086,10 +2086,10 @@ export interface ScryptedStatic { */ connect?(socket: NodeNetSocket, options?: ConnectOptions): void; /** - * Attempt to retrieve an IPC object by directly connecting to the plugin - * that owns the object. All operations on this object will bypass routing + * Attempt to retrieve an RPC object by directly connecting to the plugin + * that created the object. All operations on this object will bypass routing * through the Scrypted Server which typically manages plugin communication. - * This is ideal for sending large amounts of data via IPC. + * This is ideal for sending large amounts of data. */ - ipcObject?(value: T): Promise; + connectRPCObject?(value: T): Promise; } diff --git a/server/package-lock.json b/server/package-lock.json index 8d478d7ff..5a1c84ba5 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -10,7 +10,7 @@ "license": "ISC", "dependencies": { "@mapbox/node-pre-gyp": "^1.0.10", - "@scrypted/types": "^0.2.67", + "@scrypted/types": "^0.2.71", "adm-zip": "^0.5.9", "axios": "^0.21.4", "body-parser": "^1.19.0", @@ -127,9 +127,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.2.67", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.67.tgz", - "integrity": "sha512-+f+Sm19Jyt667yt/3w4UTiE6SfsUKaZHU+iqzUMC6bL5TOxUwcnBy+YqZur1TPg4xyCKCgjvZCOhHeLkHnBZkg==" + "version": "0.2.71", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.71.tgz", + "integrity": "sha512-RRIf9g8YlaI8KrQT0IMOSM1DuPouj+uY3zx7n7QkGJky7pQ4Ktlccu/b3Vt2zi22s5LMaaNl5Z61/Yeg/v+7Tw==" }, "node_modules/@tootallnate/once": { "version": "1.1.2", @@ -3100,9 +3100,9 @@ } }, "@scrypted/types": { - "version": "0.2.67", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.67.tgz", - "integrity": "sha512-+f+Sm19Jyt667yt/3w4UTiE6SfsUKaZHU+iqzUMC6bL5TOxUwcnBy+YqZur1TPg4xyCKCgjvZCOhHeLkHnBZkg==" + "version": "0.2.71", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.71.tgz", + "integrity": "sha512-RRIf9g8YlaI8KrQT0IMOSM1DuPouj+uY3zx7n7QkGJky7pQ4Ktlccu/b3Vt2zi22s5LMaaNl5Z61/Yeg/v+7Tw==" }, "@tootallnate/once": { "version": "1.1.2", diff --git a/server/package.json b/server/package.json index 3f8714194..a96b11002 100644 --- a/server/package.json +++ b/server/package.json @@ -4,7 +4,7 @@ "description": "", "dependencies": { "@mapbox/node-pre-gyp": "^1.0.10", - "@scrypted/types": "^0.2.67", + "@scrypted/types": "^0.2.71", "adm-zip": "^0.5.9", "axios": "^0.21.4", "body-parser": "^1.19.0", diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 5e700aa63..d4aa51662 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -27,6 +27,7 @@ import rpc import rpc_reader import multiprocessing import multiprocessing.connection +import hashlib class SystemDeviceState(TypedDict): lastEventTime: int @@ -210,7 +211,8 @@ class PluginRemote: loop: AbstractEventLoop consoles: Mapping[str, Future[Tuple[StreamReader, StreamWriter]]] = {} - def __init__(self, api, pluginId, hostInfo, loop: AbstractEventLoop): + def __init__(self, peer: rpc.RpcPeer, api, pluginId, hostInfo, loop: AbstractEventLoop): + self.peer = peer self.api = api self.pluginId = pluginId self.hostInfo = hostInfo @@ -255,6 +257,87 @@ class PluginRemote: nativeId, *values, sep=sep, end=end, flush=flush), self.loop) async def loadZip(self, packageJson, zipData, options: dict=None): + sdk = ScryptedStatic() + + clusterId = options['clusterId'] + clusterSecret = options['clusterSecret'] + + async def handleClusterClient(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + peer: rpc.RpcPeer + peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writer = writer) + async def connectRPCObject(id: str, secret: str): + m = hashlib.sha256() + m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8')) + portSecret = m.hexdigest() + if secret != portSecret: + raise Exception('secret incorrect') + return self.peer.localProxyMap.get(id, None) + + peer.params['connectRPCObject'] = connectRPCObject + try: + await peerReadLoop() + except: + writer.close() + + clusterRpcServer = await asyncio.start_server(handleClusterClient, '127.0.0.1') + clusterPort = clusterRpcServer.sockets[0].getsockname()[1] + + clusterPeers: Mapping[int, asyncio.Future[rpc.RpcPeer]] = {} + async def connectRPCObject(value): + clusterObject = getattr(value, '__cluster') + if not clusterObject: + return value + + if clusterObject.get('id', None) != clusterId: + return value + + port = clusterObject['port'] + proxyId = clusterObject['proxyId'] + + clusterPeerPromise = clusterPeers.get(port) + if not clusterPeerPromise: + async def connectClusterPeer(): + reader, writer = await asyncio.open_connection( + '127.0.0.1', port) + peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writer = writer) + async def run_loop(): + try: + await peerReadLoop() + except: + clusterPeers.pop(port) + asyncio.run_coroutine_threadsafe(run_loop(), self.loop) + return peer + clusterPeerPromise = self.loop.create_task(connectClusterPeer()) + clusterPeers[port] = clusterPeerPromise + + try: + clusterPeer = await clusterPeerPromise + c = await clusterPeer.getParam('connectRPCObject') + m = hashlib.sha256() + m.update(bytes('%s%s' % (port, clusterSecret), 'utf8')) + portSecret = m.hexdigest() + newValue = await c(proxyId, portSecret) + if not newValue: + raise Exception('ipc object not found?') + return newValue + except Exception as e: + return value + + sdk.connectRPCObject = connectRPCObject + + def onProxySerialization(value: Any, proxyId: str): + properties: dict = rpc.RpcPeer.getProxyProperties(value) + if not properties or not properties.get('__cluster', None): + properties = properties or {} + properties['__cluster'] = { + 'clusterId': clusterId, + 'proxyId': proxyId, + 'port': clusterPort, + } + rpc.RpcPeer.setProxyProperties(value, properties) + + self.peer.onProxySerialization = onProxySerialization + forkMain = options and options.get('fork') if not forkMain: @@ -352,7 +435,6 @@ class PluginRemote: try: from scrypted_sdk import sdk_init2 # type: ignore - sdk = ScryptedStatic() sdk.systemManager = self.systemManager sdk.deviceManager = self.deviceManager sdk.mediaManager = self.mediaManager @@ -367,10 +449,10 @@ class PluginRemote: pluginFork.worker.start() async def getFork(): fd = os.dup(parent_conn.fileno()) - peer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd) - peer.peerName = 'thread' + forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd) + forkPeer.peerName = 'thread' asyncio.run_coroutine_threadsafe(readLoop(), loop=self.loop) - getRemote = await peer.getParam('getRemote') + getRemote = await forkPeer.getParam('getRemote') remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo) await remote.setSystemState(self.systemManager.getSystemState()) for nativeId, ds in self.nativeIds.items(): @@ -379,12 +461,12 @@ class PluginRemote: forkOptions['fork'] = True forkOptions['filename'] = zipPath return await remote.loadZip(packageJson, zipData, forkOptions) - pluginFork.result = asyncio.create_task(getFork()) return pluginFork sdk.fork = host_fork + # sdk. sdk_init2(sdk) except: @@ -457,8 +539,7 @@ class PluginRemote: async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int): peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd) peer.params['print'] = print - peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote( - api, pluginId, hostInfo, loop) + peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop) async def get_update_stats(): update_stats = await peer.getParam('updateStats') diff --git a/server/python/rpc-iterator-test.py b/server/python/rpc-iterator-test.py index d4e9bcf6f..6709efd09 100644 --- a/server/python/rpc-iterator-test.py +++ b/server/python/rpc-iterator-test.py @@ -6,6 +6,11 @@ async def main(): peer, peerReadLoop = await prepare_peer_readloop(loop, 4, 3) peer.params['foo'] = 3 + reader, writer = await asyncio.open_connection( + '127.0.0.1', 6666) + + writer.write(bytes('abcd', 'utf8')) + async def ticker(delay, to): for i in range(to): # print(i) diff --git a/server/python/rpc.py b/server/python/rpc.py index 695defdb9..6625294df 100644 --- a/server/python/rpc.py +++ b/server/python/rpc.py @@ -100,6 +100,7 @@ class RpcPeer: self.pendingResults: Mapping[str, Future] = {} self.remoteWeakProxies: Mapping[str, any] = {} self.nameDeserializerMap: Mapping[str, RpcSerializer] = {} + self.onProxySerialization: Callable[[Any, str], Any] = None def __apply__(self, proxyId: str, oneWayMethods: List[str], method: str, args: list): serializationContext: Dict = {} @@ -165,7 +166,13 @@ class RpcPeer: '__serialized_value': serialized, } - def getProxyProperties(self, value): + def getProxyProperties(value): + return getattr(value, '__proxy_props', None) + + def setProxyProperties(value, properties): + setattr(value, '__proxy_props', properties) + + def prepareProxyProperties(value): if not hasattr(value, '__aiter__') or not hasattr(value, '__anext__'): return getattr(value, '__proxy_props', None) @@ -195,7 +202,7 @@ class RpcPeer: '__remote_proxy_id': proxiedEntry['id'], '__remote_proxy_finalizer_id': proxiedEntry['finalizerId'], '__remote_constructor_name': __remote_constructor_name, - '__remote_proxy_props': self.getProxyProperties(value), + '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret @@ -218,7 +225,7 @@ class RpcPeer: '__remote_proxy_id': None, '__remote_proxy_finalizer_id': None, '__remote_constructor_name': __remote_constructor_name, - '__remote_proxy_props': self.getProxyProperties(value), + '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), '__serialized_value': serialized, } @@ -233,11 +240,14 @@ class RpcPeer: self.localProxied[value] = proxiedEntry self.localProxyMap[proxyId] = value + if self.onProxySerialization: + self.onProxySerialization(value, proxyId) + ret = { '__remote_proxy_id': proxyId, '__remote_proxy_finalizer_id': proxyId, '__remote_constructor_name': __remote_constructor_name, - '__remote_proxy_props': self.getProxyProperties(value), + '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } diff --git a/server/python/rpc_reader.py b/server/python/rpc_reader.py index fc54cc874..5dd58d527 100644 --- a/server/python/rpc_reader.py +++ b/server/python/rpc_reader.py @@ -36,39 +36,52 @@ class SidebandBufferSerializer(rpc.RpcSerializer): buffer = buffers.pop() return buffer -async def readLoop(loop, peer: rpc.RpcPeer, reader): +async def readLoop(loop, peer: rpc.RpcPeer, reader: asyncio.StreamReader): deserializationContext = { 'buffers': [] } while True: - try: - lengthBytes = await reader.read(4) - typeBytes = await reader.read(1) - type = typeBytes[0] - length = int.from_bytes(lengthBytes, 'big') - data = await reader.read(length - 1) + lengthBytes = await reader.read(4) + typeBytes = await reader.read(1) + type = typeBytes[0] + length = int.from_bytes(lengthBytes, 'big') + data = await reader.read(length - 1) - if type == 1: - deserializationContext['buffers'].append(data) - continue + if type == 1: + deserializationContext['buffers'].append(data) + continue - message = json.loads(data) - asyncio.run_coroutine_threadsafe( - peer.handleMessage(message, deserializationContext), loop) + message = json.loads(data) + asyncio.run_coroutine_threadsafe( + peer.handleMessage(message, deserializationContext), loop) - deserializationContext = { - 'buffers': [] - } - except Exception as e: - print('read loop error: ' + peer.peerName, e) - sys.exit() + deserializationContext = { + 'buffers': [] + } -async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: int): - reader = await aiofiles.open(readFd, mode='rb') +async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None): + reader = reader or await aiofiles.open(readFd, mode='rb') mutex = threading.Lock() + if writer: + def write(buffers, reject): + try: + for b in buffers: + writer.write(b) + except Exception as e: + if reject: + reject(e) + else: + def write(buffers, reject): + try: + for b in buffers: + os.write(writeFd, b) + except Exception as e: + if reject: + reject(e) + def send(message, reject=None, serializationContext=None): with mutex: if serializationContext: @@ -78,27 +91,14 @@ async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: i length = len(buffer) + 1 lb = length.to_bytes(4, 'big') type = 1 - try: - os.write(writeFd, lb) - os.write(writeFd, bytes([type])) - os.write(writeFd, buffer) - except Exception as e: - if reject: - reject(e) - return + write([lb, bytes([type]), buffer], reject) jsonString = json.dumps(message) b = bytes(jsonString, 'utf8') length = len(b) + 1 lb = length.to_bytes(4, 'big') type = 0 - try: - os.write(writeFd, lb) - os.write(writeFd, bytes([type])) - os.write(writeFd, b) - except Exception as e: - if reject: - reject(e) + write([lb, bytes([type]), b], reject) peer = rpc.RpcPeer(send) peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer() diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 24d9b1067..1be242602 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -17,6 +17,7 @@ import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote import { PluginStats, startStatsUpdater } from './plugin-remote-stats'; import { createREPLServer } from './plugin-repl'; import { NodeThreadWorker } from './runtime/node-thread-worker'; +import crypto from 'crypto'; const { link } = require('linkfs'); const serverVersion = require('../../package.json').version; @@ -79,13 +80,11 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, zipData: Buffer | string, zipOptions: PluginRemoteLoadZipOptions) { const { clusterId, clusterSecret } = zipOptions; const clusterRpcServer = net.createServer(client => { - console.error('connecting', peer.selfName) const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client); - clusterPeer.params['ipcObject'] = async (id: string, secret: string) => { - if (secret !== clusterSecret) + const portSecret = crypto.createHash('sha256').update(`${clusterPort}${clusterSecret}`).digest().toString('hex'); + clusterPeer.params['connectRPCObject'] = async (id: string, secret: string) => { + if (secret !== portSecret) throw new Error('secret incorrect'); - // console.error('lmap', peer.localProxyMap) - // console.error('rmap', peer.remoteWeakProxies) return peer.localProxyMap[id]; } client.on('close', () => clusterPeer.kill('cluster socket closed')); @@ -97,20 +96,21 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa }; peer.onProxySerialization = (value, proxyId) => { - const properties = RpcPeer.getProxyProperies(value); - if (!properties?.__cluster) + const properties = RpcPeer.getProxyProperties(value); + if (!properties?.__cluster) { RpcPeer.setProxyProperties(value, Object.assign(properties || {}, { __cluster: { ...clusterEntry, proxyId, } })); + } } const clusterPeers = new Map>(); - scrypted.ipcObject = async (value: any) => { + scrypted.connectRPCObject = async (value: any) => { const clusterObject = value?.__cluster; - if (clusterObject?.id !== zipOptions.clusterId) + if (clusterObject?.id !== clusterId) return value; const { port, proxyId } = clusterObject; @@ -135,8 +135,9 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa try { const clusterPeer = await clusterPeerPromise; - const ipcObject = await clusterPeer.getParam('ipcObject'); - const newValue = await ipcObject(proxyId, clusterSecret); + const connectRPCObject = await clusterPeer.getParam('connectRPCObject'); + const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex'); + const newValue = await connectRPCObject(proxyId, portSecret); if (!newValue) throw new Error('ipc object not found?'); return newValue; diff --git a/server/src/rpc.ts b/server/src/rpc.ts index 2920bb1a7..b31293222 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -347,10 +347,15 @@ export class RpcPeer { value[RpcPeer.PROPERTY_PROXY_PROPERTIES] = properties; } - static getProxyProperies(value: any) { + static getProxyProperties(value: any) { + return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES]; + } + + static prepareProxyProperties(value: any) { + let props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES]; if (!value[Symbol.asyncIterator]) - return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES]; - const props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES] || {}; + return props; + props ||= {}; props[Symbol.asyncIterator.toString()] = { next: 'next', throw: 'throw', @@ -568,7 +573,7 @@ export class RpcPeer { __remote_proxy_id: proxiedEntry.id, __remote_proxy_finalizer_id, __remote_constructor_name, - __remote_proxy_props: RpcPeer.getProxyProperies(value), + __remote_proxy_props: RpcPeer.prepareProxyProperties(value), __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], } return ret; @@ -595,7 +600,7 @@ export class RpcPeer { __remote_proxy_id: undefined, __remote_proxy_finalizer_id: undefined, __remote_constructor_name, - __remote_proxy_props: RpcPeer.getProxyProperies(value), + __remote_proxy_props: RpcPeer.prepareProxyProperties(value), __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], __serialized_value: serialized, } @@ -616,7 +621,7 @@ export class RpcPeer { __remote_proxy_id, __remote_proxy_finalizer_id: __remote_proxy_id, __remote_constructor_name, - __remote_proxy_props: RpcPeer.getProxyProperies(value), + __remote_proxy_props: RpcPeer.prepareProxyProperties(value), __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], } diff --git a/server/test/rpc-python-test.ts b/server/test/rpc-python-test.ts index ec817e400..9962ab8eb 100644 --- a/server/test/rpc-python-test.ts +++ b/server/test/rpc-python-test.ts @@ -3,8 +3,14 @@ import path from 'path'; import type { Readable, Writable } from "stream"; import { createDuplexRpcPeer } from '../src/rpc-serializer'; import assert from 'assert'; +import net from 'net'; async function main() { + const server = net.createServer(client => { + console.log('got client'); + client.on('data', b => console.log('data', b.toString())); + }); + server.listen(6666); const cp = child_process.spawn('python3', [path.join(__dirname, '../python/rpc-iterator-test.py')], { stdio: ['pipe', 'inherit', 'inherit', 'pipe', 'pipe'],