mirror of https://github.com/koush/scrypted.git (synced 2026-02-08 16:29:57 +00:00)
sdk/server: wip python fork
@@ -23,20 +23,20 @@ from typing import Any, List, Optional, Set, Tuple

import aiofiles
import scrypted_python.scrypted_sdk.types
from scrypted_python.scrypted_sdk import ScryptedStatic
from scrypted_python.scrypted_sdk.types import (Device, DeviceManifest,
EventDetails,
ScryptedInterfaceProperty,
Storage)
from scrypted_python.scrypted_sdk import ScryptedStatic, PluginFork
from scrypted_python.scrypted_sdk.types import Device, DeviceManifest, EventDetails, ScryptedInterfaceProperty, Storage
from typing_extensions import TypedDict

import rpc
import rpc_reader
import multiprocessing
import multiprocessing.connection

class SystemDeviceState(TypedDict):
lastEventTime: int
stateTime: int
value: any


class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
def __init__(self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]) -> None:
super().__init__()
@@ -46,6 +46,7 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
async def getComponent(self, id: str) -> Any:
return await self.api.getComponent(id)


class MediaObjectRemote(scrypted_python.scrypted_sdk.types.MediaObject):
def __init__(self, data, mimeType, sourceId):
self.mimeType = mimeType
@@ -58,38 +59,52 @@ class MediaObjectRemote(scrypted_python.scrypted_sdk.types.MediaObject):
async def getData(self):
return self.data


class MediaManager:
def __init__(self, mediaManager: scrypted_python.scrypted_sdk.types.MediaManager):
self.mediaManager = mediaManager

async def addConverter(self, converter: scrypted_python.scrypted_sdk.types.BufferConverter) -> None:
return await self.mediaManager.addConverter(converter)

async def clearConverters(self) -> None:
return await self.mediaManager.clearConverters()

async def convertMediaObject(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any:
return await self.mediaManager.convertMediaObject(mediaObject, toMimeType)

async def convertMediaObjectToBuffer(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> bytearray:
return await self.mediaManager.convertMediaObjectToBuffer(mediaObject, toMimeType)

async def convertMediaObjectToInsecureLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str:
return await self.mediaManager.convertMediaObjectToInsecureLocalUrl(mediaObject, toMimeType)

async def convertMediaObjectToJSON(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any:
return await self.mediaManager.convertMediaObjectToJSON(mediaObject, toMimeType)

async def convertMediaObjectToLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str:
return await self.mediaManager.convertMediaObjectToLocalUrl(mediaObject, toMimeType)

async def convertMediaObjectToUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str:
return await self.mediaManager.convertMediaObjectToUrl(mediaObject, toMimeType)

async def createFFmpegMediaObject(self, ffmpegInput: scrypted_python.scrypted_sdk.types.FFmpegInput, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
return await self.mediaManager.createFFmpegMediaObject(ffmpegInput, options)

async def createMediaObject(self, data: Any, mimeType: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
# return await self.createMediaObject(data, mimetypes, options)
return MediaObjectRemote(data, mimeType, options.get('sourceId', None) if options else None)
async def createMediaObjectFromUrl(self, data: str, options:scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:

async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types. MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject:
return await self.mediaManager.createMediaObjectFromUrl(data, options)

async def getFFmpegPath(self) -> str:
return await self.mediaManager.getFFmpegPath()

async def getFilesPath(self) -> str:
return await self.mediaManager.getFilesPath()


class DeviceState(scrypted_python.scrypted_sdk.types.DeviceState):
def __init__(self, id: str, nativeId: str, systemManager: SystemManager, deviceManager: scrypted_python.scrypted_sdk.types.DeviceManager) -> None:
super().__init__()
@@ -157,6 +172,7 @@ class DeviceStorage(Storage):
self.storage = {}
self.update_storage()


class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager):
def __init__(self, nativeIds: Mapping[str, DeviceStorage], systemManager: SystemManager) -> None:
super().__init__()
@@ -188,6 +204,7 @@ class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager):
def getDeviceStorage(self, nativeId: str = None) -> Storage:
return self.nativeIds.get(nativeId, None)


class BufferSerializer(rpc.RpcSerializer):
def serialize(self, value, serializationContext):
return base64.b64encode(value).decode('utf8')
@@ -210,6 +227,7 @@ class SidebandBufferSerializer(rpc.RpcSerializer):
buffer = buffers.pop()
return buffer


class PluginRemote:
systemState: Mapping[str, Mapping[str, SystemDeviceState]] = {}
nativeIds: Mapping[str, DeviceStorage] = {}
@@ -263,85 +281,94 @@ class PluginRemote:
asyncio.run_coroutine_threadsafe(self.print_async(
nativeId, *values, sep=sep, end=end, flush=flush), self.loop)

async def loadZip(self, packageJson, zipData, options=None):
zipPath: str
async def loadZip(self, packageJson, zipData, options: dict=None):
forkMain = options and options.get('fork')

if isinstance(zipData, str):
zipPath = (options and options.get('filename', None)) or zipData
if zipPath != zipData:
shutil.copyfile(zipData, zipPath)
else:
zipPath = options['filename']
f = open(zipPath, 'wb')
f.write(zipData)
f.close()
if not forkMain:
multiprocessing.set_start_method('spawn')

zipData = None
zipPath: str

zip = zipfile.ZipFile(zipPath)

plugin_volume = os.environ.get('SCRYPTED_PLUGIN_VOLUME')

python_version = 'python%s' % str(
sys.version_info[0])+"."+str(sys.version_info[1])
print('python version:', python_version)

python_prefix = os.path.join(plugin_volume, '%s-%s-%s' % (python_version, platform.system(), platform.machine()))
if not os.path.exists(python_prefix):
os.makedirs(python_prefix)

if 'requirements.txt' in zip.namelist():
requirements = zip.open('requirements.txt').read()
str_requirements = requirements.decode('utf8')

requirementstxt = os.path.join(python_prefix, 'requirements.txt')
installed_requirementstxt = os.path.join(
python_prefix, 'requirements.installed.txt')

need_pip = True
try:
existing = open(installed_requirementstxt).read()
need_pip = existing != str_requirements
except:
pass

if need_pip:
print('requirements.txt (outdated)')
print(str_requirements)

f = open(requirementstxt, 'wb')
f.write(requirements)
f.close()

p = subprocess.Popen([sys.executable, '-m', 'pip', 'install', '-r', requirementstxt,
'--prefix', python_prefix], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
line = p.stdout.readline()
if not line:
break
line = line.decode('utf8').rstrip('\r\n')
print(line)
result = p.wait()
print('pip install result %s' % result)
if result:
raise Exception('non-zero result from pip %s' % result)

f = open(installed_requirementstxt, 'wb')
f.write(requirements)
f.close()
if isinstance(zipData, str):
zipPath = (options and options.get('filename', None)) or zipData
if zipPath != zipData:
shutil.copyfile(zipData, zipPath)
else:
print('requirements.txt (up to date)')
print(str_requirements)
zipPath = options['filename']
f = open(zipPath, 'wb')
f.write(zipData)
f.close()

sys.path.insert(0, zipPath)
if platform.system() != 'Windows':
site_packages = os.path.join(
python_prefix, 'lib', python_version, 'site-packages')
zipData = None

zip = zipfile.ZipFile(zipPath)

plugin_volume = os.environ.get('SCRYPTED_PLUGIN_VOLUME')

python_version = 'python%s' % str(
sys.version_info[0])+"."+str(sys.version_info[1])
print('python version:', python_version)

python_prefix = os.path.join(
plugin_volume, '%s-%s-%s' % (python_version, platform.system(), platform.machine()))
if not os.path.exists(python_prefix):
os.makedirs(python_prefix)

if 'requirements.txt' in zip.namelist():
requirements = zip.open('requirements.txt').read()
str_requirements = requirements.decode('utf8')

requirementstxt = os.path.join(python_prefix, 'requirements.txt')
installed_requirementstxt = os.path.join(
python_prefix, 'requirements.installed.txt')

need_pip = True
try:
existing = open(installed_requirementstxt).read()
need_pip = existing != str_requirements
except:
pass

if need_pip:
print('requirements.txt (outdated)')
print(str_requirements)

f = open(requirementstxt, 'wb')
f.write(requirements)
f.close()

p = subprocess.Popen([sys.executable, '-m', 'pip', 'install', '-r', requirementstxt,
'--prefix', python_prefix], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
line = p.stdout.readline()
if not line:
break
line = line.decode('utf8').rstrip('\r\n')
print(line)
result = p.wait()
print('pip install result %s' % result)
if result:
raise Exception('non-zero result from pip %s' % result)

f = open(installed_requirementstxt, 'wb')
f.write(requirements)
f.close()
else:
print('requirements.txt (up to date)')
print(str_requirements)

sys.path.insert(0, zipPath)
if platform.system() != 'Windows':
site_packages = os.path.join(
python_prefix, 'lib', python_version, 'site-packages')
else:
site_packages = os.path.join(
python_prefix, 'Lib', 'site-packages')
print('site-packages: %s' % site_packages)
sys.path.insert(0, site_packages)
else:
site_packages = os.path.join(
python_prefix, 'Lib', 'site-packages')
print('site-packages: %s' % site_packages)
sys.path.insert(0, site_packages)
zip = zipfile.ZipFile(options['filename'])

self.systemManager = SystemManager(self.api, self.systemState)
self.deviceManager = DeviceManager(self.nativeIds, self.systemManager)
self.mediaManager = MediaManager(await self.api.getMediaManager())
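
The hunk above keeps each plugin's pip dependencies in a versioned, per-platform prefix under SCRYPTED_PLUGIN_VOLUME and only re-runs pip when requirements.txt changes. A minimal sketch of the resulting layout, assuming the same environment variable (the fallback path below is purely illustrative and not part of this commit):

import os
import platform
import sys

# assumption: SCRYPTED_PLUGIN_VOLUME is provided by the host; the fallback is illustrative only
plugin_volume = os.environ.get('SCRYPTED_PLUGIN_VOLUME', '/tmp/scrypted-plugin-volume')
python_version = 'python%s.%s' % (sys.version_info[0], sys.version_info[1])

# e.g. <volume>/python3.9-Linux-x86_64, matching the prefix computed in the diff
python_prefix = os.path.join(
    plugin_volume, '%s-%s-%s' % (python_version, platform.system(), platform.machine()))

# pip runs as: sys.executable -m pip install -r requirements.txt --prefix <python_prefix>,
# and the matching site-packages directory is what gets prepended to sys.path:
if platform.system() != 'Windows':
    site_packages = os.path.join(python_prefix, 'lib', python_version, 'site-packages')
else:
    site_packages = os.path.join(python_prefix, 'Lib', 'site-packages')
print('site-packages: %s' % site_packages)
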
@@ -356,18 +383,55 @@ class PluginRemote:
sdk.remote = self
sdk.api = self.api
sdk.zip = zip

def host_fork() -> PluginFork:
parent_conn, child_conn = multiprocessing.Pipe()
pluginFork = PluginFork()
pluginFork.worker = multiprocessing.Process(target=plugin_fork, args=(child_conn,), daemon=True)
pluginFork.worker.start()
async def getFork():
fd = os.dup(parent_conn.fileno())
peer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd)
peer.peerName = 'thread'
asyncio.run_coroutine_threadsafe(readLoop(), loop=self.loop)
getRemote = await peer.getParam('getRemote')
remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo)
await remote.setSystemState(self.systemManager.getSystemState())
for nativeId, ds in self.nativeIds.items():
await remote.setNativeId(nativeId, ds.id, ds.storage)
forkOptions = (options or {}).copy()
forkOptions['fork'] = True
forkOptions['filename'] = zipPath
return await remote.loadZip(packageJson, zipData, forkOptions)


pluginFork.result = asyncio.create_task(getFork())
return pluginFork

sdk.fork = host_fork

sdk_init2(sdk)
except:
from scrypted_sdk import sdk_init # type: ignore
sdk_init(zip, self, self.systemManager,
self.deviceManager, self.mediaManager)
self.deviceManager, self.mediaManager)

if not forkMain:
try:
from main import create_scrypted_plugin # type: ignore
except:
print('plugin failed to start')
traceback.print_exc()
raise
return await rpc.maybe_await(create_scrypted_plugin())

try:
from main import create_scrypted_plugin # type: ignore
from main import fork # type: ignore
except:
print('plugin failed to start')
print('fork failed to start')
traceback.print_exc()
raise
return await rpc.maybe_await(create_scrypted_plugin())
return await rpc.maybe_await(fork())

async def setSystemState(self, state):
self.systemState = state
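
host_fork() above relies on a duplex multiprocessing.Pipe whose file descriptor is dup'd and then used for both the read and the write side of the RPC peer, the same trick plugin_fork() uses in the child. A standalone sketch of that raw-fd pattern, assuming a Unix-like OS where the duplex pipe is backed by a socketpair (names and payload are illustrative, not from the commit):

import multiprocessing
import multiprocessing.connection
import os


def child(conn: multiprocessing.connection.Connection):
    fd = os.dup(conn.fileno())      # dup the connection's descriptor, as plugin_fork() does
    data = os.read(fd, 1024)        # raw read from the pipe's descriptor
    os.write(fd, data.upper())      # raw write back on the very same descriptor


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    worker = multiprocessing.Process(target=child, args=(child_conn,), daemon=True)
    worker.start()
    fd = os.dup(parent_conn.fileno())
    os.write(fd, b'ping')
    print(os.read(fd, 1024))        # b'PING'
    worker.join()
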
@@ -414,75 +478,8 @@ class PluginRemote:
async def getServicePort(self, name):
pass


async def readLoop(loop, peer: rpc.RpcPeer, reader):
deserializationContext = {
'buffers': []
}

while True:
try:
lengthBytes = await reader.read(4)
typeBytes = await reader.read(1)
type = typeBytes[0]
length = int.from_bytes(lengthBytes, 'big')
data = await reader.read(length - 1)

if type == 1:
deserializationContext['buffers'].append(data)
continue

message = json.loads(data)
asyncio.run_coroutine_threadsafe(peer.handleMessage(message, deserializationContext), loop)

deserializationContext = {
'buffers': []
}
except Exception as e:
print('read loop error', e)
sys.exit()


async def async_main(loop: AbstractEventLoop):
reader = await aiofiles.open(3, mode='rb')

mutex = threading.Lock()

def send(message, reject=None, serializationContext = None):
with mutex:
if serializationContext:
buffers = serializationContext.get('buffers', None)
if buffers:
for buffer in buffers:
length = len(buffer) + 1
lb = length.to_bytes(4, 'big')
type = 1
try:
os.write(4, lb)
os.write(4, bytes([type]))
os.write(4, buffer)
except Exception as e:
if reject:
reject(e)
return

jsonString = json.dumps(message)
b = bytes(jsonString, 'utf8')
length = len(b) + 1
lb = length.to_bytes(4, 'big')
type = 0
try:
os.write(4, lb)
os.write(4, bytes([type]))
os.write(4, b)
except Exception as e:
if reject:
reject(e)

peer = rpc.RpcPeer(send)
peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer()
peer.constructorSerializerMap[bytes] = 'Buffer'
peer.constructorSerializerMap[bytearray] = 'Buffer'
async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd)
peer.params['print'] = print
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(
api, pluginId, hostInfo, loop)
@@ -499,7 +496,8 @@ async def async_main(loop: AbstractEventLoop):
except:
try:
import resource
heapTotal = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
heapTotal = resource.getrusage(
resource.RUSAGE_SELF).ru_maxrss
except:
heapTotal = 0
stats = {
@@ -519,9 +517,10 @@ async def async_main(loop: AbstractEventLoop):

asyncio.run_coroutine_threadsafe(get_update_stats(), loop)

await readLoop(loop, peer, reader)
await readLoop()

def main():

def main(readFd: int, writeFd: int):
loop = asyncio.new_event_loop()

def gc_runner():
@@ -529,21 +528,30 @@ def main():
loop.call_later(10, gc_runner)
gc_runner()

loop.run_until_complete(async_main(loop))
loop.run_until_complete(plugin_async_main(loop, readFd, writeFd))
loop.close()

print('running')

if __name__ == "__main__":
def plugin_main(readFd: int, writeFd: int):
try:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
Gst.init(None)

worker = threading.Thread(target=main)
worker = threading.Thread(target=main, args=(readFd, writeFd))
worker.start()

loop = GLib.MainLoop()
loop.run()
except:
main()
main(readFd, writeFd)


def plugin_fork(conn: multiprocessing.connection.Connection):
fd = os.dup(conn.fileno())
plugin_main(fd, fd)

if __name__ == "__main__":
plugin_main(3, 4)
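
For reference, the loader above imports create_scrypted_plugin from the plugin's main module for the normal path and fork for the forked path, and it wires host_fork onto the SDK object as sdk.fork. A hypothetical plugin entry point might look like the sketch below; the worker/plugin classes and the assumption that scrypted_sdk re-exports the ScryptedStatic attributes (including fork) at module level are illustrative, not part of this commit.

# main.py of a hypothetical plugin
import scrypted_sdk


class ForkedWorker:
    async def compute(self):
        # runs inside the child interpreter spawned by host_fork()
        return 42


def fork():
    # entry point invoked when loadZip runs with options['fork'] = True
    return ForkedWorker()


class ExamplePlugin:
    async def doHeavyWork(self):
        # sdk.fork is wired to host_fork() in the diff; PluginFork.result is the
        # task created from getFork(), resolving to an RPC proxy of fork()'s return value
        plugin_fork = scrypted_sdk.fork()
        worker = await plugin_fork.result
        return await worker.compute()


def create_scrypted_plugin():
    # entry point invoked in the main plugin process
    return ExamplePlugin()
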
server/python/rpc_reader.py (new file, 128 lines)
@@ -0,0 +1,128 @@
from __future__ import annotations

import asyncio
import base64
import gc
import json
import sys
import os
import platform
import shutil
import subprocess
import threading
import time
import traceback
import zipfile
from asyncio.events import AbstractEventLoop
from asyncio.futures import Future
from asyncio.streams import StreamReader, StreamWriter
from collections.abc import Mapping
from io import StringIO
from os import sys
from typing import Any, List, Optional, Set, Tuple

import aiofiles
import scrypted_python.scrypted_sdk.types
from scrypted_python.scrypted_sdk import ScryptedStatic, PluginFork
from scrypted_python.scrypted_sdk.types import Device, DeviceManifest, EventDetails, ScryptedInterfaceProperty, Storage
from typing_extensions import TypedDict
import rpc
import multiprocessing
import multiprocessing.connection


class BufferSerializer(rpc.RpcSerializer):
    def serialize(self, value, serializationContext):
        return base64.b64encode(value).decode('utf8')

    def deserialize(self, value, serializationContext):
        return base64.b64decode(value)


class SidebandBufferSerializer(rpc.RpcSerializer):
    def serialize(self, value, serializationContext):
        buffers = serializationContext.get('buffers', None)
        if not buffers:
            buffers = []
            serializationContext['buffers'] = buffers
        buffers.append(value)
        return len(buffers) - 1

    def deserialize(self, value, serializationContext):
        buffers: List = serializationContext.get('buffers', None)
        buffer = buffers.pop()
        return buffer

async def readLoop(loop, peer: rpc.RpcPeer, reader):
    deserializationContext = {
        'buffers': []
    }

    while True:
        try:
            lengthBytes = await reader.read(4)
            typeBytes = await reader.read(1)
            type = typeBytes[0]
            length = int.from_bytes(lengthBytes, 'big')
            data = await reader.read(length - 1)

            if type == 1:
                deserializationContext['buffers'].append(data)
                continue

            message = json.loads(data)
            asyncio.run_coroutine_threadsafe(
                peer.handleMessage(message, deserializationContext), loop)

            deserializationContext = {
                'buffers': []
            }
        except Exception as e:
            print('read loop error: ' + peer.peerName, e)
            sys.exit()

async def prepare_peer_readloop(loop: AbstractEventLoop, readFd: int, writeFd: int):
    reader = await aiofiles.open(readFd, mode='rb')

    mutex = threading.Lock()

    def send(message, reject=None, serializationContext=None):
        with mutex:
            if serializationContext:
                buffers = serializationContext.get('buffers', None)
                if buffers:
                    for buffer in buffers:
                        length = len(buffer) + 1
                        lb = length.to_bytes(4, 'big')
                        type = 1
                        try:
                            os.write(writeFd, lb)
                            os.write(writeFd, bytes([type]))
                            os.write(writeFd, buffer)
                        except Exception as e:
                            if reject:
                                reject(e)
                            return

            jsonString = json.dumps(message)
            b = bytes(jsonString, 'utf8')
            length = len(b) + 1
            lb = length.to_bytes(4, 'big')
            type = 0
            try:
                os.write(writeFd, lb)
                os.write(writeFd, bytes([type]))
                os.write(writeFd, b)
            except Exception as e:
                if reject:
                    reject(e)

    peer = rpc.RpcPeer(send)
    peer.nameDeserializerMap['Buffer'] = SidebandBufferSerializer()
    peer.constructorSerializerMap[bytes] = 'Buffer'
    peer.constructorSerializerMap[bytearray] = 'Buffer'

    async def peerReadLoop():
        await readLoop(loop, peer, reader)

    return peer, peerReadLoop
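
The send()/readLoop() pair above frames every RPC payload the same way: a 4-byte big-endian length that counts the type byte plus the payload, a single type byte (0 for a JSON RPC message, 1 for a sideband buffer announced by SidebandBufferSerializer), then the payload. A minimal sketch of that framing; the helper names and the sample message are illustrative, not part of the file:

import json


def encode_frame(payload: bytes, type: int) -> bytes:
    length = len(payload) + 1                       # +1 for the type byte, as in send()
    return length.to_bytes(4, 'big') + bytes([type]) + payload


def decode_frame(frame: bytes):
    length = int.from_bytes(frame[:4], 'big')
    type = frame[4]
    payload = frame[5:4 + length]                   # length - 1 payload bytes, as in readLoop()
    return type, payload


message = {'hello': 'world'}                        # placeholder, not a real RPC message shape
frame = encode_frame(json.dumps(message).encode('utf8'), 0)
print(decode_frame(frame))                          # (0, b'{"hello": "world"}')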