rpc: fixup various async iterator bugs, add memoryview support to python

This commit is contained in:
Koushik Dutta
2023-03-11 19:38:43 -08:00
parent 4570f9cd38
commit 1e004d6700
6 changed files with 80 additions and 67 deletions

View File

@@ -46,13 +46,19 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
class MediaObject(scrypted_python.scrypted_sdk.types.MediaObject):
def __init__(self, data, mimeType, sourceId):
self.mimeType = mimeType
def __init__(self, data, mimeType, options):
self.data = data
setattr(self, '__proxy_props', {
'mimeType': mimeType,
'sourceId': sourceId,
})
proxyProps = {}
setattr(self, rpc.RpcPeer.PROPERTY_PROXY_PROPERTIES, proxyProps)
options = options or {}
options['mimeType'] = mimeType
for key, value in options.items():
if rpc.RpcPeer.isTransportSafe(value):
proxyProps[key] = value
setattr(self, key, value)
async def getData(self):
return self.data
@@ -91,9 +97,9 @@ class MediaManager:
async def createMediaObject(self, data: Any, mimeType: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
# return await self.createMediaObject(data, mimetypes, options)
return MediaObject(data, mimeType, options.get('sourceId', None) if options else None)
return MediaObject(data, mimeType, options)
async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
return await self.mediaManager.createMediaObjectFromUrl(data, options)
async def getFFmpegPath(self) -> str:

View File

@@ -59,7 +59,7 @@ class RpcProxy(object):
self.__dict__['__proxy_entry'] = entry
self.__dict__['__proxy_constructor'] = proxyConstructorName
self.__dict__['__proxy_peer'] = peer
self.__dict__['__proxy_props'] = proxyProps
self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] = proxyProps
self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods
def __getattr__(self, name):
@@ -67,8 +67,8 @@ class RpcProxy(object):
return self.dict['__proxy_entry']['finalizerId']
if name in self.__dict__:
return self.__dict__[name]
if self.__dict__['__proxy_props'] and name in self.__dict__['__proxy_props']:
return self.__dict__['__proxy_props'][name]
if self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] and name in self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES]:
return self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES][name]
return RpcProxyMethod(self, name)
def __setattr__(self, name: str, value: Any) -> None:
@@ -87,6 +87,7 @@ class RpcProxy(object):
class RpcPeer:
RPC_RESULT_ERROR_NAME = 'RPCResultError'
PROPERTY_PROXY_PROPERTIES = '__proxy_props'
def __init__(self, send: Callable[[object, Callable[[Exception], None], Dict], None]) -> None:
self.send = send
@@ -167,16 +168,16 @@ class RpcPeer:
}
# def getProxyProperties(value):
# return getattr(value, '__proxy_props', None)
# return getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None)
# def setProxyProperties(value, properties):
# setattr(value, '__proxy_props', properties)
# setattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, properties)
def prepareProxyProperties(value):
if not hasattr(value, '__aiter__') or not hasattr(value, '__anext__'):
return getattr(value, '__proxy_props', None)
return getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None)
props = getattr(value, '__proxy_props', None) or {}
props = getattr(value, RpcPeer.PROPERTY_PROXY_PROPERTIES, None) or {}
if not props.get('Symbol(Symbol.asyncIterator)'):
props['Symbol(Symbol.asyncIterator)'] = {
'next': '__anext__',
@@ -198,6 +199,22 @@ class RpcPeer:
if isinstance(value, Exception):
return self.serializeError(value)
serializerMapName = self.constructorSerializerMap.get(
type(value), None)
if serializerMapName:
__remote_constructor_name = serializerMapName
serializer = self.nameDeserializerMap.get(serializerMapName, None)
serialized = serializer.serialize(value, serializationContext)
ret = {
'__remote_proxy_id': None,
'__remote_proxy_finalizer_id': None,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
'__serialized_value': serialized,
}
return ret
proxiedEntry = self.localProxied.get(value, None)
if proxiedEntry:
proxiedEntry['finalizerId'] = str(self.proxyCounter)
@@ -219,22 +236,6 @@ class RpcPeer:
}
return ret
serializerMapName = self.constructorSerializerMap.get(
type(value), None)
if serializerMapName:
__remote_constructor_name = serializerMapName
serializer = self.nameDeserializerMap.get(serializerMapName, None)
serialized = serializer.serialize(value, serializationContext)
ret = {
'__remote_proxy_id': None,
'__remote_proxy_finalizer_id': None,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
'__serialized_value': serialized,
}
return ret
proxyId = str(self.proxyCounter)
self.proxyCounter = self.proxyCounter + 1
proxiedEntry = {

View File

@@ -104,6 +104,7 @@ async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int = None, wri
peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer()
peer.constructorSerializerMap[bytes] = 'Buffer'
peer.constructorSerializerMap[bytearray] = 'Buffer'
peer.constructorSerializerMap[memoryview] = 'Buffer'
async def peerReadLoop():
await readLoop(loop, peer, reader)

View File

@@ -16,15 +16,13 @@ class MediaObject implements MediaObjectRemote {
__proxy_props: any;
constructor(public mimeType: string, public data: any, options: MediaObjectOptions) {
this.__proxy_props = {
mimeType,
}
if (options) {
for (const [key, value] of Object.entries(options)) {
if (RpcPeer.isTransportSafe(value))
this.__proxy_props[key] = value;
(this as any)[key] = value;
}
this.__proxy_props = {}
options ||= {};
options.mimeType = mimeType;
for (const [key, value] of Object.entries(options)) {
if (RpcPeer.isTransportSafe(value))
this.__proxy_props[key] = value;
(this as any)[key] = value;
}
}

View File

@@ -85,7 +85,7 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa
clusterPeer.params['connectRPCObject'] = async (id: string, secret: string) => {
if (secret !== portSecret)
throw new Error('secret incorrect');
return peer.localProxyMap[id];
return peer.localProxyMap.get(id);
}
client.on('close', () => clusterPeer.kill('cluster socket closed'));
})

View File

@@ -297,7 +297,7 @@ export class RpcPeer {
pendingResults: { [id: string]: Deferred } = {};
proxyCounter = 1;
localProxied = new Map<any, LocalProxiedEntry>();
localProxyMap: { [id: string]: any } = {};
localProxyMap = new Map<string, any>();
// @ts-ignore
remoteWeakProxies: { [id: string]: WeakRef<any> } = {};
// @ts-ignore
@@ -356,6 +356,13 @@ export class RpcPeer {
// return value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES];
// }
static getIteratorNext(target: any): string {
if (!target[Symbol.asyncIterator])
return;
const proxyProps = target[this.PROPERTY_PROXY_PROPERTIES]?.[Symbol.asyncIterator.toString()];
return proxyProps?.next || 'next';
}
static prepareProxyProperties(value: any) {
let props = value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES];
if (!value[Symbol.asyncIterator])
@@ -431,7 +438,7 @@ export class RpcPeer {
this.pendingResults = Object.freeze({});
this.params = Object.freeze({});
this.remoteWeakProxies = Object.freeze({});
this.localProxyMap = Object.freeze({});
this.localProxyMap.clear()
this.localProxied.clear();
}
@@ -526,7 +533,7 @@ export class RpcPeer {
}
if (__local_proxy_id) {
const ret = this.localProxyMap[__local_proxy_id];
const ret = this.localProxyMap.get(__local_proxy_id);
if (!ret)
throw new RPCResultError(this, `invalid local proxy id ${__local_proxy_id}`);
return ret;
@@ -580,6 +587,24 @@ export class RpcPeer {
if (value instanceof Error)
return this.serializeError(value);
const serializerMapName = this.constructorSerializerMap.get(value.constructor);
if (serializerMapName) {
__remote_constructor_name = serializerMapName;
const serializer = this.nameDeserializerMap.get(serializerMapName);
if (!serializer)
throw new Error('serializer not found for ' + serializerMapName);
const serialized = serializer.serialize(value, serializationContext);
const ret: RpcRemoteProxyValue = {
__remote_proxy_id: undefined,
__remote_proxy_finalizer_id: undefined,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
__serialized_value: serialized,
}
return ret;
}
let proxiedEntry = this.localProxied.get(value);
if (proxiedEntry) {
const __remote_proxy_finalizer_id = (this.proxyCounter++).toString();
@@ -604,31 +629,13 @@ export class RpcPeer {
this.onProxyTypeSerialization.get(__remote_constructor_name)?.(value);
const serializerMapName = this.constructorSerializerMap.get(value.constructor);
if (serializerMapName) {
__remote_constructor_name = serializerMapName;
const serializer = this.nameDeserializerMap.get(serializerMapName);
if (!serializer)
throw new Error('serializer not found for ' + serializerMapName);
const serialized = serializer.serialize(value, serializationContext);
const ret: RpcRemoteProxyValue = {
__remote_proxy_id: undefined,
__remote_proxy_finalizer_id: undefined,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
__serialized_value: serialized,
}
return ret;
}
const __remote_proxy_id = (this.proxyCounter++).toString();
proxiedEntry = {
id: __remote_proxy_id,
finalizerId: __remote_proxy_id,
};
this.localProxied.set(value, proxiedEntry);
this.localProxyMap[__remote_proxy_id] = value;
this.localProxyMap.set(__remote_proxy_id, value);
const __remote_proxy_props = this.onProxySerialization ? this.onProxySerialization(value, __remote_proxy_id) : RpcPeer.prepareProxyProperties(value);
@@ -696,7 +703,7 @@ export class RpcPeer {
const serializationContext: any = {};
try {
const target = this.localProxyMap[rpcApply.proxyId];
const target = this.localProxyMap.get(rpcApply.proxyId);
if (!target)
throw new Error(`proxy id ${rpcApply.proxyId} not found`);
@@ -712,7 +719,7 @@ export class RpcPeer {
throw new Error(`target ${target?.constructor?.name} does not have method ${rpcApply.method}`);
value = await target[rpcApply.method](...args);
if (target[Symbol.asyncIterator] && rpcApply.method === 'next') {
if (RpcPeer.getIteratorNext(target) === rpcApply.method) {
if (value.done) {
const errorType: ErrorType = {
name: 'StopAsyncIteration',
@@ -764,14 +771,14 @@ export class RpcPeer {
}
case 'finalize': {
const rpcFinalize = message as RpcFinalize;
const local = this.localProxyMap[rpcFinalize.__local_proxy_id];
const local = this.localProxyMap.get(rpcFinalize.__local_proxy_id);
if (local) {
const localProxiedEntry = this.localProxied.get(local);
// if a finalizer id is specified, it must match.
if (rpcFinalize.__local_proxy_finalizer_id && rpcFinalize.__local_proxy_finalizer_id !== localProxiedEntry?.finalizerId) {
break;
}
delete this.localProxyMap[rpcFinalize.__local_proxy_id];
this.localProxyMap.delete(rpcFinalize.__local_proxy_id);
this.localProxied.delete(local);
}
break;