From 1e004d6700123bc8b609a55bb239290249f70f69 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sat, 11 Mar 2023 19:38:43 -0800 Subject: [PATCH] rpc: fixup various async iterator bugs, add memoryview support to python --- server/python/plugin_remote.py | 22 ++++++--- server/python/rpc.py | 47 +++++++++--------- server/python/rpc_reader.py | 1 + server/src/plugin/media.ts | 16 +++--- server/src/plugin/plugin-remote-worker.ts | 2 +- server/src/rpc.ts | 59 +++++++++++++---------- 6 files changed, 80 insertions(+), 67 deletions(-) diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 88fdfd04c..0ddf6fdc3 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -46,13 +46,19 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): class MediaObject(scrypted_python.scrypted_sdk.types.MediaObject): - def __init__(self, data, mimeType, sourceId): - self.mimeType = mimeType + def __init__(self, data, mimeType, options): self.data = data - setattr(self, '__proxy_props', { - 'mimeType': mimeType, - 'sourceId': sourceId, - }) + + proxyProps = {} + setattr(self, rpc.RpcPeer.PROPERTY_PROXY_PROPERTIES, proxyProps) + + options = options or {} + options['mimeType'] = mimeType + + for key, value in options.items(): + if rpc.RpcPeer.isTransportSafe(value): + proxyProps[key] = value + setattr(self, key, value) async def getData(self): return self.data @@ -91,9 +97,9 @@ class MediaManager: async def createMediaObject(self, data: Any, mimeType: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: # return await self.createMediaObject(data, mimetypes, options) - return MediaObject(data, mimeType, options.get('sourceId', None) if options else None) + return MediaObject(data, mimeType, options) - async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: + async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: return await self.mediaManager.createMediaObjectFromUrl(data, options) async def getFFmpegPath(self) -> str: diff --git a/server/python/rpc.py b/server/python/rpc.py index 0132f2511..83553e085 100644 --- a/server/python/rpc.py +++ b/server/python/rpc.py @@ -59,7 +59,7 @@ class RpcProxy(object): self.__dict__['__proxy_entry'] = entry self.__dict__['__proxy_constructor'] = proxyConstructorName self.__dict__['__proxy_peer'] = peer - self.__dict__['__proxy_props'] = proxyProps + self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] = proxyProps self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods def __getattr__(self, name): @@ -67,8 +67,8 @@ class RpcProxy(object): return self.dict['__proxy_entry']['finalizerId'] if name in self.__dict__: return self.__dict__[name] - if self.__dict__['__proxy_props'] and name in self.__dict__['__proxy_props']: - return self.__dict__['__proxy_props'][name] + if self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] and name in self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES]: + return self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES][name] return RpcProxyMethod(self, name) def __setattr__(self, name: str, value: Any) -> None: @@ -87,6 +87,7 @@ class RpcProxy(object): class RpcPeer: RPC_RESULT_ERROR_NAME = 'RPCResultError' + PROPERTY_PROXY_PROPERTIES = '__proxy_props' def __init__(self, send: Callable[[object, Callable[[Exception], None], Dict], None]) -> None: self.send = send @@ -167,16 +168,16 @@ class RpcPeer: } # def getProxyProperties(value): - # return getattr(value, '__proxy_props', None) + # return getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None) # def setProxyProperties(value, properties): - # setattr(value, '__proxy_props', properties) + # setattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, properties) def prepareProxyProperties(value): if not hasattr(value, '__aiter__') or not hasattr(value, '__anext__'): - return getattr(value, '__proxy_props', None) + return getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None) - props = getattr(value, '__proxy_props', None) or {} + props = getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None) or {} if not props.get('Symbol(Symbol.asyncIterator)'): props['Symbol(Symbol.asyncIterator)'] = { 'next': '__anext__', @@ -198,6 +199,22 @@ class RpcPeer: if isinstance(value, Exception): return self.serializeError(value) + serializerMapName = self.constructorSerializerMap.get( + type(value), None) + if serializerMapName: + __remote_constructor_name = serializerMapName + serializer = self.nameDeserializerMap.get(serializerMapName, None) + serialized = serializer.serialize(value, serializationContext) + ret = { + '__remote_proxy_id': None, + '__remote_proxy_finalizer_id': None, + '__remote_constructor_name': __remote_constructor_name, + '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), + '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), + '__serialized_value': serialized, + } + return ret + proxiedEntry = self.localProxied.get(value, None) if proxiedEntry: proxiedEntry['finalizerId'] = str(self.proxyCounter) @@ -219,22 +236,6 @@ class RpcPeer: } return ret - serializerMapName = self.constructorSerializerMap.get( - type(value), None) - if serializerMapName: - __remote_constructor_name = serializerMapName - serializer = self.nameDeserializerMap.get(serializerMapName, None) - serialized = serializer.serialize(value, serializationContext) - ret = { - '__remote_proxy_id': None, - '__remote_proxy_finalizer_id': None, - '__remote_constructor_name': __remote_constructor_name, - '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), - '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), - '__serialized_value': serialized, - } - return ret - proxyId = str(self.proxyCounter) self.proxyCounter = self.proxyCounter + 1 proxiedEntry = { diff --git a/server/python/rpc_reader.py b/server/python/rpc_reader.py index 5dd58d527..af4366836 100644 --- a/server/python/rpc_reader.py +++ b/server/python/rpc_reader.py @@ -104,6 +104,7 @@ async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int = None, wri peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer() peer.constructorSerializerMap[bytes] = 'Buffer' peer.constructorSerializerMap[bytearray] = 'Buffer' + peer.constructorSerializerMap[memoryview] = 'Buffer' async def peerReadLoop(): await readLoop(loop, peer, reader) diff --git a/server/src/plugin/media.ts b/server/src/plugin/media.ts index a2192cb68..8e36fe884 100644 --- a/server/src/plugin/media.ts +++ b/server/src/plugin/media.ts @@ -16,15 +16,13 @@ class MediaObject implements MediaObjectRemote { __proxy_props: any; constructor(public mimeType: string, public data: any, options: MediaObjectOptions) { - this.__proxy_props = { - mimeType, - } - if (options) { - for (const [key, value] of Object.entries(options)) { - if (RpcPeer.isTransportSafe(value)) - this.__proxy_props[key] = value; - (this as any)[key] = value; - } + this.__proxy_props = {} + options ||= {}; + options.mimeType = mimeType; + for (const [key, value] of Object.entries(options)) { + if (RpcPeer.isTransportSafe(value)) + this.__proxy_props[key] = value; + (this as any)[key] = value; } } diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index a081b2879..c800e7943 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -85,7 +85,7 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa clusterPeer.params['connectRPCObject'] = async (id: string, secret: string) => { if (secret !== portSecret) throw new Error('secret incorrect'); - return peer.localProxyMap[id]; + return peer.localProxyMap.get(id); } client.on('close', () => clusterPeer.kill('cluster socket closed')); }) diff --git a/server/src/rpc.ts b/server/src/rpc.ts index 871b88f26..60014ff86 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -297,7 +297,7 @@ export class RpcPeer { pendingResults: { [id: string]: Deferred } = {}; proxyCounter = 1; localProxied = new Map(); - localProxyMap: { [id: string]: any } = {}; + localProxyMap = new Map(); // @ts-ignore remoteWeakProxies: { [id: string]: WeakRef } = {}; // @ts-ignore @@ -356,6 +356,13 @@ export class RpcPeer { // return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES]; // } + static getIteratorNext(target: any): string { + if (!target[Symbol.asyncIterator]) + return; + const proxyProps = target[this.PROPERTY_PROXY_PROPERTIES]?.[Symbol.asyncIterator.toString()]; + return proxyProps?.next || 'next'; + } + static prepareProxyProperties(value: any) { let props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES]; if (!value[Symbol.asyncIterator]) @@ -431,7 +438,7 @@ export class RpcPeer { this.pendingResults = Object.freeze({}); this.params = Object.freeze({}); this.remoteWeakProxies = Object.freeze({}); - this.localProxyMap = Object.freeze({}); + this.localProxyMap.clear() this.localProxied.clear(); } @@ -526,7 +533,7 @@ export class RpcPeer { } if (__local_proxy_id) { - const ret = this.localProxyMap[__local_proxy_id]; + const ret = this.localProxyMap.get(__local_proxy_id); if (!ret) throw new RPCResultError(this, `invalid local proxy id ${__local_proxy_id}`); return ret; @@ -580,6 +587,24 @@ export class RpcPeer { if (value instanceof Error) return this.serializeError(value); + const serializerMapName = this.constructorSerializerMap.get(value.constructor); + if (serializerMapName) { + __remote_constructor_name = serializerMapName; + const serializer = this.nameDeserializerMap.get(serializerMapName); + if (!serializer) + throw new Error('serializer not found for ' + serializerMapName); + const serialized = serializer.serialize(value, serializationContext); + const ret: RpcRemoteProxyValue = { + __remote_proxy_id: undefined, + __remote_proxy_finalizer_id: undefined, + __remote_constructor_name, + __remote_proxy_props: RpcPeer.prepareProxyProperties(value), + __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], + __serialized_value: serialized, + } + return ret; + } + let proxiedEntry = this.localProxied.get(value); if (proxiedEntry) { const __remote_proxy_finalizer_id = (this.proxyCounter++).toString(); @@ -604,31 +629,13 @@ export class RpcPeer { this.onProxyTypeSerialization.get(__remote_constructor_name)?.(value); - const serializerMapName = this.constructorSerializerMap.get(value.constructor); - if (serializerMapName) { - __remote_constructor_name = serializerMapName; - const serializer = this.nameDeserializerMap.get(serializerMapName); - if (!serializer) - throw new Error('serializer not found for ' + serializerMapName); - const serialized = serializer.serialize(value, serializationContext); - const ret: RpcRemoteProxyValue = { - __remote_proxy_id: undefined, - __remote_proxy_finalizer_id: undefined, - __remote_constructor_name, - __remote_proxy_props: RpcPeer.prepareProxyProperties(value), - __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], - __serialized_value: serialized, - } - return ret; - } - const __remote_proxy_id = (this.proxyCounter++).toString(); proxiedEntry = { id: __remote_proxy_id, finalizerId: __remote_proxy_id, }; this.localProxied.set(value, proxiedEntry); - this.localProxyMap[__remote_proxy_id] = value; + this.localProxyMap.set(__remote_proxy_id, value); const __remote_proxy_props = this.onProxySerialization ? this.onProxySerialization(value, __remote_proxy_id) : RpcPeer.prepareProxyProperties(value); @@ -696,7 +703,7 @@ export class RpcPeer { const serializationContext: any = {}; try { - const target = this.localProxyMap[rpcApply.proxyId]; + const target = this.localProxyMap.get(rpcApply.proxyId); if (!target) throw new Error(`proxy id ${rpcApply.proxyId} not found`); @@ -712,7 +719,7 @@ export class RpcPeer { throw new Error(`target ${target?.constructor?.name} does not have method ${rpcApply.method}`); value = await target[rpcApply.method](...args); - if (target[Symbol.asyncIterator] && rpcApply.method === 'next') { + if (RpcPeer.getIteratorNext(target) === rpcApply.method) { if (value.done) { const errorType: ErrorType = { name: 'StopAsyncIteration', @@ -764,14 +771,14 @@ export class RpcPeer { } case 'finalize': { const rpcFinalize = message as RpcFinalize; - const local = this.localProxyMap[rpcFinalize.__local_proxy_id]; + const local = this.localProxyMap.get(rpcFinalize.__local_proxy_id); if (local) { const localProxiedEntry = this.localProxied.get(local); // if a finalizer id is specified, it must match. if (rpcFinalize.__local_proxy_finalizer_id && rpcFinalize.__local_proxy_finalizer_id !== localProxiedEntry?.finalizerId) { break; } - delete this.localProxyMap[rpcFinalize.__local_proxy_id]; + this.localProxyMap.delete(rpcFinalize.__local_proxy_id); this.localProxied.delete(local); } break;