python-client: fix message queues

This commit is contained in:
Koushik Dutta
2023-07-15 08:52:24 -07:00
parent 7d3dfb16f0
commit 3494106857

View File

@@ -1,44 +1,68 @@
from __future__ import annotations
import asyncio
from contextlib import nullcontext
import engineio
import os
from contextlib import nullcontext
import aiohttp
import rpc_reader
import engineio
import plugin_remote
from plugin_remote import DeviceManager, SystemManager, MediaManager
from scrypted_python.scrypted_sdk import ScryptedStatic
import rpc_reader
from plugin_remote import DeviceManager, MediaManager, SystemManager
from scrypted_python.scrypted_sdk import ScryptedInterface, ScryptedStatic
class EioRpcTransport(rpc_reader.RpcTransport):
message_queue = asyncio.Queue()
def __init__(self, loop: asyncio.AbstractEventLoop):
super().__init__()
self.eio = engineio.AsyncClient(ssl_verify=False)
self.loop = loop
self.write_error: Exception = None
self.read_queue = asyncio.Queue()
self.write_queue = asyncio.Queue()
@self.eio.on("message")
def on_message(data):
self.message_queue.put_nowait(data)
self.read_queue.put_nowait(data)
asyncio.run_coroutine_threadsafe(self.send_loop(), self.loop)
async def read(self):
return await self.message_queue.get()
return await self.read_queue.get()
async def send_loop(self):
while True:
data = await self.write_queue.get()
try:
await self.eio.send(data)
except Exception as e:
self.write_error = e
self.write_queue = None
break
def writeBuffer(self, buffer, reject):
self.writeBuffer(buffer, reject)
def writeJSON(self, json, reject):
async def send():
try:
await self.eio.send(json)
if self.write_error:
raise self.write_error
self.write_queue.put_nowait(buffer)
except Exception as e:
reject(e)
asyncio.run_coroutine_threadsafe(send(), self.loop)
def writeJSON(self, json, reject):
return self.writeBuffer(json, reject)
async def connect_scrypted_client(
transport: EioRpcTransport, base_url: str, username: str, password: str, plugin_id: str = "@scrypted/core", session: aiohttp.ClientSession | None = None
transport: EioRpcTransport,
base_url: str,
username: str,
password: str,
plugin_id: str = "@scrypted/core",
session: aiohttp.ClientSession | None = None,
) -> ScryptedStatic:
login_url = f"{base_url}/login"
login_body = {
@@ -64,32 +88,45 @@ async def connect_scrypted_client(
headers=headers,
engineio_path=f"/endpoint/{plugin_id}/engine.io/api/",
)
ret = asyncio.Future[ScryptedStatic](loop=transport.loop)
peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(transport.loop, transport)
peer.params['print'] = print
peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(
transport.loop, transport
)
peer.params["print"] = print
def callback(api, pluginId, hostInfo):
remote = plugin_remote.PluginRemote(peer, api, pluginId, hostInfo, transport.loop)
remote = plugin_remote.PluginRemote(
peer, api, pluginId, hostInfo, transport.loop
)
wrapped = remote.setSystemState
async def remoteSetSystemState(systemState):
await wrapped(systemState)
async def resolve():
sdk = ScryptedStatic()
sdk.api = api
sdk.remote = remote
sdk.systemManager = SystemManager(api, remote.systemState)
sdk.deviceManager = DeviceManager(remote.nativeIds, sdk.systemManager)
sdk.deviceManager = DeviceManager(
remote.nativeIds, sdk.systemManager
)
sdk.mediaManager = MediaManager(await api.getMediaManager())
ret.set_result(sdk)
asyncio.run_coroutine_threadsafe(resolve(), transport.loop)
remote.setSystemState = remoteSetSystemState
return remote
peer.params['getRemote'] = callback
peer.params["getRemote"] = callback
asyncio.run_coroutine_threadsafe(peerReadLoop(), transport.loop)
sdk = await ret
return sdk
async def main():
transport = EioRpcTransport(asyncio.get_event_loop())
sdk = await connect_scrypted_client(
@@ -102,10 +139,13 @@ async def main():
for id in sdk.systemManager.getSystemState():
device = sdk.systemManager.getDeviceById(id)
print(device.name)
if ScryptedInterface.OnOff.value in device.interfaces:
print(f"OnOff: device is {device.on}")
await transport.eio.disconnect()
os._exit(0)
loop = asyncio.new_event_loop()
asyncio.run_coroutine_threadsafe(main(), loop)
loop.run_forever()