from asyncio.futures import Future from typing import Callable, Mapping, List import traceback import inspect import weakref jsonSerializable = set() jsonSerializable.add(float) jsonSerializable.add(int) jsonSerializable.add(str) jsonSerializable.add(dict) jsonSerializable.add(bool) jsonSerializable.add(list) async def maybe_await(value): if (inspect.iscoroutinefunction(value) or inspect.iscoroutine(value)): return await value return value class RpcResultException(Exception): name = None stack = None def __init__(self, caught, message): self.caught = caught self.message = message class RpcSerializer: def serialize(self, value): pass def deserialize(self, value): pass class RpcProxyMethod: def __init__(self, proxy, name): self.__proxy = proxy self.__proxy_method_name = name def __call__(self, *args, **kwargs): return self.__proxy.__apply__(self.__proxy_method_name, args) class RpcProxy(object): def __init__(self, peer, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: List[str]): self.__dict__['__proxy_id'] = proxyId self.__dict__['__proxy_constructor'] = proxyConstructorName self.__dict__['__proxy_peer'] = peer self.__dict__['__proxy_props'] = proxyProps self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods def __getattr__(self, name): if name in self.__dict__: return self.__dict__[name] if self.__dict__['__proxy_props'] and name in self.__dict__['__proxy_props']: return self.__dict__['__proxy_props'][name] return RpcProxyMethod(self, name) def __call__(self, *args, **kwargs): print('call') pass def __apply__(self, method: str, args: list): return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args) class RpcPeer: # todo: these are all class statics lol, fix this. idCounter = 1 peerName = 'Unnamed Peer' params: Mapping[str, any] = {} localProxied: Mapping[any, str] = {} localProxyMap: Mapping[str, any] = {} constructorSerializerMap = {} proxyCounter = 1 pendingResults: Mapping[str, Future] = {} remoteWeakProxies: Mapping[str, any] = {} nameDeserializerMap: Mapping[str, RpcSerializer] = {} def __init__(self, send: Callable[[object, Callable[[Exception], None]], None]) -> None: self.send = send def __apply__(self, proxyId: str, oneWayMethods: List[str], method: str, args: list): serializedArgs = [] for arg in args: serializedArgs.append(self.serialize(arg, False)) rpcApply = { 'type': 'apply', 'id': None, 'proxyId': proxyId, 'args': serializedArgs, 'method': method, } if oneWayMethods and method in oneWayMethods: rpcApply['oneway'] = True self.send(rpcApply) future = Future() future.set_result(None) return future async def send(id: str, reject: Callable[[Exception], None]): rpcApply['id'] = id self.send(rpcApply, reject) return self.createPendingResult(send) def kill(self): self.killed = True def createErrorResult(self, result: any, name: str, message: str, tb: str): result['stack'] = tb if tb else 'no stack' result['result'] = name if name else 'no name' result['message'] = message if message else 'no message' def serialize(self, value, requireProxy): if (not value or (not requireProxy and type(value) in jsonSerializable)): return value __remote_constructor_name = 'Function' if callable(value) else value.__proxy_constructor if hasattr( value, '__proxy_constructor') else type(value).__name__ proxyId = self.localProxied.get(value, None) if proxyId: ret = { '__remote_proxy_id': proxyId, '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret __proxy_id = getattr(value, '__proxy_id', None) __proxy_peer = getattr(value, '__proxy_peer', None) if __proxy_id and __proxy_peer == self: ret = { '__local_proxy_id': __proxy_id, } return ret serializerMapName = self.constructorSerializerMap.get(type(value), None) if serializerMapName: __remote_constructor_name = serializerMapName serializer = self.nameDeserializerMap.get(serializerMapName, None) serialized = serializer.serialize(value) ret = { '__remote_proxy_id': None, '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), '__serialized_value': serialized, } return ret proxyId = str(self.proxyCounter) self.proxyCounter = self.proxyCounter + 1 self.localProxied[value] = proxyId self.localProxyMap[proxyId] = value ret = { '__remote_proxy_id': proxyId, '__remote_constructor_name': __remote_constructor_name, '__remote_proxy_props': getattr(value, '__proxy_props', None), '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret def finalize(self, id: str): self.remoteWeakProxies.pop(id, None) rpcFinalize = { '__local_proxy_id': id, 'type': 'finalize', } self.send(rpcFinalize) def newProxy(self, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: List[str]): proxy = RpcProxy(self, proxyId, proxyConstructorName, proxyProps, proxyOneWayMethods) wr = weakref.ref(proxy) self.remoteWeakProxies[proxyId] = wr weakref.finalize(proxy, lambda: self.finalize(proxyId)) return proxy def deserialize(self, value): if not value: return value if type(value) != dict: return value __remote_proxy_id = value.get('__remote_proxy_id', None) __local_proxy_id = value.get('__local_proxy_id', None) __remote_constructor_name = value.get( '__remote_constructor_name', None) __serialized_value = value.get('__serialized_value', None) __remote_proxy_props = value.get('__remote_proxy_props', None) __remote_proxy_oneway_methods = value.get( '__remote_proxy_oneway_methods', None) if __remote_proxy_id: weakref = self.remoteWeakProxies.get('__remote_proxy_id', None) proxy = weakref() if weakref else None if not proxy: proxy = self.newProxy(__remote_proxy_id, __remote_constructor_name, __remote_proxy_props, __remote_proxy_oneway_methods) return proxy if __local_proxy_id: ret = self.localProxyMap.get(__local_proxy_id, None) if not ret: raise RpcResultException( None, 'invalid local proxy id %s' % __local_proxy_id) return ret deserializer = self.nameDeserializerMap.get( __remote_constructor_name, None) if deserializer: return deserializer.deserialize(__serialized_value) return value async def handleMessage(self, message: any): try: messageType = message['type'] if messageType == 'param': result = { 'type': 'result', 'id': message['id'], } try: value = self.params.get(message['param'], None) value = await maybe_await(value) result['result'] = self.serialize( value, message.get('requireProxy', None)) except Exception as e: tb = traceback.format_exc() self.createErrorResult( result, type(e).__name, str(e), tb) self.send(result) elif messageType == 'apply': result = { 'type': 'result', 'id': message.get('id', None), } method = message.get('method', None) try: target = self.localProxyMap.get( message['proxyId'], None) if not target: raise Exception('proxy id %s not found' % message['proxyId']) args = [] for arg in (message['args'] or []): args.append(self.deserialize(arg)) value = None if method: if not hasattr(target, method): raise Exception( 'target %s does not have method %s' % (type(target), method)) invoke = getattr(target, method) value = await maybe_await(invoke(*args)) else: value = await maybe_await(target(*args)) result['result'] = self.serialize(value, False) except Exception as e: print('failure', method, e) tb = traceback.format_exc() self.createErrorResult( result, type(e).__name__, str(e), tb) if not message.get('oneway', False): self.send(result) elif messageType == 'result': future = self.pendingResults.get(message['id'], None) if not future: raise RpcResultException( None, 'unknown result %s' % message['id']) del message['id'] if hasattr(message, 'message') or hasattr(message, 'stack'): e = RpcResultException( None, message.get('message', None)) e.stack = message.get('stack', None) e.name = message.get('name', None) future.set_exception(e) return future.set_result(self.deserialize( message.get('result', None))) elif messageType == 'finalize': local = self.localProxyMap.pop( message['__local_proxy_id'], None) self.localProxied.pop(local, None) else: raise RpcResultException( None, 'unknown rpc message type %s' % type) except Exception as e: print("unhandled rpc error", self.peerName, e) pass async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]): # if (Object.isFrozen(this.pendingResults)) # return Promise.reject(new RPCResultError('RpcPeer has been killed')); id = str(self.idCounter) self.idCounter = self.idCounter + 1 future = Future() self.pendingResults[id] = future await cb(id, lambda e: future.set_exception(RpcResultException(e, None))) return await future async def getParam(self, param): async def send(id: str, reject: Callable[[Exception], None]): paramMessage = { 'id': id, 'type': 'param', 'param': param, } self.send(paramMessage, reject) return await self.createPendingResult(send)