From a4e484698d9881e8487cfb7860a4503b1878ec9e Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Mon, 23 Sep 2024 12:34:10 -0400 Subject: [PATCH] server: implement python listen + listenDevice (#1587) * server: implement python listen + listenDevice * fix unregister * make functions synchronous --- server/python/plugin_remote.py | 151 +++++++++++++++++++++++++++++++-- 1 file changed, 143 insertions(+), 8 deletions(-) diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index f99f246c2..fc52ff2dc 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -4,10 +4,12 @@ import asyncio import base64 import gc import hashlib +import inspect import multiprocessing import multiprocessing.connection import os import platform +import random import sys import threading import time @@ -18,7 +20,7 @@ from asyncio.futures import Future from asyncio.streams import StreamReader, StreamWriter from collections.abc import Mapping from io import StringIO -from typing import Any, Optional, Set, Tuple, TypedDict +from typing import Any, Optional, Set, Tuple, TypedDict, Callable, Coroutine import plugin_volume as pv import rpc @@ -57,6 +59,14 @@ class SystemDeviceState(TypedDict): value: any +def ensure_not_coroutine(fn: Callable | Coroutine) -> Callable: + if inspect.iscoroutinefunction(fn): + def wrapper(*args, **kwargs): + return asyncio.create_task(fn(*args, **kwargs)) + return wrapper + return fn + + class DeviceProxy(object): def __init__(self, systemManager: SystemManager, id: str): self.systemManager = systemManager @@ -97,6 +107,116 @@ class DeviceProxy(object): return apply() +class EventListenerRegisterImpl(scrypted_python.scrypted_sdk.EventListenerRegister): + removeListener: Callable[[], None] + + def __init__(self, removeListener: Callable[[], None] | Coroutine[Any, None, None]) -> None: + self.removeListener = ensure_not_coroutine(removeListener) + + +class EventRegistry(object): + systemListeners: Set[scrypted_python.scrypted_sdk.EventListener] + listeners: Mapping[str, Set[Callable[[scrypted_python.scrypted_sdk.EventDetails, Any], None]]] + + __allowedEventInterfaces = set([ + ScryptedInterface.ScryptedDevice.value, + 'Logger', + 'Storage' + ]) + + def __init__(self) -> None: + self.systemListeners = set() + self.listeners = {} + + def __getMixinEventName(self, options: str | scrypted_python.scrypted_sdk.EventListenerOptions) -> str: + mixinId = None + if type(options) == str: + event = options + else: + options = options or {} + event = options.get("event", None) + mixinId = options.get("mixinId", None) + if not event: + event = "undefined" + if not mixinId: + return event + return f"{event}-mixin-{mixinId}" + + def __generateBase36Str(self) -> str: + alphabet = "0123456789abcdefghijklmnopqrstuvwxyz" + return "".join(random.choices(alphabet, k=10)) + + def listen( + self, callback: scrypted_python.scrypted_sdk.EventListener + ) -> scrypted_python.scrypted_sdk.EventListenerRegister: + callback = ensure_not_coroutine(callback) + self.systemListeners.add(callback) + return EventListenerRegisterImpl(lambda: self.systemListeners.remove(callback)) + + def listenDevice( + self, + id: str, + options: str | scrypted_python.scrypted_sdk.EventListenerOptions, + callback: Callable[[scrypted_python.scrypted_sdk.EventDetails, Any], None], + ) -> scrypted_python.scrypted_sdk.EventListenerRegister: + event = self.__getMixinEventName(options) + token = f"{id}#{event}" + events = self.listeners.get(token) + if not events: + events = set() + self.listeners[token] = events + callback = ensure_not_coroutine(callback) + self.listeners[id].add(callback) + return EventListenerRegisterImpl(lambda: self.listeners[id].remove(callback)) + + def notify(self, id: str, eventTime: int, eventInterface: str, property: str, value: Any, options: dict = None): + options = options or {} + changed = options.get("changed") + mixinId = options.get("mixinId") + + # prevent property event noise + if property and not changed: + return False + + eventDetails = { + "eventId": None, + "eventTime": eventTime, + "eventInterface": eventInterface, + "property": property, + "mixinId": mixinId, + } + + return self.notifyEventDetails(id, eventDetails, value) + + def notifyEventDetails(self, id: str, eventDetails: scrypted_python.scrypted_sdk.EventDetails, value: Any, eventInterface: str = None): + if not eventDetails.get("eventId"): + eventDetails["eventId"] = self.__generateBase36Str() + if not eventInterface: + eventInterface = eventDetails.get("eventInterface") + + # system listeners only get state changes. + # there are many potentially noisy stateless events, like + # object detection and settings changes + if (eventDetails.get("property") and not eventDetails.get("mixinId")) or \ + (eventInterface in EventRegistry.__allowedEventInterfaces): + for listener in self.systemListeners: + listener(id, eventDetails, value) + + token = f"{id}#{eventInterface}" + listeners = self.listeners.get(token) + if listeners: + for listener in listeners: + listener(eventDetails, value) + + token = f"{id}#undefined" + listeners = self.listeners.get(token) + if listeners: + for listener in listeners: + listener(eventDetails, value) + + return True + + class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): def __init__( self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]] @@ -105,6 +225,7 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): self.api = api self.systemState = systemState self.deviceProxies: Mapping[str, DeviceProxy] = {} + self.events = EventRegistry() async def getComponent(self, id: str) -> Any: return await self.api.getComponent(id) @@ -170,20 +291,34 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): if checkName.get("value", None) == name: return self.getDeviceById(check) - # TODO - async def listen( + def listen( self, callback: scrypted_python.scrypted_sdk.EventListener ) -> scrypted_python.scrypted_sdk.EventListenerRegister: - return super().listen(callback) + return self.events.listen(callback) - # TODO - async def listenDevice( + def listenDevice( self, id: str, - event: str | scrypted_python.scrypted_sdk.EventListenerOptions, + options: str | scrypted_python.scrypted_sdk.EventListenerOptions, callback: scrypted_python.scrypted_sdk.EventListener, ) -> scrypted_python.scrypted_sdk.EventListenerRegister: - return super().listenDevice(id, event, callback) + callback = ensure_not_coroutine(callback) + if type(options) != str and options.get("watch"): + return self.events.listenDevice( + id, options, + lambda eventDetails, eventData: callback(self.getDeviceById(id), eventDetails, eventData) + ) + + register_fut = asyncio.ensure_future( + self.api.listenDevice( + id, options, + lambda eventDetails, eventData: callback(self.getDeviceById(id), eventDetails, eventData) + ) + ) + async def unregister(): + register = await register_fut + await register.removeListener() + return EventListenerRegisterImpl(lambda: asyncio.ensure_future(unregister())) async def removeDevice(self, id: str) -> None: return await self.api.removeDevice(id)