from asyncio.futures import Future from typing import Any, Callable, Mapping, List import traceback import inspect from typing_extensions import TypedDict import weakref import sys jsonSerializable = set() jsonSerializable.add(float) jsonSerializable.add(int) jsonSerializable.add(str) jsonSerializable.add(dict) jsonSerializable.add(bool) jsonSerializable.add(list) async def maybe_await(value): if (inspect.iscoroutinefunction(value) or inspect.iscoroutine(value)): return await value return value class RpcResultException(Exception): name = None stack = None def __init__(self, caught, message): self.caught = caught self.message = message class RpcSerializer: def serialize(self, value): pass def deserialize(self, value): pass class RpcProxyMethod: def __init__(self, proxy, name): self.__proxy = proxy self.__proxy_method_name = name def __call__(self, *args, **kwargs): return self.__proxy.__apply__(self.__proxy_method_name, args) class LocalProxiedEntry(TypedDict): id: str finalizerId: str class RpcProxy(object): def __init__(self, peer, entry: LocalProxiedEntry, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: List[str]): self.__dict__['__proxy_id'] = entry['id'] self.__dict__['__proxy_entry'] = entry self.__dict__['__proxy_constructor'] = proxyConstructorName self.__dict__['__proxy_peer'] = peer self.__dict__['__proxy_props'] = proxyProps self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods def __getattr__(self, name): if name == '__proxy_finalizer_id': return self.dict['__proxy_entry']['finalizerId'] if name in self.__dict__: return self.__dict__[name] if self.__dict__['__proxy_props'] and name in self.__dict__['__proxy_props']: return self.__dict__['__proxy_props'][name] return RpcProxyMethod(self, name) def __setattr__(self, name: str, value: Any) -> None: if name == '__proxy_finalizer_id': self.dict['__proxy_entry']['finalizerId'] = value return super().__setattr__(name, value) def __call__(self, *args, **kwargs): print('call') pass def __apply__(self, method: str, args: list): return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args) class RpcPeer: def __init__(self, send: Callable[[object, Callable[[Exception], None]], None]) -> None: self.send = send self.idCounter = 1 self.peerName = 'Unnamed Peer' self.params: Mapping[str, any] = {} self.localProxied: Mapping[any, LocalProxiedEntry] = {} self.localProxyMap: Mapping[str, any] = {} self.constructorSerializerMap = {} self.proxyCounter = 1 self.pendingResults: Mapping[str, Future] = {} self.remoteWeakProxies: Mapping[str, any] = {} self.nameDeserializerMap: Mapping[str, RpcSerializer] = {} def __apply__(self, proxyId: str, oneWayMethods: List[str], method: str, args: list): serializedArgs = [] for arg in args: serializedArgs.append(self.serialize(arg, False)) rpcApply = { 'type': 'apply', 'id': None, 'proxyId': proxyId, 'args': serializedArgs, 'method': method, } if oneWayMethods and method in oneWayMethods: rpcApply['oneway'] = True self.send(rpcApply) future = Future() future.set_result(None) return future async def send(id: str, reject: Callable[[Exception], None]): rpcApply['id'] = id self.send(rpcApply, reject) return self.createPendingResult(send) def kill(self): self.killed = True def createErrorResult(self, result: any, name: str, message: str, tb: str): result['stack'] = tb if tb else 'no stack' result['result'] = name if name else 'no name' result['message'] = message if message else 'no message' def serialize(self, value, requireProxy): if (not value or (not requireProxy and type(value) in jsonSerializable)): return value __remote_constructor_name = 'Function' if callable(value) else value.__proxy_constructor if hasattr( value, '__proxy_constructor') else type(value).__name__ proxiedEntry = self.localProxied.get(value, None) if proxiedEntry: proxiedEntry['finalizerId'] = str(self.proxyCounter) self.proxyCounter = self.proxyCounter + 1 ret = { '__remote_proxy_id': proxiedEntry['id'], '__remote_proxy_finalizer_id': proxiedEntry['finalizerId'], '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret __proxy_id = getattr(value, '__proxy_id', None) __proxy_peer = getattr(value, '__proxy_peer', None) if __proxy_id and __proxy_peer == self: ret = { '__local_proxy_id': __proxy_id, } return ret serializerMapName = self.constructorSerializerMap.get( type(value), None) if serializerMapName: __remote_constructor_name = serializerMapName serializer = self.nameDeserializerMap.get(serializerMapName, None) serialized = serializer.serialize(value) ret = { '__remote_proxy_id': None, '__remote_proxy_finalizer_id': None, '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), '__serialized_value': serialized, } return ret proxyId = str(self.proxyCounter) self.proxyCounter = self.proxyCounter + 1 proxiedEntry = { 'id': proxyId, 'finalizerId': proxyId, } self.localProxied[value] = proxiedEntry self.localProxyMap[proxyId] = value ret = { '__remote_proxy_id': proxyId, '__remote_proxy_finalizer_id': proxyId, '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret def finalize(self, localProxiedEntry: LocalProxiedEntry): id = localProxiedEntry['id'] self.remoteWeakProxies.pop(id, None) rpcFinalize = { '__local_proxy_id': id, '__local_proxy_finalizer_id': localProxiedEntry['finalizerId'], 'type': 'finalize', } self.send(rpcFinalize) def newProxy(self, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: List[str]): localProxiedEntry: LocalProxiedEntry = { 'id': proxyId, 'finalizerId': None, } proxy = RpcProxy(self, localProxiedEntry, proxyConstructorName, proxyProps, proxyOneWayMethods) wr = weakref.ref(proxy) self.remoteWeakProxies[proxyId] = wr weakref.finalize(proxy, lambda: self.finalize(localProxiedEntry)) return proxy def deserialize(self, value): if not value: return value if type(value) != dict: return value __remote_proxy_id = value.get('__remote_proxy_id', None) __remote_proxy_finalizer_id = value.get( '__remote_proxy_finalizer_id', None) __local_proxy_id = value.get('__local_proxy_id', None) __remote_constructor_name = value.get( '__remote_constructor_name', None) __serialized_value = value.get('__serialized_value', None) __remote_proxy_props = value.get('__remote_proxy_props', None) __remote_proxy_oneway_methods = value.get( '__remote_proxy_oneway_methods', None) if __remote_proxy_id: weakref = self.remoteWeakProxies.get('__remote_proxy_id', None) proxy = weakref() if weakref else None if not proxy: proxy = self.newProxy(__remote_proxy_id, __remote_constructor_name, __remote_proxy_props, __remote_proxy_oneway_methods) proxy.__proxy_finalizer_id = __remote_proxy_finalizer_id return proxy if __local_proxy_id: ret = self.localProxyMap.get(__local_proxy_id, None) if not ret: raise RpcResultException( None, 'invalid local proxy id %s' % __local_proxy_id) return ret deserializer = self.nameDeserializerMap.get( __remote_constructor_name, None) if deserializer: return deserializer.deserialize(__serialized_value) return value async def handleMessage(self, message: any): try: messageType = message['type'] if messageType == 'param': result = { 'type': 'result', 'id': message['id'], } try: value = self.params.get(message['param'], None) value = await maybe_await(value) result['result'] = self.serialize( value, message.get('requireProxy', None)) except Exception as e: tb = traceback.format_exc() self.createErrorResult( result, type(e).__name, str(e), tb) self.send(result) elif messageType == 'apply': result = { 'type': 'result', 'id': message.get('id', None), } method = message.get('method', None) try: target = self.localProxyMap.get( message['proxyId'], None) if not target: raise Exception('proxy id %s not found' % message['proxyId']) args = [] for arg in (message['args'] or []): args.append(self.deserialize(arg)) value = None if method: if not hasattr(target, method): raise Exception( 'target %s does not have method %s' % (type(target), method)) invoke = getattr(target, method) value = await maybe_await(invoke(*args)) else: value = await maybe_await(target(*args)) result['result'] = self.serialize(value, False) except Exception as e: tb = traceback.format_exc() print('failure', method, e, tb) self.createErrorResult( result, type(e).__name__, str(e), tb) if not message.get('oneway', False): self.send(result) elif messageType == 'result': future = self.pendingResults.get(message['id'], None) if not future: raise RpcResultException( None, 'unknown result %s' % message['id']) del message['id'] if hasattr(message, 'message') or hasattr(message, 'stack'): e = RpcResultException( None, message.get('message', None)) e.stack = message.get('stack', None) e.name = message.get('name', None) future.set_exception(e) return future.set_result(self.deserialize( message.get('result', None))) elif messageType == 'finalize': finalizerId = message.get('__local_proxy_finalizer_id', None) proxyId = message['__local_proxy_id'] local = self.localProxyMap.get(proxyId, None) if local: localProxiedEntry = self.localProxied.get(local) if localProxiedEntry and finalizerId and localProxiedEntry['finalizerId'] != finalizerId: print('mismatch finalizer id', file=sys.stderr) return self.localProxied.pop(local, None) local = self.localProxyMap.pop(proxyId, None) else: raise RpcResultException( None, 'unknown rpc message type %s' % type) except Exception as e: print("unhandled rpc error", self.peerName, e) pass async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]): # if (Object.isFrozen(this.pendingResults)) # return Promise.reject(new RPCResultError('RpcPeer has been killed')); id = str(self.idCounter) self.idCounter = self.idCounter + 1 future = Future() self.pendingResults[id] = future await cb(id, lambda e: future.set_exception(RpcResultException(e, None))) return await future async def getParam(self, param): async def send(id: str, reject: Callable[[Exception], None]): paramMessage = { 'id': id, 'type': 'param', 'param': param, } self.send(paramMessage, reject) return await self.createPendingResult(send)