Files
scrypted/server/python/plugin-remote.py
2021-11-04 20:29:02 -07:00

131 lines
3.4 KiB
Python

import asyncio
import base64
import json
import os
import zipimport
from asyncio.events import AbstractEventLoop
from collections.abc import Mapping
from typing import Any, TypedDict

import aiofiles

from rpc import RpcPeer, readLoop, RpcSerializer
class BufferSerializer(RpcSerializer):
    """Serializer that represents binary buffers as base64 text.

    Messages in this module are sent with ``json.dumps``, which cannot encode
    ``bytes``, so the serialized form has to be a ``str``.
    """

    def serialize(self, value):
        """Return the bytes-like *value* encoded as a base64 ``str``.

        ``base64.b64encode`` yields ``bytes``; the result is decoded to ASCII
        text so that it can be embedded in a JSON message.
        """
        return base64.b64encode(value).decode('ascii')

    def deserialize(self, value):
        """Return the raw ``bytes`` for base64 input (``str`` or bytes-like)."""
        return base64.b64decode(value)
class SystemDeviceState(TypedDict):
    """Typed dictionary describing one stored device state entry.

    ``value`` is annotated with ``typing.Any``: the lowercase builtin ``any``
    is a function, not a type, and is meaningless to type checkers.
    """

    # presumably timestamps of the last event / last state change — TODO confirm units
    lastEventTime: int
    stateTime: int
    # The stored value itself; unconstrained.
    value: Any
class DeviceStorage:
    """Record holding a device's id, native id and key/value storage.

    ``id`` and ``nativeId`` are declared but not initialised; they are
    expected to be assigned by whoever creates the record.
    """

    id: str
    nativeId: str
    storage: Mapping[str, str]

    def __init__(self):
        # Give every record its own dict. A class-level ``{}`` default would
        # be a single object shared (and mutated) by all instances.
        self.storage = {}
class PluginRemote:
    """Object handed out by the RPC peer's ``getRemote`` parameter.

    It keeps a cache of device state and of per-native-id storage records,
    and can load a plugin from a zip archive.

    Attributes:
        api: The API object passed to the constructor (stored only).
        pluginId: The plugin identifier passed to the constructor.
        systemState: Device id -> property name -> state entry.
        nativeIds: Native id -> ``DeviceStorage`` record.
    """

    pluginId: str

    def __init__(self, api, pluginId):
        """Store *api* and *pluginId* and start with empty caches."""
        self.api = api
        self.pluginId = pluginId
        # Created per instance: class-level ``{}`` defaults would be shared
        # and mutated across every PluginRemote.
        self.systemState: dict[str, dict[str, SystemDeviceState]] = {}
        self.nativeIds: dict[str, DeviceStorage] = {}

    async def loadZip(self, packageJson, zipData, options=None):
        """Write the plugin archive to disk, import ``plugin`` from it and run it.

        Args:
            packageJson: Not used by this method.
            zipData: Bytes written verbatim to ``options['filename']``.
            options: Mapping that must contain ``'filename'``, the path the
                archive is written to and then imported from.

        Raises:
            TypeError: If *options* is omitted.
        """
        if options is None:
            raise TypeError("loadZip requires options['filename']")
        zipPath = options['filename']
        with open(zipPath, 'wb') as f:
            f.write(zipData)
        # NOTE(review): zipimporter.load_module() is deprecated since Python
        # 3.10; kept because it registers the module in sys.modules, which
        # the import on the next line relies on.
        zipimport.zipimporter(zipPath).load_module('plugin')
        from plugin import plugin_main
        plugin_main()

    async def setSystemState(self, state):
        """Replace the whole cached system state with *state*."""
        self.systemState = state

    async def setNativeId(self, nativeId, id, storage):
        """Register or remove the storage record for *nativeId*.

        A truthy *id* registers a record under *nativeId*; a falsy *id*
        removes any record stored under *nativeId*. The branch is chosen by
        *id* rather than *nativeId*: branching on *nativeId* would make the
        removal path reachable only for a falsy key, so no real native id
        could ever be removed.
        """
        if id:
            ds = DeviceStorage()
            ds.id = id
            ds.nativeId = nativeId
            ds.storage = storage
            self.nativeIds[nativeId] = ds
        else:
            self.nativeIds.pop(nativeId, None)

    async def updateDeviceState(self, id, state):
        """Store *state* for device *id*, or drop the device when *state* is falsy."""
        if not state:
            self.systemState.pop(id, None)
        else:
            self.systemState[id] = state

    async def notify(self, id, eventTime, eventInterface, property, value, changed=False):
        """Record a property change in the cached state of device *id*.

        Only events that name a *property* touch the cache. If no state is
        cached for *id*, a message is printed and nothing is stored.
        *eventTime*, *eventInterface* and *changed* are currently unused.
        """
        if not property:
            # TODO: forward non-property events to an event dispatcher.
            return
        state = self.systemState.get(id, None) if self.systemState else None
        if not state:
            print('state not found for %s' % id)
            return
        state[property] = value
        # TODO: forward the property event to an event dispatcher.

    async def ioEvent(self, id, event, message=None):
        """Not implemented; the arguments are ignored."""
        pass

    async def createDeviceState(self, id, setState):
        """Not implemented; the arguments are ignored."""
        pass

    async def getServicePort(self, name):
        """Not implemented; always returns ``None``."""
        pass
async def async_main(loop: AbstractEventLoop):
    """Set up the RPC peer and run its read loop.

    Incoming data is read from file descriptor 3 and outgoing messages are
    written to file descriptor 4 as one JSON document per line. ``readLoop``
    and ``consoleTest`` are run concurrently; once both finish, ``done`` is
    printed.

    Args:
        loop: Event loop handed through to ``readLoop``.
    """
    # ``async with`` closes the reader when the read loop ends or raises.
    async with aiofiles.open(3, mode='r') as reader:

        def send(message, reject=None):
            """Write *message* as a JSON line to file descriptor 4.

            A write failure is passed to *reject* when one is given and is
            otherwise dropped.
            """
            jsonString = json.dumps(message)
            try:
                os.write(4, bytes(jsonString + '\n', 'utf8'))
            except Exception as e:
                if reject:
                    reject(e)

        peer = RpcPeer(send)
        peer.nameDeserializerMap['Buffer'] = BufferSerializer()
        peer.params['print'] = print
        peer.params['getRemote'] = lambda api, pluginId: PluginRemote(
            api, pluginId)

        async def consoleTest():
            """Fetch the peer's ``console`` parameter; the result is not used."""
            console = await peer.getParam('console')

        await asyncio.gather(readLoop(loop, peer, reader), consoleTest())
    print('done')
def main():
    """Run ``async_main`` to completion on a fresh event loop.

    The loop is created explicitly because ``asyncio.get_event_loop()`` is
    deprecated when no loop is running. It is also installed as the current
    loop, and it is closed even if ``async_main`` raises.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_main(loop))
    finally:
        loop.close()


if __name__ == "__main__":
    main()