server: implement python listen + listenDevice (#1587)

* server: implement python listen + listenDevice

* fix unregister

* make functions synchronous
This commit is contained in:
Brett Jia
2024-09-23 12:34:10 -04:00
committed by GitHub
parent 1d659a5fb9
commit a4e484698d

View File

@@ -4,10 +4,12 @@ import asyncio
import base64
import gc
import hashlib
import inspect
import multiprocessing
import multiprocessing.connection
import os
import platform
import random
import sys
import threading
import time
@@ -18,7 +20,7 @@ from asyncio.futures import Future
from asyncio.streams import StreamReader, StreamWriter
from collections.abc import Mapping
from io import StringIO
from typing import Any, Optional, Set, Tuple, TypedDict
from typing import Any, Optional, Set, Tuple, TypedDict, Callable, Coroutine
import plugin_volume as pv
import rpc
@@ -57,6 +59,14 @@ class SystemDeviceState(TypedDict):
value: any
def ensure_not_coroutine(fn: Callable | Coroutine) -> Callable:
if inspect.iscoroutinefunction(fn):
def wrapper(*args, **kwargs):
return asyncio.create_task(fn(*args, **kwargs))
return wrapper
return fn
class DeviceProxy(object):
def __init__(self, systemManager: SystemManager, id: str):
self.systemManager = systemManager
@@ -97,6 +107,116 @@ class DeviceProxy(object):
return apply()
class EventListenerRegisterImpl(scrypted_python.scrypted_sdk.EventListenerRegister):
removeListener: Callable[[], None]
def __init__(self, removeListener: Callable[[], None] | Coroutine[Any, None, None]) -> None:
self.removeListener = ensure_not_coroutine(removeListener)
class EventRegistry(object):
systemListeners: Set[scrypted_python.scrypted_sdk.EventListener]
listeners: Mapping[str, Set[Callable[[scrypted_python.scrypted_sdk.EventDetails, Any], None]]]
__allowedEventInterfaces = set([
ScryptedInterface.ScryptedDevice.value,
'Logger',
'Storage'
])
def __init__(self) -> None:
self.systemListeners = set()
self.listeners = {}
def __getMixinEventName(self, options: str | scrypted_python.scrypted_sdk.EventListenerOptions) -> str:
mixinId = None
if type(options) == str:
event = options
else:
options = options or {}
event = options.get("event", None)
mixinId = options.get("mixinId", None)
if not event:
event = "undefined"
if not mixinId:
return event
return f"{event}-mixin-{mixinId}"
def __generateBase36Str(self) -> str:
alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
return "".join(random.choices(alphabet, k=10))
def listen(
self, callback: scrypted_python.scrypted_sdk.EventListener
) -> scrypted_python.scrypted_sdk.EventListenerRegister:
callback = ensure_not_coroutine(callback)
self.systemListeners.add(callback)
return EventListenerRegisterImpl(lambda: self.systemListeners.remove(callback))
def listenDevice(
self,
id: str,
options: str | scrypted_python.scrypted_sdk.EventListenerOptions,
callback: Callable[[scrypted_python.scrypted_sdk.EventDetails, Any], None],
) -> scrypted_python.scrypted_sdk.EventListenerRegister:
event = self.__getMixinEventName(options)
token = f"{id}#{event}"
events = self.listeners.get(token)
if not events:
events = set()
self.listeners[token] = events
callback = ensure_not_coroutine(callback)
self.listeners[id].add(callback)
return EventListenerRegisterImpl(lambda: self.listeners[id].remove(callback))
def notify(self, id: str, eventTime: int, eventInterface: str, property: str, value: Any, options: dict = None):
options = options or {}
changed = options.get("changed")
mixinId = options.get("mixinId")
# prevent property event noise
if property and not changed:
return False
eventDetails = {
"eventId": None,
"eventTime": eventTime,
"eventInterface": eventInterface,
"property": property,
"mixinId": mixinId,
}
return self.notifyEventDetails(id, eventDetails, value)
def notifyEventDetails(self, id: str, eventDetails: scrypted_python.scrypted_sdk.EventDetails, value: Any, eventInterface: str = None):
if not eventDetails.get("eventId"):
eventDetails["eventId"] = self.__generateBase36Str()
if not eventInterface:
eventInterface = eventDetails.get("eventInterface")
# system listeners only get state changes.
# there are many potentially noisy stateless events, like
# object detection and settings changes
if (eventDetails.get("property") and not eventDetails.get("mixinId")) or \
(eventInterface in EventRegistry.__allowedEventInterfaces):
for listener in self.systemListeners:
listener(id, eventDetails, value)
token = f"{id}#{eventInterface}"
listeners = self.listeners.get(token)
if listeners:
for listener in listeners:
listener(eventDetails, value)
token = f"{id}#undefined"
listeners = self.listeners.get(token)
if listeners:
for listener in listeners:
listener(eventDetails, value)
return True
class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
def __init__(
self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]
@@ -105,6 +225,7 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
self.api = api
self.systemState = systemState
self.deviceProxies: Mapping[str, DeviceProxy] = {}
self.events = EventRegistry()
async def getComponent(self, id: str) -> Any:
return await self.api.getComponent(id)
@@ -170,20 +291,34 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
if checkName.get("value", None) == name:
return self.getDeviceById(check)
# TODO
async def listen(
def listen(
self, callback: scrypted_python.scrypted_sdk.EventListener
) -> scrypted_python.scrypted_sdk.EventListenerRegister:
return super().listen(callback)
return self.events.listen(callback)
# TODO
async def listenDevice(
def listenDevice(
self,
id: str,
event: str | scrypted_python.scrypted_sdk.EventListenerOptions,
options: str | scrypted_python.scrypted_sdk.EventListenerOptions,
callback: scrypted_python.scrypted_sdk.EventListener,
) -> scrypted_python.scrypted_sdk.EventListenerRegister:
return super().listenDevice(id, event, callback)
callback = ensure_not_coroutine(callback)
if type(options) != str and options.get("watch"):
return self.events.listenDevice(
id, options,
lambda eventDetails, eventData: callback(self.getDeviceById(id), eventDetails, eventData)
)
register_fut = asyncio.ensure_future(
self.api.listenDevice(
id, options,
lambda eventDetails, eventData: callback(self.getDeviceById(id), eventDetails, eventData)
)
)
async def unregister():
register = await register_fut
await register.removeListener()
return EventListenerRegisterImpl(lambda: asyncio.ensure_future(unregister()))
async def removeDevice(self, id: str) -> None:
return await self.api.removeDevice(id)