python: partial repl reimplementation (#1763)

* python: partial repl reimplementation

* make more readable?

* document questionable design choices
This commit is contained in:
Brett Jia
2025-03-04 17:37:55 -05:00
committed by GitHub
parent 0b24e57262
commit 19832c9537
2 changed files with 109 additions and 311 deletions

View File

@@ -42,12 +42,6 @@ SCRYPTED_REQUIREMENTS = """
ptpython
wheel
""".strip()
if sys.version_info.minor >= 13:
# telnetlib was removed in Python 3.13
SCRYPTED_REQUIREMENTS = f"""
{SCRYPTED_REQUIREMENTS}
standard-telnetlib
""".strip()
class SystemDeviceState(TypedDict):
@@ -500,13 +494,13 @@ class MediaManager:
files_path = os.getenv('SCRYPTED_PLUGIN_VOLUME')
if not files_path:
raise ValueError('SCRYPTED_PLUGIN_VOLUME env variable not set?')
# Construct the path for the 'files' directory
ret = Path(files_path) / 'files'
# Ensure the directory exists
await asyncio.to_thread(ret.mkdir, parents=True, exist_ok=True)
# Return the constructed directory path as a string
return str(ret)

View File

@@ -1,32 +1,28 @@
# Copy this here before it gets populated with the rest of this file's code
base_globals = globals().copy()
import asyncio
import concurrent.futures
from functools import partial
import inspect
import os
import prompt_toolkit
from prompt_toolkit import print_formatted_text
from prompt_toolkit.application import Application
import prompt_toolkit.application.current
from prompt_toolkit.application.current import create_app_session
from prompt_toolkit.data_structures import Size
import prompt_toolkit.key_binding.key_processor
import prompt_toolkit.contrib.telnet.server
from prompt_toolkit.contrib.telnet.server import TelnetServer, TelnetConnection
from prompt_toolkit.input import create_pipe_input
from prompt_toolkit.output.vt100 import Vt100_Output
from prompt_toolkit.output.color_depth import ColorDepth
from prompt_toolkit.shortcuts import clear_title, set_title
from ptpython.repl import embed, PythonRepl, _has_coroutine_flag
from ptpython.repl import embed, PythonRepl
import ptpython.key_bindings
import ptpython.python_input
import ptpython.history_browser
import ptpython.layout
import socket
import sys
import telnetlib
import threading
import traceback
import types
from typing import List, Dict, Any
from scrypted_python.scrypted_sdk import ScryptedStatic, ScryptedDevice
from cluster_setup import cluster_listen_zero
from rpc import maybe_await
@@ -38,175 +34,72 @@ ColorDepth.default = lambda *args, **kwargs: ColorDepth.DEPTH_4_BIT
# that there is only one global Application, so multiple REPLs will confuse
# the library. The patches here allow us to scope a particular call stack
# to a particular REPL, and to get the current Application from the stack.
default_get_app = prompt_toolkit.application.current.get_app
def patch_prompt_toolkit():
default_get_app = prompt_toolkit.application.current.get_app
def get_app_patched() -> Application[Any]:
stack = inspect.stack()
for frame in stack:
self_var = frame.frame.f_locals.get("self")
if self_var is not None and isinstance(self_var, Application):
return self_var
return default_get_app()
prompt_toolkit.application.current.get_app = get_app_patched
prompt_toolkit.key_binding.key_processor.get_app = get_app_patched
ptpython.python_input.get_app = get_app_patched
ptpython.key_bindings.get_app = get_app_patched
ptpython.history_browser.get_app = get_app_patched
ptpython.layout.get_app = get_app_patched
patch_prompt_toolkit()
def get_app_patched() -> Application[Any]:
stack = inspect.stack()
for frame in stack:
self_var = frame.frame.f_locals.get("self")
if self_var is not None and isinstance(self_var, Application):
return self_var
return default_get_app()
prompt_toolkit.application.current.get_app = get_app_patched
prompt_toolkit.key_binding.key_processor.get_app = get_app_patched
prompt_toolkit.contrib.telnet.server.get_app = get_app_patched
ptpython.python_input.get_app = get_app_patched
ptpython.key_bindings.get_app = get_app_patched
ptpython.history_browser.get_app = get_app_patched
ptpython.layout.get_app = get_app_patched
# This is a patched version of PythonRepl.run_async to handle an
# AssertionError raised by prompt_toolkit when the TelnetServer exits.
# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L215
async def run_async_patched(self: PythonRepl) -> None:
"""
Run the REPL loop, but run the blocking parts in an executor, so that
we don't block the event loop. Both the input and output (which can
display a pager) will run in a separate thread with their own event
loop, this way ptpython's own event loop won't interfere with the
asyncio event loop from where this is called.
The "eval" however happens in the current thread, which is important.
(Both for control-C to work, as well as for the code to see the right
thread in which it was embedded).
"""
loop = asyncio.get_running_loop()
if self.terminal_title:
set_title(self.terminal_title)
self._add_to_namespace()
try:
while True:
try:
# Read.
try:
text = await loop.run_in_executor(None, self.read)
except EOFError:
return
except asyncio.CancelledError:
return
except AssertionError:
return
except BaseException:
# Something went wrong while reading input.
# (E.g., a bug in the completer that propagates. Don't
# crash the REPL.)
traceback.print_exc()
continue
# Eval.
await self.run_and_show_expression_async(text)
except KeyboardInterrupt as e:
# XXX: This does not yet work properly. In some situations,
# `KeyboardInterrupt` exceptions can end up in the event
# loop selector.
self._handle_keyboard_interrupt(e)
except SystemExit:
return
finally:
if self.terminal_title:
clear_title()
self._remove_from_namespace()
# This is a patched version of PythonRepl.eval_async to cross thread
# boundaries when running user REPL instructions.
# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L304
async def eval_async_patched(self: PythonRepl, line: str) -> object:
"""
Evaluate the line and print the result.
"""
scrypted_loop: asyncio.AbstractEventLoop = self.scrypted_loop
def task_done_cb(future: concurrent.futures.Future, task: asyncio.Task):
try:
result = task.result()
future.set_result(result)
except BaseException as e:
future.set_exception(e)
def eval_in_scrypted(future: concurrent.futures.Future, code, *args, **kwargs):
try:
result = eval(code, *args, **kwargs)
if _has_coroutine_flag(code):
task = scrypted_loop.create_task(result)
task.add_done_callback(partial(task_done_cb, future))
else:
future.set_result(result)
except BaseException as e:
future.set_exception(e)
def eval_across_loops(code, *args, **kwargs):
future = concurrent.futures.Future()
scrypted_loop.call_soon_threadsafe(
partial(eval_in_scrypted, future), code, *args, **kwargs
)
return future.result()
# WORKAROUND: Due to a bug in Jedi, the current directory is removed
# from sys.path. See: https://github.com/davidhalter/jedi/issues/1148
if "" not in sys.path:
sys.path.insert(0, "")
if line.lstrip().startswith("!"):
# Run as shell command
os.system(line[1:])
else:
# Try eval first
try:
code = self._compile_with_flags(line, "eval")
except SyntaxError:
pass
else:
# No syntax errors for eval. Do eval.
result = eval_across_loops(code, self.get_globals(), self.get_locals())
self._store_eval_result(result)
return result
# If not a valid `eval` expression, compile as `exec` expression
# but still run with eval to get an awaitable in case of a
# awaitable expression.
code = self._compile_with_flags(line, "exec")
result = eval_across_loops(code, self.get_globals(), self.get_locals())
return None
def configure(scrypted_loop: asyncio.AbstractEventLoop, repl: PythonRepl) -> None:
def configure(repl: PythonRepl) -> None:
repl.confirm_exit = False
repl.enable_open_in_editor = False
repl.enable_system_bindings = False
repl.enable_mouse_support = False
repl.run_async = types.MethodType(run_async_patched, repl)
repl.eval_async = types.MethodType(eval_async_patched, repl)
repl.scrypted_loop = scrypted_loop
class AsyncStreamStdout:
"""
Wrapper around StreamReader and StreamWriter to provide `write` and `flush`
methods for Vt100_Output.
"""
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self.loop = asyncio.get_event_loop()
def write(self, data: bytes) -> None:
if isinstance(data, str):
data = data.encode()
self.writer.write(data)
def flush(self) -> None:
self.loop.create_task(self.writer.drain())
def isatty(self) -> bool:
return True
# keep a reference to the server alive so it doesn't get garbage collected
repl_server = None
async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
global repl_server
if repl_server is not None:
return repl_server["port"]
deviceManager = sdk.deviceManager
systemManager = sdk.systemManager
mediaManager = sdk.mediaManager
# Create the proxy server to handle initial control messages
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(None)
# TODO: this should use an equivalent to cluster_listen_zero
sock.bind(("0.0.0.0" if os.getenv("SCRYPTED_CLUSTER_ADDRESS") else "127.0.0.1", 0))
sock.listen()
scrypted_loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
async def start_telnet_repl(future: concurrent.futures.Future, filter: str) -> None:
repl_loop = asyncio.get_event_loop()
async def on_repl_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
filter = await reader.read(1024)
filter = filter.decode()
if filter == "undefined":
filter = None
@@ -224,141 +117,52 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
device = plugin
for c in chain:
device = await maybe_await(device.getDevice(c))
realDevice = systemManager.getDeviceById(device.id)
# Select a free port for the telnet server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("localhost", 0))
telnet_port = s.getsockname()[1]
s.close()
async def interact(connection: TelnetConnection) -> None:
# repl_loop owns the print capabilities, but the prints will
# be executed in scrypted_loop. We need to bridge the two here
repl_print = partial(print_formatted_text, output=connection.vt100_output)
def print_across_loops(*args, **kwargs):
repl_loop.call_soon_threadsafe(repl_print, *args, **kwargs)
global_dict = {
**globals(),
"print": print_across_loops,
"help": lambda *args, **kwargs: print_across_loops(
"Help is not available in this environment"
),
"input": lambda *args, **kwargs: print_across_loops(
"Input is not available in this environment"
),
}
locals_dict = {
"device": device,
"systemManager": systemManager,
"deviceManager": deviceManager,
"mediaManager": mediaManager,
"sdk": sdk,
"realDevice": realDevice,
}
vars_prompt = "\n".join([f" {k}" for k in locals_dict.keys()])
banner = f"Python REPL variables:\n{vars_prompt}"
print_formatted_text(banner)
await embed(
return_asyncio_coroutine=True,
globals=global_dict,
locals=locals_dict,
configure=partial(configure, scrypted_loop),
with create_pipe_input() as vt100_input:
vt100_output = Vt100_Output(
AsyncStreamStdout(reader, writer),
lambda: Size(rows=24, columns=80),
term=None,
)
server_task: asyncio.Task = None
async def vt100_input_coro():
while True:
data = await reader.read(1024)
if not data:
break
vt100_input.send_bytes(data)
def ready_cb():
future.set_result(
(
telnet_port,
lambda: repl_loop.call_soon_threadsafe(server_task.cancel),
asyncio.create_task(vt100_input_coro())
with create_app_session(input=vt100_input, output=vt100_output):
global_dict = {
**base_globals.copy(),
"print": print_formatted_text,
"help": lambda *args, **kwargs: print_formatted_text(
"Help is not available in this environment"
),
"input": lambda *args, **kwargs: print_formatted_text(
"Input is not available in this environment"
),
}
locals_dict = {
"device": device,
"systemManager": systemManager,
"deviceManager": deviceManager,
"mediaManager": mediaManager,
"sdk": sdk,
"realDevice": realDevice,
}
vars_prompt = "\n".join([f" {k}" for k in locals_dict.keys()])
banner = f"Python REPL variables:\n{vars_prompt}"
print_formatted_text(banner)
await embed(
return_asyncio_coroutine=True,
globals=global_dict,
locals=locals_dict,
configure=configure,
)
)
# Start the REPL server
telnet_server = TelnetServer(
interact=interact, port=telnet_port, enable_cpr=False
)
server_task = asyncio.create_task(telnet_server.run(ready_cb=ready_cb))
try:
await server_task
except:
pass
def handle_connection(conn: socket.socket):
conn.settimeout(None)
filter = conn.recv(1024).decode()
server_started_future = concurrent.futures.Future()
repl_loop = asyncio.SelectorEventLoop()
# we're not in the main loop, so can't handle any signals anyways
repl_loop.add_signal_handler = lambda sig, cb: None
repl_loop.remove_signal_handler = lambda sig: True
def finish_setup():
telnet_port, exit_server = server_started_future.result()
telnet_client = telnetlib.Telnet("localhost", telnet_port, timeout=None)
def telnet_negotiation_cb(telnet_socket, command, option):
pass # ignore telnet negotiation
telnet_client.set_option_negotiation_callback(telnet_negotiation_cb)
# initialize telnet terminal
# this tells the telnet server we are a vt100 terminal
telnet_client.get_socket().sendall(
b"\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0"
)
telnet_client.get_socket().sendall(b"\r\n")
# Bridge the connection to the telnet server, two way
def forward_to_telnet():
while True:
data = conn.recv(1024)
if not data:
break
telnet_client.write(data)
telnet_client.close()
exit_server()
def forward_to_socket():
prompt_count = 0
while True:
data = telnet_client.read_some()
if not data:
conn.sendall("REPL exited".encode())
break
if b">>>" in data:
# This is an ugly hack - somewhere in ptpython, the
# initial prompt is being printed many times. Normal
# telnet clients handle it properly, but xtermjs doesn't
# like it. We just replace the first few with spaces
# so it's not too ugly.
prompt_count += 1
if prompt_count < 5:
data = data.replace(b">>>", b" ")
conn.sendall(data)
conn.close()
exit_server()
threading.Thread(target=forward_to_telnet).start()
threading.Thread(target=forward_to_socket).start()
threading.Thread(target=finish_setup).start()
repl_loop.run_until_complete(start_telnet_repl(server_started_future, filter))
def accept_connection():
while True:
conn, addr = sock.accept()
threading.Thread(target=handle_connection, args=(conn,)).start()
threading.Thread(target=accept_connection).start()
proxy_port = sock.getsockname()[1]
return proxy_port
repl_server = await cluster_listen_zero(on_repl_client)
return repl_server["port"]