From 19832c953753d18b3c748561ebfbedb5292c4347 Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Tue, 4 Mar 2025 17:37:55 -0500 Subject: [PATCH] python: partial repl reimplementation (#1763) * python: partial repl reimplementation * make more readable? * document questionable design choices --- server/python/plugin_remote.py | 12 +- server/python/plugin_repl.py | 408 +++++++++------------------------ 2 files changed, 109 insertions(+), 311 deletions(-) diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index b9ee50cc5..2737d9171 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -42,12 +42,6 @@ SCRYPTED_REQUIREMENTS = """ ptpython wheel """.strip() -if sys.version_info.minor >= 13: - # telnetlib was removed in Python 3.13 - SCRYPTED_REQUIREMENTS = f""" -{SCRYPTED_REQUIREMENTS} -standard-telnetlib -""".strip() class SystemDeviceState(TypedDict): @@ -500,13 +494,13 @@ class MediaManager: files_path = os.getenv('SCRYPTED_PLUGIN_VOLUME') if not files_path: raise ValueError('SCRYPTED_PLUGIN_VOLUME env variable not set?') - + # Construct the path for the 'files' directory ret = Path(files_path) / 'files' - + # Ensure the directory exists await asyncio.to_thread(ret.mkdir, parents=True, exist_ok=True) - + # Return the constructed directory path as a string return str(ret) diff --git a/server/python/plugin_repl.py b/server/python/plugin_repl.py index 492e05214..10e135ac2 100644 --- a/server/python/plugin_repl.py +++ b/server/python/plugin_repl.py @@ -1,32 +1,28 @@ +# Copy this here before it gets populated with the rest of this file's code +base_globals = globals().copy() + import asyncio -import concurrent.futures -from functools import partial import inspect -import os import prompt_toolkit from prompt_toolkit import print_formatted_text from prompt_toolkit.application import Application import prompt_toolkit.application.current +from prompt_toolkit.application.current import create_app_session +from prompt_toolkit.data_structures import Size import prompt_toolkit.key_binding.key_processor -import prompt_toolkit.contrib.telnet.server -from prompt_toolkit.contrib.telnet.server import TelnetServer, TelnetConnection +from prompt_toolkit.input import create_pipe_input +from prompt_toolkit.output.vt100 import Vt100_Output from prompt_toolkit.output.color_depth import ColorDepth -from prompt_toolkit.shortcuts import clear_title, set_title -from ptpython.repl import embed, PythonRepl, _has_coroutine_flag +from ptpython.repl import embed, PythonRepl import ptpython.key_bindings import ptpython.python_input import ptpython.history_browser import ptpython.layout -import socket -import sys -import telnetlib -import threading -import traceback -import types from typing import List, Dict, Any from scrypted_python.scrypted_sdk import ScryptedStatic, ScryptedDevice +from cluster_setup import cluster_listen_zero from rpc import maybe_await @@ -38,175 +34,72 @@ ColorDepth.default = lambda *args, **kwargs: ColorDepth.DEPTH_4_BIT # that there is only one global Application, so multiple REPLs will confuse # the library. The patches here allow us to scope a particular call stack # to a particular REPL, and to get the current Application from the stack. -default_get_app = prompt_toolkit.application.current.get_app +def patch_prompt_toolkit(): + default_get_app = prompt_toolkit.application.current.get_app + + def get_app_patched() -> Application[Any]: + stack = inspect.stack() + for frame in stack: + self_var = frame.frame.f_locals.get("self") + if self_var is not None and isinstance(self_var, Application): + return self_var + return default_get_app() + + prompt_toolkit.application.current.get_app = get_app_patched + prompt_toolkit.key_binding.key_processor.get_app = get_app_patched + ptpython.python_input.get_app = get_app_patched + ptpython.key_bindings.get_app = get_app_patched + ptpython.history_browser.get_app = get_app_patched + ptpython.layout.get_app = get_app_patched +patch_prompt_toolkit() -def get_app_patched() -> Application[Any]: - stack = inspect.stack() - for frame in stack: - self_var = frame.frame.f_locals.get("self") - if self_var is not None and isinstance(self_var, Application): - return self_var - return default_get_app() - - -prompt_toolkit.application.current.get_app = get_app_patched -prompt_toolkit.key_binding.key_processor.get_app = get_app_patched -prompt_toolkit.contrib.telnet.server.get_app = get_app_patched -ptpython.python_input.get_app = get_app_patched -ptpython.key_bindings.get_app = get_app_patched -ptpython.history_browser.get_app = get_app_patched -ptpython.layout.get_app = get_app_patched - - -# This is a patched version of PythonRepl.run_async to handle an -# AssertionError raised by prompt_toolkit when the TelnetServer exits. -# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L215 -async def run_async_patched(self: PythonRepl) -> None: - """ - Run the REPL loop, but run the blocking parts in an executor, so that - we don't block the event loop. Both the input and output (which can - display a pager) will run in a separate thread with their own event - loop, this way ptpython's own event loop won't interfere with the - asyncio event loop from where this is called. - - The "eval" however happens in the current thread, which is important. - (Both for control-C to work, as well as for the code to see the right - thread in which it was embedded). - """ - loop = asyncio.get_running_loop() - - if self.terminal_title: - set_title(self.terminal_title) - - self._add_to_namespace() - - try: - while True: - try: - # Read. - try: - text = await loop.run_in_executor(None, self.read) - except EOFError: - return - except asyncio.CancelledError: - return - except AssertionError: - return - except BaseException: - # Something went wrong while reading input. - # (E.g., a bug in the completer that propagates. Don't - # crash the REPL.) - traceback.print_exc() - continue - - # Eval. - await self.run_and_show_expression_async(text) - - except KeyboardInterrupt as e: - # XXX: This does not yet work properly. In some situations, - # `KeyboardInterrupt` exceptions can end up in the event - # loop selector. - self._handle_keyboard_interrupt(e) - except SystemExit: - return - finally: - if self.terminal_title: - clear_title() - self._remove_from_namespace() - - -# This is a patched version of PythonRepl.eval_async to cross thread -# boundaries when running user REPL instructions. -# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L304 -async def eval_async_patched(self: PythonRepl, line: str) -> object: - """ - Evaluate the line and print the result. - """ - scrypted_loop: asyncio.AbstractEventLoop = self.scrypted_loop - - def task_done_cb(future: concurrent.futures.Future, task: asyncio.Task): - try: - result = task.result() - future.set_result(result) - except BaseException as e: - future.set_exception(e) - - def eval_in_scrypted(future: concurrent.futures.Future, code, *args, **kwargs): - try: - result = eval(code, *args, **kwargs) - if _has_coroutine_flag(code): - task = scrypted_loop.create_task(result) - task.add_done_callback(partial(task_done_cb, future)) - else: - future.set_result(result) - except BaseException as e: - future.set_exception(e) - - def eval_across_loops(code, *args, **kwargs): - future = concurrent.futures.Future() - scrypted_loop.call_soon_threadsafe( - partial(eval_in_scrypted, future), code, *args, **kwargs - ) - return future.result() - - # WORKAROUND: Due to a bug in Jedi, the current directory is removed - # from sys.path. See: https://github.com/davidhalter/jedi/issues/1148 - if "" not in sys.path: - sys.path.insert(0, "") - - if line.lstrip().startswith("!"): - # Run as shell command - os.system(line[1:]) - else: - # Try eval first - try: - code = self._compile_with_flags(line, "eval") - except SyntaxError: - pass - else: - # No syntax errors for eval. Do eval. - - result = eval_across_loops(code, self.get_globals(), self.get_locals()) - self._store_eval_result(result) - return result - - # If not a valid `eval` expression, compile as `exec` expression - # but still run with eval to get an awaitable in case of a - # awaitable expression. - code = self._compile_with_flags(line, "exec") - result = eval_across_loops(code, self.get_globals(), self.get_locals()) - - return None - - -def configure(scrypted_loop: asyncio.AbstractEventLoop, repl: PythonRepl) -> None: +def configure(repl: PythonRepl) -> None: repl.confirm_exit = False + repl.enable_open_in_editor = False repl.enable_system_bindings = False - repl.enable_mouse_support = False - repl.run_async = types.MethodType(run_async_patched, repl) - repl.eval_async = types.MethodType(eval_async_patched, repl) - repl.scrypted_loop = scrypted_loop + + +class AsyncStreamStdout: + """ + Wrapper around StreamReader and StreamWriter to provide `write` and `flush` + methods for Vt100_Output. + """ + + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer + self.loop = asyncio.get_event_loop() + + def write(self, data: bytes) -> None: + if isinstance(data, str): + data = data.encode() + self.writer.write(data) + + def flush(self) -> None: + self.loop.create_task(self.writer.drain()) + + def isatty(self) -> bool: + return True + + +# keep a reference to the server alive so it doesn't get garbage collected +repl_server = None async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int: + global repl_server + + if repl_server is not None: + return repl_server["port"] + deviceManager = sdk.deviceManager systemManager = sdk.systemManager mediaManager = sdk.mediaManager - # Create the proxy server to handle initial control messages - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.settimeout(None) - # TODO: this should use an equivalent to cluster_listen_zero - sock.bind(("0.0.0.0" if os.getenv("SCRYPTED_CLUSTER_ADDRESS") else "127.0.0.1", 0)) - sock.listen() - - scrypted_loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() - - async def start_telnet_repl(future: concurrent.futures.Future, filter: str) -> None: - repl_loop = asyncio.get_event_loop() - + async def on_repl_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + filter = await reader.read(1024) + filter = filter.decode() if filter == "undefined": filter = None @@ -224,141 +117,52 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int: device = plugin for c in chain: device = await maybe_await(device.getDevice(c)) - realDevice = systemManager.getDeviceById(device.id) - # Select a free port for the telnet server - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(("localhost", 0)) - telnet_port = s.getsockname()[1] - s.close() - - async def interact(connection: TelnetConnection) -> None: - # repl_loop owns the print capabilities, but the prints will - # be executed in scrypted_loop. We need to bridge the two here - repl_print = partial(print_formatted_text, output=connection.vt100_output) - - def print_across_loops(*args, **kwargs): - repl_loop.call_soon_threadsafe(repl_print, *args, **kwargs) - - global_dict = { - **globals(), - "print": print_across_loops, - "help": lambda *args, **kwargs: print_across_loops( - "Help is not available in this environment" - ), - "input": lambda *args, **kwargs: print_across_loops( - "Input is not available in this environment" - ), - } - locals_dict = { - "device": device, - "systemManager": systemManager, - "deviceManager": deviceManager, - "mediaManager": mediaManager, - "sdk": sdk, - "realDevice": realDevice, - } - vars_prompt = "\n".join([f" {k}" for k in locals_dict.keys()]) - banner = f"Python REPL variables:\n{vars_prompt}" - print_formatted_text(banner) - await embed( - return_asyncio_coroutine=True, - globals=global_dict, - locals=locals_dict, - configure=partial(configure, scrypted_loop), + with create_pipe_input() as vt100_input: + vt100_output = Vt100_Output( + AsyncStreamStdout(reader, writer), + lambda: Size(rows=24, columns=80), + term=None, ) - server_task: asyncio.Task = None + async def vt100_input_coro(): + while True: + data = await reader.read(1024) + if not data: + break + vt100_input.send_bytes(data) - def ready_cb(): - future.set_result( - ( - telnet_port, - lambda: repl_loop.call_soon_threadsafe(server_task.cancel), + asyncio.create_task(vt100_input_coro()) + + with create_app_session(input=vt100_input, output=vt100_output): + global_dict = { + **base_globals.copy(), + "print": print_formatted_text, + "help": lambda *args, **kwargs: print_formatted_text( + "Help is not available in this environment" + ), + "input": lambda *args, **kwargs: print_formatted_text( + "Input is not available in this environment" + ), + } + locals_dict = { + "device": device, + "systemManager": systemManager, + "deviceManager": deviceManager, + "mediaManager": mediaManager, + "sdk": sdk, + "realDevice": realDevice, + } + vars_prompt = "\n".join([f" {k}" for k in locals_dict.keys()]) + banner = f"Python REPL variables:\n{vars_prompt}" + print_formatted_text(banner) + await embed( + return_asyncio_coroutine=True, + globals=global_dict, + locals=locals_dict, + configure=configure, ) - ) - # Start the REPL server - telnet_server = TelnetServer( - interact=interact, port=telnet_port, enable_cpr=False - ) - server_task = asyncio.create_task(telnet_server.run(ready_cb=ready_cb)) - try: - await server_task - except: - pass - - def handle_connection(conn: socket.socket): - conn.settimeout(None) - filter = conn.recv(1024).decode() - server_started_future = concurrent.futures.Future() - repl_loop = asyncio.SelectorEventLoop() - - # we're not in the main loop, so can't handle any signals anyways - repl_loop.add_signal_handler = lambda sig, cb: None - repl_loop.remove_signal_handler = lambda sig: True - - def finish_setup(): - telnet_port, exit_server = server_started_future.result() - - telnet_client = telnetlib.Telnet("localhost", telnet_port, timeout=None) - - def telnet_negotiation_cb(telnet_socket, command, option): - pass # ignore telnet negotiation - - telnet_client.set_option_negotiation_callback(telnet_negotiation_cb) - - # initialize telnet terminal - # this tells the telnet server we are a vt100 terminal - telnet_client.get_socket().sendall( - b"\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0" - ) - telnet_client.get_socket().sendall(b"\r\n") - - # Bridge the connection to the telnet server, two way - def forward_to_telnet(): - while True: - data = conn.recv(1024) - if not data: - break - telnet_client.write(data) - telnet_client.close() - exit_server() - - def forward_to_socket(): - prompt_count = 0 - while True: - data = telnet_client.read_some() - if not data: - conn.sendall("REPL exited".encode()) - break - if b">>>" in data: - # This is an ugly hack - somewhere in ptpython, the - # initial prompt is being printed many times. Normal - # telnet clients handle it properly, but xtermjs doesn't - # like it. We just replace the first few with spaces - # so it's not too ugly. - prompt_count += 1 - if prompt_count < 5: - data = data.replace(b">>>", b" ") - conn.sendall(data) - conn.close() - exit_server() - - threading.Thread(target=forward_to_telnet).start() - threading.Thread(target=forward_to_socket).start() - - threading.Thread(target=finish_setup).start() - - repl_loop.run_until_complete(start_telnet_repl(server_started_future, filter)) - - def accept_connection(): - while True: - conn, addr = sock.accept() - threading.Thread(target=handle_connection, args=(conn,)).start() - - threading.Thread(target=accept_connection).start() - - proxy_port = sock.getsockname()[1] - return proxy_port + repl_server = await cluster_listen_zero(on_repl_client) + return repl_server["port"] \ No newline at end of file