mirror of
https://github.com/koush/scrypted.git
synced 2026-02-03 14:13:28 +00:00
* python: partial repl reimplementation * make more readable? * document questionable design choices
168 lines
5.9 KiB
Python
168 lines
5.9 KiB
Python
# Copy this here before it gets populated with the rest of this file's code
|
|
base_globals = globals().copy()
|
|
|
|
import asyncio
|
|
import inspect
|
|
import prompt_toolkit
|
|
from prompt_toolkit import print_formatted_text
|
|
from prompt_toolkit.application import Application
|
|
import prompt_toolkit.application.current
|
|
from prompt_toolkit.application.current import create_app_session
|
|
from prompt_toolkit.data_structures import Size
|
|
import prompt_toolkit.key_binding.key_processor
|
|
from prompt_toolkit.input import create_pipe_input
|
|
from prompt_toolkit.output.vt100 import Vt100_Output
|
|
from prompt_toolkit.output.color_depth import ColorDepth
|
|
from ptpython.repl import embed, PythonRepl
|
|
import ptpython.key_bindings
|
|
import ptpython.python_input
|
|
import ptpython.history_browser
|
|
import ptpython.layout
|
|
from typing import List, Dict, Any
|
|
|
|
from scrypted_python.scrypted_sdk import ScryptedStatic, ScryptedDevice
|
|
|
|
from cluster_setup import cluster_listen_zero
|
|
from rpc import maybe_await
|
|
|
|
|
|
# Our client is xtermjs, so no need to perform any color depth detection
|
|
ColorDepth.default = lambda *args, **kwargs: ColorDepth.DEPTH_4_BIT
|
|
|
|
|
|
# This section is a bit of a hack - prompt_toolkit has many assumptions
|
|
# that there is only one global Application, so multiple REPLs will confuse
|
|
# the library. The patches here allow us to scope a particular call stack
|
|
# to a particular REPL, and to get the current Application from the stack.
|
|
def patch_prompt_toolkit():
|
|
default_get_app = prompt_toolkit.application.current.get_app
|
|
|
|
def get_app_patched() -> Application[Any]:
|
|
stack = inspect.stack()
|
|
for frame in stack:
|
|
self_var = frame.frame.f_locals.get("self")
|
|
if self_var is not None and isinstance(self_var, Application):
|
|
return self_var
|
|
return default_get_app()
|
|
|
|
prompt_toolkit.application.current.get_app = get_app_patched
|
|
prompt_toolkit.key_binding.key_processor.get_app = get_app_patched
|
|
ptpython.python_input.get_app = get_app_patched
|
|
ptpython.key_bindings.get_app = get_app_patched
|
|
ptpython.history_browser.get_app = get_app_patched
|
|
ptpython.layout.get_app = get_app_patched
|
|
patch_prompt_toolkit()
|
|
|
|
|
|
def configure(repl: PythonRepl) -> None:
|
|
repl.confirm_exit = False
|
|
repl.enable_open_in_editor = False
|
|
repl.enable_system_bindings = False
|
|
|
|
|
|
class AsyncStreamStdout:
|
|
"""
|
|
Wrapper around StreamReader and StreamWriter to provide `write` and `flush`
|
|
methods for Vt100_Output.
|
|
"""
|
|
|
|
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
self.reader = reader
|
|
self.writer = writer
|
|
self.loop = asyncio.get_event_loop()
|
|
|
|
def write(self, data: bytes) -> None:
|
|
if isinstance(data, str):
|
|
data = data.encode()
|
|
self.writer.write(data)
|
|
|
|
def flush(self) -> None:
|
|
self.loop.create_task(self.writer.drain())
|
|
|
|
def isatty(self) -> bool:
|
|
return True
|
|
|
|
|
|
# keep a reference to the server alive so it doesn't get garbage collected
|
|
repl_server = None
|
|
|
|
|
|
async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
|
|
global repl_server
|
|
|
|
if repl_server is not None:
|
|
return repl_server["port"]
|
|
|
|
deviceManager = sdk.deviceManager
|
|
systemManager = sdk.systemManager
|
|
mediaManager = sdk.mediaManager
|
|
|
|
async def on_repl_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
filter = await reader.read(1024)
|
|
filter = filter.decode()
|
|
if filter == "undefined":
|
|
filter = None
|
|
|
|
chain: List[str] = []
|
|
nativeIds: Dict[str, Any] = deviceManager.nativeIds
|
|
reversed: Dict[str, str] = {v.id: k for k, v in nativeIds.items()}
|
|
|
|
while filter is not None:
|
|
id = nativeIds.get(filter).id
|
|
d = systemManager.getDeviceById(id)
|
|
chain.append(filter)
|
|
filter = reversed.get(d.providerId)
|
|
|
|
chain.reverse()
|
|
device = plugin
|
|
for c in chain:
|
|
device = await maybe_await(device.getDevice(c))
|
|
realDevice = systemManager.getDeviceById(device.id)
|
|
|
|
with create_pipe_input() as vt100_input:
|
|
vt100_output = Vt100_Output(
|
|
AsyncStreamStdout(reader, writer),
|
|
lambda: Size(rows=24, columns=80),
|
|
term=None,
|
|
)
|
|
|
|
async def vt100_input_coro():
|
|
while True:
|
|
data = await reader.read(1024)
|
|
if not data:
|
|
break
|
|
vt100_input.send_bytes(data)
|
|
|
|
asyncio.create_task(vt100_input_coro())
|
|
|
|
with create_app_session(input=vt100_input, output=vt100_output):
|
|
global_dict = {
|
|
**base_globals.copy(),
|
|
"print": print_formatted_text,
|
|
"help": lambda *args, **kwargs: print_formatted_text(
|
|
"Help is not available in this environment"
|
|
),
|
|
"input": lambda *args, **kwargs: print_formatted_text(
|
|
"Input is not available in this environment"
|
|
),
|
|
}
|
|
locals_dict = {
|
|
"device": device,
|
|
"systemManager": systemManager,
|
|
"deviceManager": deviceManager,
|
|
"mediaManager": mediaManager,
|
|
"sdk": sdk,
|
|
"realDevice": realDevice,
|
|
}
|
|
vars_prompt = "\n".join([f" {k}" for k in locals_dict.keys()])
|
|
banner = f"Python REPL variables:\n{vars_prompt}"
|
|
print_formatted_text(banner)
|
|
await embed(
|
|
return_asyncio_coroutine=True,
|
|
globals=global_dict,
|
|
locals=locals_dict,
|
|
configure=configure,
|
|
)
|
|
|
|
repl_server = await cluster_listen_zero(on_repl_client)
|
|
return repl_server["port"] |