From 997a4732ec70aeee138b8dd326ed2cd15d49bb44 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 17 Mar 2023 23:21:07 -0700 Subject: [PATCH] server: additional python rpc transport fixes --- server/package-lock.json | 4 ++-- server/python/rpc_reader.py | 27 +++++++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/server/package-lock.json b/server/package-lock.json index ecea402b3..f70367286 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/server", - "version": "0.7.13", + "version": "0.7.14", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.7.13", + "version": "0.7.14", "license": "ISC", "dependencies": { "@mapbox/node-pre-gyp": "^1.0.10", diff --git a/server/python/rpc_reader.py b/server/python/rpc_reader.py index e14348630..64660f187 100644 --- a/server/python/rpc_reader.py +++ b/server/python/rpc_reader.py @@ -13,6 +13,7 @@ import rpc import concurrent.futures import json + class BufferSerializer(rpc.RpcSerializer): def serialize(self, value, serializationContext): return base64.b64encode(value).decode('utf8') @@ -35,6 +36,7 @@ class SidebandBufferSerializer(rpc.RpcSerializer): buffer = buffers.pop() return buffer + class RpcTransport: async def prepare(self): pass @@ -48,6 +50,7 @@ class RpcTransport: def writeJSON(self, json, reject): pass + class RpcFileTransport(RpcTransport): reader: asyncio.StreamReader @@ -71,7 +74,7 @@ class RpcFileTransport(RpcTransport): return data message = json.loads(data) return message - + def writeMessage(self, type: int, buffer, reject): length = len(buffer) + 1 lb = length.to_bytes(4, 'big') @@ -88,15 +91,24 @@ class RpcFileTransport(RpcTransport): def writeBuffer(self, buffer, reject): return self.writeMessage(1, buffer, reject) + class RpcStreamTransport(RpcTransport): def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: super().__init__() self.reader = reader self.writer = writer - async def read(self, n: int): - return await self.reader.readexactly(n) - + async def read(self): + lengthBytes = await self.reader.readexactly(4) + typeBytes = await self.reader.readexactly(1) + type = typeBytes[0] + length = int.from_bytes(lengthBytes, 'big') + data = await self.reader.readexactly(length - 1) + if type == 1: + return data + message = json.loads(data) + return message + def writeMessage(self, type: int, buffer, reject): length = len(buffer) + 1 lb = length.to_bytes(4, 'big') @@ -113,6 +125,7 @@ class RpcStreamTransport(RpcTransport): def writeBuffer(self, buffer, reject): return self.writeMessage(1, buffer, reject) + class RpcConnectionTransport(RpcTransport): def __init__(self, connection: multiprocessing.connection.Connection) -> None: super().__init__() @@ -121,7 +134,7 @@ class RpcConnectionTransport(RpcTransport): async def read(self): return await asyncio.get_event_loop().run_in_executor(self.executor, lambda: self.connection.recv()) - + def writeMessage(self, json, reject): try: self.connection.send(json) @@ -131,10 +144,11 @@ class RpcConnectionTransport(RpcTransport): def writeJSON(self, json, reject): return self.writeMessage(json, reject) - + def writeBuffer(self, buffer, reject): return self.writeMessage(bytes(buffer), reject) + async def readLoop(loop, peer: rpc.RpcPeer, rpcTransport: RpcTransport): deserializationContext = { 'buffers': [] @@ -154,6 +168,7 @@ async def readLoop(loop, peer: rpc.RpcPeer, rpcTransport: RpcTransport): 'buffers': [] } + async def prepare_peer_readloop(loop: AbstractEventLoop, rpcTransport: RpcTransport): await rpcTransport.prepare()