Files
scrypted/server/python/plugin_repl.py
2024-03-05 12:30:56 -05:00

136 lines
4.7 KiB
Python

import asyncio
import concurrent.futures
from prompt_toolkit import print_formatted_text
from prompt_toolkit.contrib.telnet.server import TelnetServer
from ptpython.repl import embed, PythonRepl
import socket
import telnetlib
import threading
from typing import List, Dict, Any
from scrypted_python.scrypted_sdk import ScryptedStatic, ScryptedDevice
from rpc import maybe_await
def configure(repl: PythonRepl) -> None:
repl.confirm_exit = False
repl.enable_system_bindings = False
repl.enable_mouse_support = False
async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
deviceManager = sdk.deviceManager
systemManager = sdk.systemManager
mediaManager = sdk.mediaManager
# Create the proxy server to handle initial control messages
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(None)
sock.bind(('localhost', 0))
sock.listen(1)
async def start_telnet_repl(future, filter) -> None:
if filter == "undefined":
filter = None
chain: List[str] = []
nativeIds: Dict[str, Any] = deviceManager.nativeIds
reversed: Dict[str, str] = {v.id: k for k, v in nativeIds.items()}
while filter is not None:
id = nativeIds.get(filter).id
d = systemManager.getDeviceById(id)
chain.append(filter)
filter = reversed.get(d.providerId)
chain.reverse()
device = plugin
for c in chain:
device = await maybe_await(device.getDevice(c))
realDevice = systemManager.getDeviceById(device.id)
# Select a free port for the telnet server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0))
telnet_port = s.getsockname()[1]
s.close()
async def interact(connection) -> None:
global_dict = {**globals(), "print": print_formatted_text}
locals_dict = {
"device": device,
"systemManager": systemManager,
"deviceManager": deviceManager,
"mediaManager": mediaManager,
"sdk": sdk,
"realDevice": realDevice
}
vars_prompt = '\n'.join([f" {k}" for k in locals_dict.keys()])
banner = f"Python REPL variables:\n{vars_prompt}"
print_formatted_text(banner)
await embed(return_asyncio_coroutine=True, globals=global_dict, locals=locals_dict, configure=configure)
# Start the REPL server
telnet_server = TelnetServer(interact=interact, port=telnet_port, enable_cpr=False)
telnet_server.start()
print(f"Running telnet server on port {telnet_port}...")
future.set_result(telnet_port)
loop = asyncio.get_event_loop()
def handle_connection(conn):
filter = conn.recv(1024).decode()
print(f"Filter: {filter}")
future = concurrent.futures.Future()
loop.call_soon_threadsafe(loop.create_task, start_telnet_repl(future, filter))
telnet_port = future.result()
telnet_client = telnetlib.Telnet('localhost', telnet_port, timeout=None)
def telnet_negotiation_cb(telnet_socket, command, option):
pass # ignore telnet negotiation
telnet_client.set_option_negotiation_callback(telnet_negotiation_cb)
print('Connected to telnet server')
# initialize telnet terminal
telnet_client.get_socket().sendall(b'\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0')
telnet_client.get_socket().sendall(b'\r\n')
#telnet_client.get_socket().sendall(b'\xff\xfa\x18\x39\x36\x2c\x32\x34\xff\xf0')
#telnet_client.get_socket().sendall(b'\r\n')
# Bridge the connection to the telnet server, two way
def forward_to_telnet():
while True:
data = conn.recv(1024)
if not data:
break
telnet_client.write(data)
def forward_to_socket():
while True:
data = telnet_client.read_some()
if not data:
conn.sendall('REPL exited'.encode())
break
print(data)
conn.sendall(data)
threading.Thread(target=forward_to_telnet).start()
threading.Thread(target=forward_to_socket).start()
def accept_connection():
while True:
conn, addr = sock.accept()
threading.Thread(target=handle_connection, args=(conn,)).start()
threading.Thread(target=accept_connection).start()
proxy_port = sock.getsockname()[1]
print(f"Running proxy server on port {proxy_port}...")
return proxy_port