diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 4f70879a1..accb7d016 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -41,6 +41,12 @@ import rpc import rpc_reader +OPTIONAL_REQUIREMENTS = """ +ptpython +""".strip() + + + class ClusterObject(TypedDict): id: str port: int @@ -337,6 +343,7 @@ class PluginRemote: self.pluginId = pluginId self.hostInfo = hostInfo self.loop = loop + self.replPort = None self.__dict__['__proxy_oneway_methods'] = [ 'notify', 'updateDeviceState', @@ -567,81 +574,100 @@ class PluginRemote: if not os.path.exists(python_prefix): os.makedirs(python_prefix) + str_requirements = "" if 'requirements.txt' in zip.namelist(): requirements = zip.open('requirements.txt').read() str_requirements = requirements.decode('utf8') - requirementstxt = os.path.join( - python_prefix, 'requirements.txt') - installed_requirementstxt = os.path.join( - python_prefix, 'requirements.installed.txt') + installed_optional_requirementstxt = os.path.join( + python_prefix, 'optional-requirements.installed.txt') + optional_requirementstxt = os.path.join( + python_prefix, 'optional-requirements.txt') + requirementstxt = os.path.join( + python_prefix, 'requirements.txt') + installed_requirementstxt = os.path.join( + python_prefix, 'requirements.installed.txt') - need_pip = True + def install_with_pip(want_requirements: str, requirementstxt: str, installed_requirementstxt: str, ignore_error: bool = False): + os.makedirs(python_prefix, exist_ok=True) + + print(f'{os.path.basename(requirementstxt)} (outdated)') + print(want_requirements) + + f = open(requirementstxt, 'wb') + f.write(want_requirements.encode()) + f.close() + + try: + pythonVersion = packageJson['scrypted']['pythonVersion'] + except: + pythonVersion = None + + pipArgs = [ + sys.executable, + '-m', 'pip', 'install', '-r', requirementstxt, + '--prefix', python_prefix + ] + if pythonVersion: + print('Specific Python version requested. Forcing reinstall.') + # prevent uninstalling system packages. + pipArgs.append('--ignore-installed') + # force reinstall even if it exists in system packages. + pipArgs.append('--force-reinstall') + + p = subprocess.Popen(pipArgs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + while True: + line = p.stdout.readline() + if not line: + break + line = line.decode('utf8').rstrip('\r\n') + print(line) + result = p.wait() + print('pip install result %s' % result) + if result: + if not ignore_error: + raise Exception('non-zero result from pip %s' % result) + else: + print('ignoring non-zero result from pip %s' % result) + else: + f = open(installed_requirementstxt, 'wb') + f.write(want_requirements.encode()) + f.close() + + need_pip = True + if str_requirements: try: existing = open(installed_requirementstxt).read() need_pip = existing != str_requirements except: pass + if not need_pip: + try: + existing = open(installed_optional_requirementstxt).read() + need_pip = existing != OPTIONAL_REQUIREMENTS + except: + need_pip = True - if need_pip: - try: - for de in os.listdir(plugin_volume): - if de.startswith('linux') or de.startswith('darwin') or de.startswith('win32') or de.startswith('python') or de.startswith('node'): - filePath = os.path.join(plugin_volume, de) - print('Removing old dependencies: %s' % - filePath) - try: - shutil.rmtree(filePath) - except: - pass - except: - pass + if need_pip: + try: + for de in os.listdir(plugin_volume): + if de.startswith('linux') or de.startswith('darwin') or de.startswith('win32') or de.startswith('python') or de.startswith('node'): + filePath = os.path.join(plugin_volume, de) + print('Removing old dependencies: %s' % + filePath) + try: + shutil.rmtree(filePath) + except: + pass + except: + pass - os.makedirs(python_prefix) - - print('requirements.txt (outdated)') - print(str_requirements) - - f = open(requirementstxt, 'wb') - f.write(requirements) - f.close() - - try: - pythonVersion = packageJson['scrypted']['pythonVersion'] - except: - pythonVersion = None - - pipArgs = [ - sys.executable, - '-m', 'pip', 'install', '-r', requirementstxt, - '--prefix', python_prefix - ] - if pythonVersion: - print('Specific Python version requested. Forcing reinstall.') - # prevent uninstalling system packages. - pipArgs.append('--ignore-installed') - # force reinstall even if it exists in system packages. - pipArgs.append('--force-reinstall') - - p = subprocess.Popen(pipArgs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - while True: - line = p.stdout.readline() - if not line: - break - line = line.decode('utf8').rstrip('\r\n') - print(line) - result = p.wait() - print('pip install result %s' % result) - if result: - raise Exception('non-zero result from pip %s' % result) - - f = open(installed_requirementstxt, 'wb') - f.write(requirements) - f.close() - else: - print('requirements.txt (up to date)') - print(str_requirements) + install_with_pip(OPTIONAL_REQUIREMENTS, optional_requirementstxt, installed_optional_requirementstxt, ignore_error=True) + install_with_pip(str_requirements, requirementstxt, installed_requirementstxt, ignore_error=False) + else: + print('requirements.txt (up to date)') + print(str_requirements) sys.path.insert(0, zipPath) if platform.system() != 'Windows': @@ -744,7 +770,14 @@ class PluginRemote: if not forkMain: from main import create_scrypted_plugin # type: ignore - return await rpc.maybe_await(create_scrypted_plugin()) + pluginInstance = await rpc.maybe_await(create_scrypted_plugin()) + try: + from plugin_repl import createREPLServer + self.replPort = await createREPLServer(sdk, pluginInstance) + except Exception as e: + print(f"Warning: Python REPL cannot be loaded: {e}") + self.replPort = 0 + return pluginInstance from main import fork # type: ignore forked = await rpc.maybe_await(fork()) @@ -795,7 +828,13 @@ class PluginRemote: pass async def getServicePort(self, name): - pass + if name == "repl": + if self.replPort is None: + raise Exception('REPL unavailable: Plugin not loaded.') + if self.replPort == 0: + raise Exception('REPL unavailable: Python REPL not available.') + return self.replPort + raise Exception(f'unknown service {name}') async def start_stats_runner(self): update_stats = await self.peer.getParam('updateStats') diff --git a/server/python/plugin_repl.py b/server/python/plugin_repl.py new file mode 100644 index 000000000..292ea4ada --- /dev/null +++ b/server/python/plugin_repl.py @@ -0,0 +1,143 @@ +import asyncio +import concurrent.futures +from prompt_toolkit import print_formatted_text +from prompt_toolkit.contrib.telnet.server import TelnetServer +from ptpython.repl import embed, PythonRepl +import socket +import telnetlib +import threading +from typing import List, Dict, Any + +from scrypted_python.scrypted_sdk import ScryptedStatic, ScryptedDevice + +from rpc import maybe_await + + +def configure(repl: PythonRepl) -> None: + repl.confirm_exit = False + repl.enable_system_bindings = False + repl.enable_mouse_support = False + + +async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int: + deviceManager = sdk.deviceManager + systemManager = sdk.systemManager + mediaManager = sdk.mediaManager + + # Create the proxy server to handle initial control messages + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(None) + sock.bind(('localhost', 0)) + sock.listen(1) + + async def start_telnet_repl(future, filter) -> None: + if filter == "undefined": + filter = None + + chain: List[str] = [] + nativeIds: Dict[str, Any] = deviceManager.nativeIds + reversed: Dict[str, str] = {v.id: k for k, v in nativeIds.items()} + + while filter is not None: + id = nativeIds.get(filter).id + d = systemManager.getDeviceById(id) + chain.append(filter) + filter = reversed.get(d.providerId) + + chain.reverse() + device = plugin + for c in chain: + device = await maybe_await(device.getDevice(c)) + + realDevice = systemManager.getDeviceById(device.id) + + # Select a free port for the telnet server + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('localhost', 0)) + telnet_port = s.getsockname()[1] + s.close() + + async def interact(connection) -> None: + global_dict = { + **globals(), + "print": print_formatted_text, + "help": lambda *args, **kwargs: print_formatted_text("Help is not available in this environment"), + } + locals_dict = { + "device": device, + "systemManager": systemManager, + "deviceManager": deviceManager, + "mediaManager": mediaManager, + "sdk": sdk, + "realDevice": realDevice + } + vars_prompt = '\n'.join([f" {k}" for k in locals_dict.keys()]) + banner = f"Python REPL variables:\n{vars_prompt}" + print_formatted_text(banner) + await embed(return_asyncio_coroutine=True, globals=global_dict, locals=locals_dict, configure=configure) + + # Start the REPL server + telnet_server = TelnetServer(interact=interact, port=telnet_port, enable_cpr=False) + telnet_server.start() + + future.set_result(telnet_port) + + loop = asyncio.get_event_loop() + + def handle_connection(conn: socket.socket): + conn.settimeout(None) + filter = conn.recv(1024).decode() + + future = concurrent.futures.Future() + loop.call_soon_threadsafe(loop.create_task, start_telnet_repl(future, filter)) + telnet_port = future.result() + + telnet_client = telnetlib.Telnet('localhost', telnet_port, timeout=None) + + def telnet_negotiation_cb(telnet_socket, command, option): + pass # ignore telnet negotiation + telnet_client.set_option_negotiation_callback(telnet_negotiation_cb) + + # initialize telnet terminal + # this tells the telnet server we are a vt100 terminal + telnet_client.get_socket().sendall(b'\xff\xfb\x18\xff\xfa\x18\x00\x61\x6e\x73\x69\xff\xf0') + telnet_client.get_socket().sendall(b'\r\n') + + # Bridge the connection to the telnet server, two way + def forward_to_telnet(): + while True: + data = conn.recv(1024) + if not data: + break + telnet_client.write(data) + def forward_to_socket(): + prompt_count = 0 + while True: + data = telnet_client.read_some() + if not data: + conn.sendall('REPL exited'.encode()) + break + if b">>>" in data: + # This is an ugly hack - somewhere in ptpython, the + # initial prompt is being printed many times. Normal + # telnet clients handle it properly, but xtermjs doesn't + # like it. We just replace the first few with spaces + # so it's not too ugly. + prompt_count += 1 + if prompt_count < 5: + data = data.replace(b">>>", b" ") + conn.sendall(data) + + threading.Thread(target=forward_to_telnet).start() + threading.Thread(target=forward_to_socket).start() + + def accept_connection(): + while True: + conn, addr = sock.accept() + threading.Thread(target=handle_connection, args=(conn,)).start() + + threading.Thread(target=accept_connection).start() + + proxy_port = sock.getsockname()[1] + return proxy_port \ No newline at end of file