Merge remote-tracking branch 'origin/main' into beta

This commit is contained in:
Koushik Dutta
2024-03-20 12:57:05 -07:00
2 changed files with 138 additions and 54 deletions

View File

@@ -2,6 +2,7 @@ import asyncio
import concurrent.futures
from functools import partial
import inspect
import os
import prompt_toolkit
from prompt_toolkit import print_formatted_text
from prompt_toolkit.application import Application
@@ -10,12 +11,13 @@ import prompt_toolkit.key_binding.key_processor
import prompt_toolkit.contrib.telnet.server
from prompt_toolkit.contrib.telnet.server import TelnetServer, TelnetConnection
from prompt_toolkit.shortcuts import clear_title, set_title
from ptpython.repl import embed, PythonRepl
from ptpython.repl import embed, PythonRepl, _has_coroutine_flag
import ptpython.key_bindings
import ptpython.python_input
import ptpython.history_browser
import ptpython.layout
import socket
import sys
import telnetlib
import threading
import traceback
@@ -48,11 +50,10 @@ ptpython.history_browser.get_app = get_app_patched
ptpython.layout.get_app = get_app_patched
# This is a patched version of PythonRepl.run_async to handle an
# AssertionError raised by prompt_toolkit when the TelnetServer exits.
# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L215
async def run_async_patched(self: PythonRepl) -> None:
# This is a patched version of PythonRepl.run_async to handle an
# AssertionError raised by prompt_toolkit when the TelnetServer exits.
# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L215
"""
Run the REPL loop, but run the blocking parts in an executor, so that
we don't block the event loop. Both the input and output (which can
@@ -106,11 +107,75 @@ async def run_async_patched(self: PythonRepl) -> None:
self._remove_from_namespace()
def configure(repl: PythonRepl) -> None:
# This is a patched version of PythonRepl.eval_async to cross thread
# boundaries when running user REPL instructions.
# Original: https://github.com/prompt-toolkit/ptpython/blob/3.0.26/ptpython/repl.py#L304
async def eval_async_patched(self: PythonRepl, line: str) -> object:
"""
Evaluate the line and print the result.
"""
scrypted_loop: asyncio.AbstractEventLoop = self.scrypted_loop
def task_done_cb(future: concurrent.futures.Future, task: asyncio.Task):
try:
result = task.result()
future.set_result(result)
except BaseException as e:
future.set_exception(e)
def eval_in_scrypted(future: concurrent.futures.Future, code, *args, **kwargs):
try:
result = eval(code, *args, **kwargs)
if _has_coroutine_flag(code):
task = scrypted_loop.create_task(result)
task.add_done_callback(partial(task_done_cb, future))
else:
future.set_result(result)
except BaseException as e:
future.set_exception(e)
def eval_across_loops(code, *args, **kwargs):
future = concurrent.futures.Future()
scrypted_loop.call_soon_threadsafe(partial(eval_in_scrypted, future), code, *args, **kwargs)
return future.result()
# WORKAROUND: Due to a bug in Jedi, the current directory is removed
# from sys.path. See: https://github.com/davidhalter/jedi/issues/1148
if "" not in sys.path:
sys.path.insert(0, "")
if line.lstrip().startswith("!"):
# Run as shell command
os.system(line[1:])
else:
# Try eval first
try:
code = self._compile_with_flags(line, "eval")
except SyntaxError:
pass
else:
# No syntax errors for eval. Do eval.
result = eval_across_loops(code, self.get_globals(), self.get_locals())
self._store_eval_result(result)
return result
# If not a valid `eval` expression, compile as `exec` expression
# but still run with eval to get an awaitable in case of a
# awaitable expression.
code = self._compile_with_flags(line, "exec")
result = eval_across_loops(code, self.get_globals(), self.get_locals())
return None
def configure(scrypted_loop: asyncio.AbstractEventLoop, repl: PythonRepl) -> None:
repl.confirm_exit = False
repl.enable_system_bindings = False
repl.enable_mouse_support = False
repl.run_async = types.MethodType(run_async_patched, repl)
repl.eval_async = types.MethodType(eval_async_patched, repl)
repl.scrypted_loop = scrypted_loop
async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
@@ -125,9 +190,11 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
sock.bind(('localhost', 0))
sock.listen()
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
scrypted_loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
async def start_telnet_repl(future: concurrent.futures.Future, filter: str) -> None:
repl_loop = asyncio.get_event_loop()
if filter == "undefined":
filter = None
@@ -155,12 +222,17 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
s.close()
async def interact(connection: TelnetConnection) -> None:
# repl_loop owns the print capabilities, but the prints will
# be executed in scrypted_loop. We need to bridge the two here
repl_print = partial(print_formatted_text, output=connection.vt100_output)
def print_across_loops(*args, **kwargs):
repl_loop.call_soon_threadsafe(repl_print, *args, **kwargs)
global_dict = {
**globals(),
"print": repl_print,
"help": lambda *args, **kwargs: repl_print("Help is not available in this environment"),
"input": lambda *args, **kwargs: repl_print("Input is not available in this environment"),
"print": print_across_loops,
"help": lambda *args, **kwargs: print_across_loops("Help is not available in this environment"),
"input": lambda *args, **kwargs: print_across_loops("Input is not available in this environment"),
}
locals_dict = {
"device": device,
@@ -173,67 +245,79 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
vars_prompt = '\n'.join([f" {k}" for k in locals_dict.keys()])
banner = f"Python REPL variables:\n{vars_prompt}"
print_formatted_text(banner)
await embed(return_asyncio_coroutine=True, globals=global_dict, locals=locals_dict, configure=configure)
await embed(return_asyncio_coroutine=True, globals=global_dict, locals=locals_dict, configure=partial(configure, scrypted_loop))
server_task: asyncio.Task = None
def ready_cb():
future.set_result((telnet_port, lambda: loop.call_soon_threadsafe(server_task.cancel)))
future.set_result((telnet_port, lambda: repl_loop.call_soon_threadsafe(server_task.cancel)))
# Start the REPL server
telnet_server = TelnetServer(interact=interact, port=telnet_port, enable_cpr=False)
server_task = asyncio.create_task(telnet_server.run(ready_cb=ready_cb))
try:
await server_task
except:
pass
def handle_connection(conn: socket.socket):
conn.settimeout(None)
filter = conn.recv(1024).decode()
server_started_future = concurrent.futures.Future()
repl_loop = asyncio.SelectorEventLoop()
future = concurrent.futures.Future()
loop.call_soon_threadsafe(loop.create_task, start_telnet_repl(future, filter))
telnet_port, exit_server = future.result()
# we're not in the main loop, so can't handle any signals anyways
repl_loop.add_signal_handler = lambda sig, cb: None
telnet_client = telnetlib.Telnet('localhost', telnet_port, timeout=None)
def finish_setup():
telnet_port, exit_server = server_started_future.result()
def telnet_negotiation_cb(telnet_socket, command, option):
pass # ignore telnet negotiation
telnet_client.set_option_negotiation_callback(telnet_negotiation_cb)
telnet_client = telnetlib.Telnet('localhost', telnet_port, timeout=None)
# initialize telnet terminal
# this tells the telnet server we are a vt100 terminal
telnet_client.get_socket().sendall(b'\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0')
telnet_client.get_socket().sendall(b'\r\n')
def telnet_negotiation_cb(telnet_socket, command, option):
pass # ignore telnet negotiation
telnet_client.set_option_negotiation_callback(telnet_negotiation_cb)
# Bridge the connection to the telnet server, two way
def forward_to_telnet():
while True:
data = conn.recv(1024)
if not data:
break
telnet_client.write(data)
telnet_client.close()
exit_server()
# initialize telnet terminal
# this tells the telnet server we are a vt100 terminal
telnet_client.get_socket().sendall(b'\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0')
telnet_client.get_socket().sendall(b'\r\n')
def forward_to_socket():
prompt_count = 0
while True:
data = telnet_client.read_some()
if not data:
conn.sendall('REPL exited'.encode())
break
if b">>>" in data:
# This is an ugly hack - somewhere in ptpython, the
# initial prompt is being printed many times. Normal
# telnet clients handle it properly, but xtermjs doesn't
# like it. We just replace the first few with spaces
# so it's not too ugly.
prompt_count += 1
if prompt_count < 5:
data = data.replace(b">>>", b" ")
conn.sendall(data)
conn.close()
exit_server()
# Bridge the connection to the telnet server, two way
def forward_to_telnet():
while True:
data = conn.recv(1024)
if not data:
break
telnet_client.write(data)
telnet_client.close()
exit_server()
threading.Thread(target=forward_to_telnet).start()
threading.Thread(target=forward_to_socket).start()
def forward_to_socket():
prompt_count = 0
while True:
data = telnet_client.read_some()
if not data:
conn.sendall('REPL exited'.encode())
break
if b">>>" in data:
# This is an ugly hack - somewhere in ptpython, the
# initial prompt is being printed many times. Normal
# telnet clients handle it properly, but xtermjs doesn't
# like it. We just replace the first few with spaces
# so it's not too ugly.
prompt_count += 1
if prompt_count < 5:
data = data.replace(b">>>", b" ")
conn.sendall(data)
conn.close()
exit_server()
threading.Thread(target=forward_to_telnet).start()
threading.Thread(target=forward_to_socket).start()
threading.Thread(target=finish_setup).start()
repl_loop.run_until_complete(start_telnet_repl(server_started_future, filter))
def accept_connection():
while True: