From 3494106857a7be12c862547b89576cd45759265b Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sat, 15 Jul 2023 08:52:24 -0700 Subject: [PATCH] python-client: fix message queues --- packages/python-client/test.py | 80 +++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/packages/python-client/test.py b/packages/python-client/test.py index 4a314d74a..ce43a69ca 100644 --- a/packages/python-client/test.py +++ b/packages/python-client/test.py @@ -1,44 +1,68 @@ from __future__ import annotations import asyncio -from contextlib import nullcontext -import engineio import os +from contextlib import nullcontext + import aiohttp -import rpc_reader +import engineio + import plugin_remote -from plugin_remote import DeviceManager, SystemManager, MediaManager -from scrypted_python.scrypted_sdk import ScryptedStatic +import rpc_reader +from plugin_remote import DeviceManager, MediaManager, SystemManager +from scrypted_python.scrypted_sdk import ScryptedInterface, ScryptedStatic + class EioRpcTransport(rpc_reader.RpcTransport): - message_queue = asyncio.Queue() - def __init__(self, loop: asyncio.AbstractEventLoop): super().__init__() self.eio = engineio.AsyncClient(ssl_verify=False) self.loop = loop + self.write_error: Exception = None + self.read_queue = asyncio.Queue() + self.write_queue = asyncio.Queue() @self.eio.on("message") def on_message(data): - self.message_queue.put_nowait(data) + self.read_queue.put_nowait(data) + + asyncio.run_coroutine_threadsafe(self.send_loop(), self.loop) async def read(self): - return await self.message_queue.get() + return await self.read_queue.get() + + async def send_loop(self): + while True: + data = await self.write_queue.get() + try: + await self.eio.send(data) + except Exception as e: + self.write_error = e + self.write_queue = None + break def writeBuffer(self, buffer, reject): - self.writeBuffer(buffer, reject) - - def writeJSON(self, json, reject): async def send(): try: - await self.eio.send(json) + if self.write_error: + raise self.write_error + self.write_queue.put_nowait(buffer) except Exception as e: reject(e) + asyncio.run_coroutine_threadsafe(send(), self.loop) + def writeJSON(self, json, reject): + return self.writeBuffer(json, reject) + async def connect_scrypted_client( - transport: EioRpcTransport, base_url: str, username: str, password: str, plugin_id: str = "@scrypted/core", session: aiohttp.ClientSession | None = None + transport: EioRpcTransport, + base_url: str, + username: str, + password: str, + plugin_id: str = "@scrypted/core", + session: aiohttp.ClientSession | None = None, ) -> ScryptedStatic: login_url = f"{base_url}/login" login_body = { @@ -64,32 +88,45 @@ async def connect_scrypted_client( headers=headers, engineio_path=f"/endpoint/{plugin_id}/engine.io/api/", ) - + ret = asyncio.Future[ScryptedStatic](loop=transport.loop) - peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(transport.loop, transport) - peer.params['print'] = print + peer, peerReadLoop = await rpc_reader.prepare_peer_readloop( + transport.loop, transport + ) + peer.params["print"] = print + def callback(api, pluginId, hostInfo): - remote = plugin_remote.PluginRemote(peer, api, pluginId, hostInfo, transport.loop) + remote = plugin_remote.PluginRemote( + peer, api, pluginId, hostInfo, transport.loop + ) wrapped = remote.setSystemState + async def remoteSetSystemState(systemState): await wrapped(systemState) + async def resolve(): sdk = ScryptedStatic() sdk.api = api sdk.remote = remote sdk.systemManager = SystemManager(api, remote.systemState) - sdk.deviceManager = DeviceManager(remote.nativeIds, sdk.systemManager) + sdk.deviceManager = DeviceManager( + remote.nativeIds, sdk.systemManager + ) sdk.mediaManager = MediaManager(await api.getMediaManager()) ret.set_result(sdk) + asyncio.run_coroutine_threadsafe(resolve(), transport.loop) + remote.setSystemState = remoteSetSystemState return remote - peer.params['getRemote'] = callback + + peer.params["getRemote"] = callback asyncio.run_coroutine_threadsafe(peerReadLoop(), transport.loop) sdk = await ret return sdk + async def main(): transport = EioRpcTransport(asyncio.get_event_loop()) sdk = await connect_scrypted_client( @@ -102,10 +139,13 @@ async def main(): for id in sdk.systemManager.getSystemState(): device = sdk.systemManager.getDeviceById(id) print(device.name) + if ScryptedInterface.OnOff.value in device.interfaces: + print(f"OnOff: device is {device.on}") await transport.eio.disconnect() os._exit(0) + loop = asyncio.new_event_loop() asyncio.run_coroutine_threadsafe(main(), loop) loop.run_forever()