diff --git a/sdk/package-lock.json b/sdk/package-lock.json index a637f47fb..dfc250985 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.2.59", + "version": "0.2.62", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.2.59", + "version": "0.2.62", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.18.6", diff --git a/sdk/package.json b/sdk/package.json index 45cab6ed3..133cb0641 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.2.59", + "version": "0.2.62", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index ab1cd807a..0a8b7cc7a 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.2.56", + "version": "0.2.57", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.2.56", + "version": "0.2.57", "license": "ISC", "devDependencies": { "@types/rimraf": "^3.0.2", diff --git a/sdk/types/package.json b/sdk/types/package.json index 9254b7900..274121a06 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.2.56", + "version": "0.2.57", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/scrypted_python/scrypted_sdk/__init__.py b/sdk/types/scrypted_python/scrypted_sdk/__init__.py index 4b72b7947..5ffb54310 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/__init__.py +++ b/sdk/types/scrypted_python/scrypted_sdk/__init__.py @@ -5,9 +5,10 @@ from zipfile import ZipFile from asyncio import Future from multiprocessing import Process from typing import Callable +import asyncio class PluginFork: - result: Future + result: asyncio.Task worker: Process deviceManager: DeviceManager = None diff --git a/server/package-lock.json b/server/package-lock.json index 575f6697e..5e7c77434 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -11,7 +11,7 @@ "dependencies": { "@ffmpeg-installer/ffmpeg": "^1.1.0", "@mapbox/node-pre-gyp": "^1.0.10", - "@scrypted/types": "^0.2.56", + "@scrypted/types": "^0.2.57", "adm-zip": "^0.5.9", "axios": "^0.21.4", "body-parser": "^1.19.0", @@ -245,9 +245,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.2.56", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.56.tgz", - "integrity": "sha512-1MUTKR5kGdrmz/HGMudYggVojZJPSnhr42CDH5s09IjVSH/FXRHzq56yJ9eiXJW3duJN/g2P8XCY4/r1CHK7xg==" + "version": "0.2.57", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.57.tgz", + "integrity": "sha512-kRqg0r32XasIzAdjrGi7q5anwkuJQvPEn6c/L3nQRQ32bBrtp6cvAWlAMwybsv2s3oBIZsjbAcfTpZ12Ux3k4A==" }, "node_modules/@tootallnate/once": { "version": "1.1.2", @@ -3281,9 +3281,9 @@ } }, "@scrypted/types": { - "version": "0.2.56", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.56.tgz", - "integrity": "sha512-1MUTKR5kGdrmz/HGMudYggVojZJPSnhr42CDH5s09IjVSH/FXRHzq56yJ9eiXJW3duJN/g2P8XCY4/r1CHK7xg==" + "version": "0.2.57", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.2.57.tgz", + "integrity": "sha512-kRqg0r32XasIzAdjrGi7q5anwkuJQvPEn6c/L3nQRQ32bBrtp6cvAWlAMwybsv2s3oBIZsjbAcfTpZ12Ux3k4A==" }, "@tootallnate/once": { "version": "1.1.2", diff --git a/server/package.json b/server/package.json index 0253aac9e..b8a5e5736 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@ffmpeg-installer/ffmpeg": "^1.1.0", "@mapbox/node-pre-gyp": "^1.0.10", - "@scrypted/types": "^0.2.56", + "@scrypted/types": "^0.2.57", "adm-zip": "^0.5.9", "axios": "^0.21.4", "body-parser": "^1.19.0", diff --git a/server/python/plugin-remote.py b/server/python/plugin-remote.py index d89f025aa..f1811a529 100644 --- a/server/python/plugin-remote.py +++ b/server/python/plugin-remote.py @@ -23,20 +23,20 @@ from typing import Any, List, Optional, Set, Tuple import aiofiles import scrypted_python.scrypted_sdk.types -from scrypted_python.scrypted_sdk import ScryptedStatic -from scrypted_python.scrypted_sdk.types import (Device, DeviceManifest, - EventDetails, - ScryptedInterfaceProperty, - Storage) +from scrypted_python.scrypted_sdk import ScryptedStatic, PluginFork +from scrypted_python.scrypted_sdk.types import Device, DeviceManifest, EventDetails, ScryptedInterfaceProperty, Storage from typing_extensions import TypedDict - import rpc +import rpc_reader +import multiprocessing +import multiprocessing.connection class SystemDeviceState(TypedDict): lastEventTime: int stateTime: int value: any + class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): def __init__(self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]) -> None: super().__init__() @@ -46,6 +46,7 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): async def getComponent(self, id: str) -> Any: return await self.api.getComponent(id) + class MediaObjectRemote(scrypted_python.scrypted_sdk.types.MediaObject): def __init__(self, data, mimeType, sourceId): self.mimeType = mimeType @@ -58,38 +59,52 @@ class MediaObjectRemote(scrypted_python.scrypted_sdk.types.MediaObject): async def getData(self): return self.data + class MediaManager: def __init__(self, mediaManager: scrypted_python.scrypted_sdk.types.MediaManager): self.mediaManager = mediaManager async def addConverter(self, converter: scrypted_python.scrypted_sdk.types.BufferConverter) -> None: return await self.mediaManager.addConverter(converter) + async def clearConverters(self) -> None: return await self.mediaManager.clearConverters() + async def convertMediaObject(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any: return await self.mediaManager.convertMediaObject(mediaObject, toMimeType) + async def convertMediaObjectToBuffer(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> bytearray: return await self.mediaManager.convertMediaObjectToBuffer(mediaObject, toMimeType) + async def convertMediaObjectToInsecureLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: return await self.mediaManager.convertMediaObjectToInsecureLocalUrl(mediaObject, toMimeType) + async def convertMediaObjectToJSON(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any: return await self.mediaManager.convertMediaObjectToJSON(mediaObject, toMimeType) + async def convertMediaObjectToLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: return await self.mediaManager.convertMediaObjectToLocalUrl(mediaObject, toMimeType) + async def convertMediaObjectToUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: return await self.mediaManager.convertMediaObjectToUrl(mediaObject, toMimeType) + async def createFFmpegMediaObject(self, ffmpegInput: scrypted_python.scrypted_sdk.types.FFmpegInput, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: return await self.mediaManager.createFFmpegMediaObject(ffmpegInput, options) + async def createMediaObject(self, data: Any, mimeType: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: # return await self.createMediaObject(data, mimetypes, options) return MediaObjectRemote(data, mimeType, options.get('sourceId', None) if options else None) - async def createMediaObjectFromUrl(self, data: str, options:scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: + + async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: return await self.mediaManager.createMediaObjectFromUrl(data, options) + async def getFFmpegPath(self) -> str: return await self.mediaManager.getFFmpegPath() + async def getFilesPath(self) -> str: return await self.mediaManager.getFilesPath() + class DeviceState(scrypted_python.scrypted_sdk.types.DeviceState): def __init__(self, id: str, nativeId: str, systemManager: SystemManager, deviceManager: scrypted_python.scrypted_sdk.types.DeviceManager) -> None: super().__init__() @@ -157,6 +172,7 @@ class DeviceStorage(Storage): self.storage = {} self.update_storage() + class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): def __init__(self, nativeIds: Mapping[str, DeviceStorage], systemManager: SystemManager) -> None: super().__init__() @@ -188,6 +204,7 @@ class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): def getDeviceStorage(self, nativeId: str = None) -> Storage: return self.nativeIds.get(nativeId, None) + class BufferSerializer(rpc.RpcSerializer): def serialize(self, value, serializationContext): return base64.b64encode(value).decode('utf8') @@ -210,6 +227,7 @@ class SidebandBufferSerializer(rpc.RpcSerializer): buffer = buffers.pop() return buffer + class PluginRemote: systemState: Mapping[str, Mapping[str, SystemDeviceState]] = {} nativeIds: Mapping[str, DeviceStorage] = {} @@ -263,85 +281,94 @@ class PluginRemote: asyncio.run_coroutine_threadsafe(self.print_async( nativeId, *values, sep=sep, end=end, flush=flush), self.loop) - async def loadZip(self, packageJson, zipData, options=None): - zipPath: str + async def loadZip(self, packageJson, zipData, options: dict=None): + forkMain = options and options.get('fork') - if isinstance(zipData, str): - zipPath = (options and options.get('filename', None)) or zipData - if zipPath != zipData: - shutil.copyfile(zipData, zipPath) - else: - zipPath = options['filename'] - f = open(zipPath, 'wb') - f.write(zipData) - f.close() + if not forkMain: + multiprocessing.set_start_method('spawn') - zipData = None + zipPath: str - zip = zipfile.ZipFile(zipPath) - - plugin_volume = os.environ.get('SCRYPTED_PLUGIN_VOLUME') - - python_version = 'python%s' % str( - sys.version_info[0])+"."+str(sys.version_info[1]) - print('python version:', python_version) - - python_prefix = os.path.join(plugin_volume, '%s-%s-%s' % (python_version, platform.system(), platform.machine())) - if not os.path.exists(python_prefix): - os.makedirs(python_prefix) - - if 'requirements.txt' in zip.namelist(): - requirements = zip.open('requirements.txt').read() - str_requirements = requirements.decode('utf8') - - requirementstxt = os.path.join(python_prefix, 'requirements.txt') - installed_requirementstxt = os.path.join( - python_prefix, 'requirements.installed.txt') - - need_pip = True - try: - existing = open(installed_requirementstxt).read() - need_pip = existing != str_requirements - except: - pass - - if need_pip: - print('requirements.txt (outdated)') - print(str_requirements) - - f = open(requirementstxt, 'wb') - f.write(requirements) - f.close() - - p = subprocess.Popen([sys.executable, '-m', 'pip', 'install', '-r', requirementstxt, - '--prefix', python_prefix], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - while True: - line = p.stdout.readline() - if not line: - break - line = line.decode('utf8').rstrip('\r\n') - print(line) - result = p.wait() - print('pip install result %s' % result) - if result: - raise Exception('non-zero result from pip %s' % result) - - f = open(installed_requirementstxt, 'wb') - f.write(requirements) - f.close() + if isinstance(zipData, str): + zipPath = (options and options.get('filename', None)) or zipData + if zipPath != zipData: + shutil.copyfile(zipData, zipPath) else: - print('requirements.txt (up to date)') - print(str_requirements) + zipPath = options['filename'] + f = open(zipPath, 'wb') + f.write(zipData) + f.close() - sys.path.insert(0, zipPath) - if platform.system() != 'Windows': - site_packages = os.path.join( - python_prefix, 'lib', python_version, 'site-packages') + zipData = None + + zip = zipfile.ZipFile(zipPath) + + plugin_volume = os.environ.get('SCRYPTED_PLUGIN_VOLUME') + + python_version = 'python%s' % str( + sys.version_info[0])+"."+str(sys.version_info[1]) + print('python version:', python_version) + + python_prefix = os.path.join( + plugin_volume, '%s-%s-%s' % (python_version, platform.system(), platform.machine())) + if not os.path.exists(python_prefix): + os.makedirs(python_prefix) + + if 'requirements.txt' in zip.namelist(): + requirements = zip.open('requirements.txt').read() + str_requirements = requirements.decode('utf8') + + requirementstxt = os.path.join(python_prefix, 'requirements.txt') + installed_requirementstxt = os.path.join( + python_prefix, 'requirements.installed.txt') + + need_pip = True + try: + existing = open(installed_requirementstxt).read() + need_pip = existing != str_requirements + except: + pass + + if need_pip: + print('requirements.txt (outdated)') + print(str_requirements) + + f = open(requirementstxt, 'wb') + f.write(requirements) + f.close() + + p = subprocess.Popen([sys.executable, '-m', 'pip', 'install', '-r', requirementstxt, + '--prefix', python_prefix], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + while True: + line = p.stdout.readline() + if not line: + break + line = line.decode('utf8').rstrip('\r\n') + print(line) + result = p.wait() + print('pip install result %s' % result) + if result: + raise Exception('non-zero result from pip %s' % result) + + f = open(installed_requirementstxt, 'wb') + f.write(requirements) + f.close() + else: + print('requirements.txt (up to date)') + print(str_requirements) + + sys.path.insert(0, zipPath) + if platform.system() != 'Windows': + site_packages = os.path.join( + python_prefix, 'lib', python_version, 'site-packages') + else: + site_packages = os.path.join( + python_prefix, 'Lib', 'site-packages') + print('site-packages: %s' % site_packages) + sys.path.insert(0, site_packages) else: - site_packages = os.path.join( - python_prefix, 'Lib', 'site-packages') - print('site-packages: %s' % site_packages) - sys.path.insert(0, site_packages) + zip = zipfile.ZipFile(options['filename']) + self.systemManager = SystemManager(self.api, self.systemState) self.deviceManager = DeviceManager(self.nativeIds, self.systemManager) self.mediaManager = MediaManager(await self.api.getMediaManager()) @@ -356,18 +383,55 @@ class PluginRemote: sdk.remote = self sdk.api = self.api sdk.zip = zip + + def host_fork() -> PluginFork: + parent_conn, child_conn = multiprocessing.Pipe() + pluginFork = PluginFork() + pluginFork.worker = multiprocessing.Process(target=plugin_fork, args=(child_conn,), daemon=True) + pluginFork.worker.start() + async def getFork(): + fd = os.dup(parent_conn.fileno()) + peer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd) + peer.peerName = 'thread' + asyncio.run_coroutine_threadsafe(readLoop(), loop=self.loop) + getRemote = await peer.getParam('getRemote') + remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo) + await remote.setSystemState(self.systemManager.getSystemState()) + for nativeId, ds in self.nativeIds.items(): + await remote.setNativeId(nativeId, ds.id, ds.storage) + forkOptions = (options or {}).copy() + forkOptions['fork'] = True + forkOptions['filename'] = zipPath + return await remote.loadZip(packageJson, zipData, forkOptions) + + + pluginFork.result = asyncio.create_task(getFork()) + return pluginFork + + sdk.fork = host_fork + sdk_init2(sdk) except: from scrypted_sdk import sdk_init # type: ignore sdk_init(zip, self, self.systemManager, - self.deviceManager, self.mediaManager) + self.deviceManager, self.mediaManager) + + if not forkMain: + try: + from main import create_scrypted_plugin # type: ignore + except: + print('plugin failed to start') + traceback.print_exc() + raise + return await rpc.maybe_await(create_scrypted_plugin()) + try: - from main import create_scrypted_plugin # type: ignore + from main import fork # type: ignore except: - print('plugin failed to start') + print('fork failed to start') traceback.print_exc() raise - return await rpc.maybe_await(create_scrypted_plugin()) + return await rpc.maybe_await(fork()) async def setSystemState(self, state): self.systemState = state @@ -414,75 +478,8 @@ class PluginRemote: async def getServicePort(self, name): pass - -async def readLoop(loop, peer: rpc.RpcPeer, reader): - deserializationContext = { - 'buffers': [] - } - - while True: - try: - lengthBytes = await reader.read(4) - typeBytes = await reader.read(1) - type = typeBytes[0] - length = int.from_bytes(lengthBytes, 'big') - data = await reader.read(length - 1) - - if type == 1: - deserializationContext['buffers'].append(data) - continue - - message = json.loads(data) - asyncio.run_coroutine_threadsafe(peer.handleMessage(message, deserializationContext), loop) - - deserializationContext = { - 'buffers': [] - } - except Exception as e: - print('read loop error', e) - sys.exit() - - -async def async_main(loop: AbstractEventLoop): - reader = await aiofiles.open(3, mode='rb') - - mutex = threading.Lock() - - def send(message, reject=None, serializationContext = None): - with mutex: - if serializationContext: - buffers = serializationContext.get('buffers', None) - if buffers: - for buffer in buffers: - length = len(buffer) + 1 - lb = length.to_bytes(4, 'big') - type = 1 - try: - os.write(4, lb) - os.write(4, bytes([type])) - os.write(4, buffer) - except Exception as e: - if reject: - reject(e) - return - - jsonString = json.dumps(message) - b = bytes(jsonString, 'utf8') - length = len(b) + 1 - lb = length.to_bytes(4, 'big') - type = 0 - try: - os.write(4, lb) - os.write(4, bytes([type])) - os.write(4, b) - except Exception as e: - if reject: - reject(e) - - peer = rpc.RpcPeer(send) - peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer() - peer.constructorSerializerMap[bytes] = 'Buffer' - peer.constructorSerializerMap[bytearray] = 'Buffer' +async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int): + peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd) peer.params['print'] = print peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote( api, pluginId, hostInfo, loop) @@ -499,7 +496,8 @@ async def async_main(loop: AbstractEventLoop): except: try: import resource - heapTotal = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + heapTotal = resource.getrusage( + resource.RUSAGE_SELF).ru_maxrss except: heapTotal = 0 stats = { @@ -519,9 +517,10 @@ async def async_main(loop: AbstractEventLoop): asyncio.run_coroutine_threadsafe(get_update_stats(), loop) - await readLoop(loop, peer, reader) + await readLoop() -def main(): + +def main(readFd: int, writeFd: int): loop = asyncio.new_event_loop() def gc_runner(): @@ -529,21 +528,30 @@ def main(): loop.call_later(10, gc_runner) gc_runner() - loop.run_until_complete(async_main(loop)) + loop.run_until_complete(plugin_async_main(loop, readFd, writeFd)) loop.close() +print('running') -if __name__ == "__main__": +def plugin_main(readFd: int, writeFd: int): try: import gi gi.require_version('Gst', '1.0') from gi.repository import GLib, Gst Gst.init(None) - worker = threading.Thread(target=main) + worker = threading.Thread(target=main, args=(readFd, writeFd)) worker.start() loop = GLib.MainLoop() loop.run() except: - main() + main(readFd, writeFd) + + +def plugin_fork(conn: multiprocessing.connection.Connection): + fd = os.dup(conn.fileno()) + plugin_main(fd, fd) + +if __name__ == "__main__": + plugin_main(3, 4) diff --git a/server/python/rpc_reader.py b/server/python/rpc_reader.py new file mode 100644 index 000000000..75d88a23a --- /dev/null +++ b/server/python/rpc_reader.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import asyncio +import base64 +import gc +import json +import sys +import os +import platform +import shutil +import subprocess +import threading +import time +import traceback +import zipfile +from asyncio.events import AbstractEventLoop +from asyncio.futures import Future +from asyncio.streams import StreamReader, StreamWriter +from collections.abc import Mapping +from io import StringIO +from os import sys +from typing import Any, List, Optional, Set, Tuple + +import aiofiles +import scrypted_python.scrypted_sdk.types +from scrypted_python.scrypted_sdk import ScryptedStatic, PluginFork +from scrypted_python.scrypted_sdk.types import Device, DeviceManifest, EventDetails, ScryptedInterfaceProperty, Storage +from typing_extensions import TypedDict +import rpc +import multiprocessing +import multiprocessing.connection + + +class BufferSerializer(rpc.RpcSerializer): + def serialize(self, value, serializationContext): + return base64.b64encode(value).decode('utf8') + + def deserialize(self, value, serializationContext): + return base64.b64decode(value) + + +class SidebandBufferSerializer(rpc.RpcSerializer): + def serialize(self, value, serializationContext): + buffers = serializationContext.get('buffers', None) + if not buffers: + buffers = [] + serializationContext['buffers'] = buffers + buffers.append(value) + return len(buffers) - 1 + + def deserialize(self, value, serializationContext): + buffers: List = serializationContext.get('buffers', None) + buffer = buffers.pop() + return buffer + +async def readLoop(loop, peer: rpc.RpcPeer, reader): + deserializationContext = { + 'buffers': [] + } + + while True: + try: + lengthBytes = await reader.read(4) + typeBytes = await reader.read(1) + type = typeBytes[0] + length = int.from_bytes(lengthBytes, 'big') + data = await reader.read(length - 1) + + if type == 1: + deserializationContext['buffers'].append(data) + continue + + message = json.loads(data) + asyncio.run_coroutine_threadsafe( + peer.handleMessage(message, deserializationContext), loop) + + deserializationContext = { + 'buffers': [] + } + except Exception as e: + print('read loop error: ' + peer.peerName, e) + sys.exit() + +async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: int): + reader = await aiofiles.open(readFd, mode='rb') + + mutex = threading.Lock() + + def send(message, reject=None, serializationContext=None): + with mutex: + if serializationContext: + buffers = serializationContext.get('buffers', None) + if buffers: + for buffer in buffers: + length = len(buffer) + 1 + lb = length.to_bytes(4, 'big') + type = 1 + try: + os.write(writeFd, lb) + os.write(writeFd, bytes([type])) + os.write(writeFd, buffer) + except Exception as e: + if reject: + reject(e) + return + + jsonString = json.dumps(message) + b = bytes(jsonString, 'utf8') + length = len(b) + 1 + lb = length.to_bytes(4, 'big') + type = 0 + try: + os.write(writeFd, lb) + os.write(writeFd, bytes([type])) + os.write(writeFd, b) + except Exception as e: + if reject: + reject(e) + + peer = rpc.RpcPeer(send) + peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer() + peer.constructorSerializerMap[bytes] = 'Buffer' + peer.constructorSerializerMap[bytearray] = 'Buffer' + + async def peerReadLoop(): + await readLoop(loop, peer, reader) + + return peer, peerReadLoop