server: implement python connectRPCObject

This commit is contained in:
Koushik Dutta
2023-03-03 23:17:43 -08:00
parent 544349de8d
commit 19da68884b
15 changed files with 192 additions and 84 deletions

4
sdk/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/sdk",
"version": "0.2.73",
"version": "0.2.78",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/sdk",
"version": "0.2.73",
"version": "0.2.78",
"license": "ISC",
"dependencies": {
"@babel/preset-typescript": "^7.18.6",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.2.73",
"version": "0.2.78",
"description": "",
"main": "dist/src/index.js",
"exports": {

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/types",
"version": "0.2.67",
"version": "0.2.71",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/types",
"version": "0.2.67",
"version": "0.2.71",
"license": "ISC",
"devDependencies": {
"@types/rimraf": "^3.0.2",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.2.67",
"version": "0.2.71",
"description": "",
"main": "dist/index.js",
"author": "",

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
from .types import *
from typing import Optional
from zipfile import ZipFile
from asyncio import Future
from multiprocessing import Process
from typing import Callable
import asyncio
@@ -31,6 +30,7 @@ class ScryptedStatic:
self.remote: Any = None
self.api: Any = None
self.fork: Callable[[], PluginFork]
self.connectRPCObject: Callable[[Any], asyncio.Task[Any]]
def sdk_init(z: ZipFile, r, sm: DeviceManager, dm: SystemManager, mm: MediaManager):
global zip

View File

@@ -2086,10 +2086,10 @@ export interface ScryptedStatic {
*/
connect?(socket: NodeNetSocket, options?: ConnectOptions): void;
/**
* Attempt to retrieve an IPC object by directly connecting to the plugin
* that owns the object. All operations on this object will bypass routing
* Attempt to retrieve an RPC object by directly connecting to the plugin
* that created the object. All operations on this object will bypass routing
* through the Scrypted Server which typically manages plugin communication.
* This is ideal for sending large amounts of data via IPC.
* This is ideal for sending large amounts of data.
*/
ipcObject?<T>(value: T): Promise<T>;
connectRPCObject?<T>(value: T): Promise<T>;
}

View File

@@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",
"@scrypted/types": "^0.2.67",
"@scrypted/types": "^0.2.71",
"adm-zip": "^0.5.9",
"axios": "^0.21.4",
"body-parser": "^1.19.0",
@@ -127,9 +127,9 @@
}
},
"node_modules/@scrypted/types": {
"version": "0.2.67",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.67.tgz",
"integrity": "sha512-+f+Sm19Jyt667yt/3w4UTiE6SfsUKaZHU+iqzUMC6bL5TOxUwcnBy+YqZur1TPg4xyCKCgjvZCOhHeLkHnBZkg=="
"version": "0.2.71",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.71.tgz",
"integrity": "sha512-RRIf9g8YlaI8KrQT0IMOSM1DuPouj+uY3zx7n7QkGJky7pQ4Ktlccu/b3Vt2zi22s5LMaaNl5Z61/Yeg/v+7Tw=="
},
"node_modules/@tootallnate/once": {
"version": "1.1.2",
@@ -3100,9 +3100,9 @@
}
},
"@scrypted/types": {
"version": "0.2.67",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.67.tgz",
"integrity": "sha512-+f+Sm19Jyt667yt/3w4UTiE6SfsUKaZHU+iqzUMC6bL5TOxUwcnBy+YqZur1TPg4xyCKCgjvZCOhHeLkHnBZkg=="
"version": "0.2.71",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.71.tgz",
"integrity": "sha512-RRIf9g8YlaI8KrQT0IMOSM1DuPouj+uY3zx7n7QkGJky7pQ4Ktlccu/b3Vt2zi22s5LMaaNl5Z61/Yeg/v+7Tw=="
},
"@tootallnate/once": {
"version": "1.1.2",

View File

@@ -4,7 +4,7 @@
"description": "",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",
"@scrypted/types": "^0.2.67",
"@scrypted/types": "^0.2.71",
"adm-zip": "^0.5.9",
"axios": "^0.21.4",
"body-parser": "^1.19.0",

View File

@@ -27,6 +27,7 @@ import rpc
import rpc_reader
import multiprocessing
import multiprocessing.connection
import hashlib
class SystemDeviceState(TypedDict):
lastEventTime: int
@@ -210,7 +211,8 @@ class PluginRemote:
loop: AbstractEventLoop
consoles: Mapping[str, Future[Tuple[StreamReader, StreamWriter]]] = {}
def __init__(self, api, pluginId, hostInfo, loop: AbstractEventLoop):
def __init__(self, peer: rpc.RpcPeer, api, pluginId, hostInfo, loop: AbstractEventLoop):
self.peer = peer
self.api = api
self.pluginId = pluginId
self.hostInfo = hostInfo
@@ -255,6 +257,87 @@ class PluginRemote:
nativeId, *values, sep=sep, end=end, flush=flush), self.loop)
async def loadZip(self, packageJson, zipData, options: dict=None):
sdk = ScryptedStatic()
clusterId = options['clusterId']
clusterSecret = options['clusterSecret']
async def handleClusterClient(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
peer: rpc.RpcPeer
peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writer = writer)
async def connectRPCObject(id: str, secret: str):
m = hashlib.sha256()
m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
if secret != portSecret:
raise Exception('secret incorrect')
return self.peer.localProxyMap.get(id, None)
peer.params['connectRPCObject'] = connectRPCObject
try:
await peerReadLoop()
except:
writer.close()
clusterRpcServer = await asyncio.start_server(handleClusterClient, '127.0.0.1')
clusterPort = clusterRpcServer.sockets[0].getsockname()[1]
clusterPeers: Mapping[int, asyncio.Future[rpc.RpcPeer]] = {}
async def connectRPCObject(value):
clusterObject = getattr(value, '__cluster')
if not clusterObject:
return value
if clusterObject.get('id', None) != clusterId:
return value
port = clusterObject['port']
proxyId = clusterObject['proxyId']
clusterPeerPromise = clusterPeers.get(port)
if not clusterPeerPromise:
async def connectClusterPeer():
reader, writer = await asyncio.open_connection(
'127.0.0.1', port)
peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writer = writer)
async def run_loop():
try:
await peerReadLoop()
except:
clusterPeers.pop(port)
asyncio.run_coroutine_threadsafe(run_loop(), self.loop)
return peer
clusterPeerPromise = self.loop.create_task(connectClusterPeer())
clusterPeers[port] = clusterPeerPromise
try:
clusterPeer = await clusterPeerPromise
c = await clusterPeer.getParam('connectRPCObject')
m = hashlib.sha256()
m.update(bytes('%s%s' % (port, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
newValue = await c(proxyId, portSecret)
if not newValue:
raise Exception('ipc object not found?')
return newValue
except Exception as e:
return value
sdk.connectRPCObject = connectRPCObject
def onProxySerialization(value: Any, proxyId: str):
properties: dict = rpc.RpcPeer.getProxyProperties(value)
if not properties or not properties.get('__cluster', None):
properties = properties or {}
properties['__cluster'] = {
'clusterId': clusterId,
'proxyId': proxyId,
'port': clusterPort,
}
rpc.RpcPeer.setProxyProperties(value, properties)
self.peer.onProxySerialization = onProxySerialization
forkMain = options and options.get('fork')
if not forkMain:
@@ -352,7 +435,6 @@ class PluginRemote:
try:
from scrypted_sdk import sdk_init2 # type: ignore
sdk = ScryptedStatic()
sdk.systemManager = self.systemManager
sdk.deviceManager = self.deviceManager
sdk.mediaManager = self.mediaManager
@@ -367,10 +449,10 @@ class PluginRemote:
pluginFork.worker.start()
async def getFork():
fd = os.dup(parent_conn.fileno())
peer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd)
peer.peerName = 'thread'
forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd)
forkPeer.peerName = 'thread'
asyncio.run_coroutine_threadsafe(readLoop(), loop=self.loop)
getRemote = await peer.getParam('getRemote')
getRemote = await forkPeer.getParam('getRemote')
remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo)
await remote.setSystemState(self.systemManager.getSystemState())
for nativeId, ds in self.nativeIds.items():
@@ -379,12 +461,12 @@ class PluginRemote:
forkOptions['fork'] = True
forkOptions['filename'] = zipPath
return await remote.loadZip(packageJson, zipData, forkOptions)
pluginFork.result = asyncio.create_task(getFork())
return pluginFork
sdk.fork = host_fork
# sdk.
sdk_init2(sdk)
except:
@@ -457,8 +539,7 @@ class PluginRemote:
async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd)
peer.params['print'] = print
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(
api, pluginId, hostInfo, loop)
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop)
async def get_update_stats():
update_stats = await peer.getParam('updateStats')

View File

@@ -6,6 +6,11 @@ async def main():
peer, peerReadLoop = await prepare_peer_readloop(loop, 4, 3)
peer.params['foo'] = 3
reader, writer = await asyncio.open_connection(
'127.0.0.1', 6666)
writer.write(bytes('abcd', 'utf8'))
async def ticker(delay, to):
for i in range(to):
# print(i)

View File

@@ -100,6 +100,7 @@ class RpcPeer:
self.pendingResults: Mapping[str, Future] = {}
self.remoteWeakProxies: Mapping[str, any] = {}
self.nameDeserializerMap: Mapping[str, RpcSerializer] = {}
self.onProxySerialization: Callable[[Any, str], Any] = None
def __apply__(self, proxyId: str, oneWayMethods: List[str], method: str, args: list):
serializationContext: Dict = {}
@@ -165,7 +166,13 @@ class RpcPeer:
'__serialized_value': serialized,
}
def getProxyProperties(self, value):
def getProxyProperties(value):
return getattr(value, '__proxy_props', None)
def setProxyProperties(value, properties):
setattr(value, '__proxy_props', properties)
def prepareProxyProperties(value):
if not hasattr(value, '__aiter__') or not hasattr(value, '__anext__'):
return getattr(value, '__proxy_props', None)
@@ -195,7 +202,7 @@ class RpcPeer:
'__remote_proxy_id': proxiedEntry['id'],
'__remote_proxy_finalizer_id': proxiedEntry['finalizerId'],
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': self.getProxyProperties(value),
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
@@ -218,7 +225,7 @@ class RpcPeer:
'__remote_proxy_id': None,
'__remote_proxy_finalizer_id': None,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': self.getProxyProperties(value),
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
'__serialized_value': serialized,
}
@@ -233,11 +240,14 @@ class RpcPeer:
self.localProxied[value] = proxiedEntry
self.localProxyMap[proxyId] = value
if self.onProxySerialization:
self.onProxySerialization(value, proxyId)
ret = {
'__remote_proxy_id': proxyId,
'__remote_proxy_finalizer_id': proxyId,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': self.getProxyProperties(value),
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}

View File

@@ -36,39 +36,52 @@ class SidebandBufferSerializer(rpc.RpcSerializer):
buffer = buffers.pop()
return buffer
async def readLoop(loop, peer: rpc.RpcPeer, reader):
async def readLoop(loop, peer: rpc.RpcPeer, reader: asyncio.StreamReader):
deserializationContext = {
'buffers': []
}
while True:
try:
lengthBytes = await reader.read(4)
typeBytes = await reader.read(1)
type = typeBytes[0]
length = int.from_bytes(lengthBytes, 'big')
data = await reader.read(length - 1)
lengthBytes = await reader.read(4)
typeBytes = await reader.read(1)
type = typeBytes[0]
length = int.from_bytes(lengthBytes, 'big')
data = await reader.read(length - 1)
if type == 1:
deserializationContext['buffers'].append(data)
continue
if type == 1:
deserializationContext['buffers'].append(data)
continue
message = json.loads(data)
asyncio.run_coroutine_threadsafe(
peer.handleMessage(message, deserializationContext), loop)
message = json.loads(data)
asyncio.run_coroutine_threadsafe(
peer.handleMessage(message, deserializationContext), loop)
deserializationContext = {
'buffers': []
}
except Exception as e:
print('read loop error: ' + peer.peerName, e)
sys.exit()
deserializationContext = {
'buffers': []
}
async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: int):
reader = await aiofiles.open(readFd, mode='rb')
async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
reader = reader or await aiofiles.open(readFd, mode='rb')
mutex = threading.Lock()
if writer:
def write(buffers, reject):
try:
for b in buffers:
writer.write(b)
except Exception as e:
if reject:
reject(e)
else:
def write(buffers, reject):
try:
for b in buffers:
os.write(writeFd, b)
except Exception as e:
if reject:
reject(e)
def send(message, reject=None, serializationContext=None):
with mutex:
if serializationContext:
@@ -78,27 +91,14 @@ async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: i
length = len(buffer) + 1
lb = length.to_bytes(4, 'big')
type = 1
try:
os.write(writeFd, lb)
os.write(writeFd, bytes([type]))
os.write(writeFd, buffer)
except Exception as e:
if reject:
reject(e)
return
write([lb, bytes([type]), buffer], reject)
jsonString = json.dumps(message)
b = bytes(jsonString, 'utf8')
length = len(b) + 1
lb = length.to_bytes(4, 'big')
type = 0
try:
os.write(writeFd, lb)
os.write(writeFd, bytes([type]))
os.write(writeFd, b)
except Exception as e:
if reject:
reject(e)
write([lb, bytes([type]), b], reject)
peer = rpc.RpcPeer(send)
peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer()

View File

@@ -17,6 +17,7 @@ import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote
import { PluginStats, startStatsUpdater } from './plugin-remote-stats';
import { createREPLServer } from './plugin-repl';
import { NodeThreadWorker } from './runtime/node-thread-worker';
import crypto from 'crypto';
const { link } = require('linkfs');
const serverVersion = require('../../package.json').version;
@@ -79,13 +80,11 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa
async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, zipData: Buffer | string, zipOptions: PluginRemoteLoadZipOptions) {
const { clusterId, clusterSecret } = zipOptions;
const clusterRpcServer = net.createServer(client => {
console.error('connecting', peer.selfName)
const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client);
clusterPeer.params['ipcObject'] = async (id: string, secret: string) => {
if (secret !== clusterSecret)
const portSecret = crypto.createHash('sha256').update(`${clusterPort}${clusterSecret}`).digest().toString('hex');
clusterPeer.params['connectRPCObject'] = async (id: string, secret: string) => {
if (secret !== portSecret)
throw new Error('secret incorrect');
// console.error('lmap', peer.localProxyMap)
// console.error('rmap', peer.remoteWeakProxies)
return peer.localProxyMap[id];
}
client.on('close', () => clusterPeer.kill('cluster socket closed'));
@@ -97,20 +96,21 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa
};
peer.onProxySerialization = (value, proxyId) => {
const properties = RpcPeer.getProxyProperies(value);
if (!properties?.__cluster)
const properties = RpcPeer.getProxyProperties(value);
if (!properties?.__cluster) {
RpcPeer.setProxyProperties(value, Object.assign(properties || {}, {
__cluster: {
...clusterEntry,
proxyId,
}
}));
}
}
const clusterPeers = new Map<number, Promise<RpcPeer>>();
scrypted.ipcObject = async (value: any) => {
scrypted.connectRPCObject = async (value: any) => {
const clusterObject = value?.__cluster;
if (clusterObject?.id !== zipOptions.clusterId)
if (clusterObject?.id !== clusterId)
return value;
const { port, proxyId } = clusterObject;
@@ -135,8 +135,9 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa
try {
const clusterPeer = await clusterPeerPromise;
const ipcObject = await clusterPeer.getParam('ipcObject');
const newValue = await ipcObject(proxyId, clusterSecret);
const connectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret);
if (!newValue)
throw new Error('ipc object not found?');
return newValue;

View File

@@ -347,10 +347,15 @@ export class RpcPeer {
value[RpcPeer.PROPERTY_PROXY_PROPERTIES] = properties;
}
static getProxyProperies(value: any) {
static getProxyProperties(value: any) {
return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES];
}
static prepareProxyProperties(value: any) {
let props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES];
if (!value[Symbol.asyncIterator])
return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES];
const props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES] || {};
return props;
props ||= {};
props[Symbol.asyncIterator.toString()] = {
next: 'next',
throw: 'throw',
@@ -568,7 +573,7 @@ export class RpcPeer {
__remote_proxy_id: proxiedEntry.id,
__remote_proxy_finalizer_id,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.getProxyProperies(value),
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
}
return ret;
@@ -595,7 +600,7 @@ export class RpcPeer {
__remote_proxy_id: undefined,
__remote_proxy_finalizer_id: undefined,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.getProxyProperies(value),
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
__serialized_value: serialized,
}
@@ -616,7 +621,7 @@ export class RpcPeer {
__remote_proxy_id,
__remote_proxy_finalizer_id: __remote_proxy_id,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.getProxyProperies(value),
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
}

View File

@@ -3,8 +3,14 @@ import path from 'path';
import type { Readable, Writable } from "stream";
import { createDuplexRpcPeer } from '../src/rpc-serializer';
import assert from 'assert';
import net from 'net';
async function main() {
const server = net.createServer(client => {
console.log('got client');
client.on('data', b => console.log('data', b.toString()));
});
server.listen(6666);
const cp = child_process.spawn('python3', [path.join(__dirname, '../python/rpc-iterator-test.py')], {
stdio: ['pipe', 'inherit', 'inherit', 'pipe', 'pipe'],