From c2d86237d668f02ed11aaf754ebec91d02ef39e4 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Thu, 21 Dec 2023 21:55:34 -0800 Subject: [PATCH] wyze: refactor for multiprocessing --- plugins/wyze/docker-wyze-bridge | 2 +- plugins/wyze/package-lock.json | 4 +- plugins/wyze/package.json | 2 +- plugins/wyze/src/main.py | 642 +++++++++++++++----------------- 4 files changed, 298 insertions(+), 352 deletions(-) diff --git a/plugins/wyze/docker-wyze-bridge b/plugins/wyze/docker-wyze-bridge index ff09cca5b..043f7227c 160000 --- a/plugins/wyze/docker-wyze-bridge +++ b/plugins/wyze/docker-wyze-bridge @@ -1 +1 @@ -Subproject commit ff09cca5b8f78461de5e6281a4d2082d3d905eb6 +Subproject commit 043f7227c5afe3f346136dbeeb5285241a9f89b1 diff --git a/plugins/wyze/package-lock.json b/plugins/wyze/package-lock.json index ead2c5e52..4f3e7971c 100644 --- a/plugins/wyze/package-lock.json +++ b/plugins/wyze/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/wyze", - "version": "0.0.11", + "version": "0.0.31", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/wyze", - "version": "0.0.11", + "version": "0.0.31", "devDependencies": { "@scrypted/sdk": "file:../../sdk" } diff --git a/plugins/wyze/package.json b/plugins/wyze/package.json index 789e499ae..7e24e9302 100644 --- a/plugins/wyze/package.json +++ b/plugins/wyze/package.json @@ -33,5 +33,5 @@ "devDependencies": { "@scrypted/sdk": "file:../../sdk" }, - "version": "0.0.11" + "version": "0.0.31" } diff --git a/plugins/wyze/src/main.py b/plugins/wyze/src/main.py index 780a75e7d..3d127ea55 100644 --- a/plugins/wyze/src/main.py +++ b/plugins/wyze/src/main.py @@ -1,50 +1,40 @@ from __future__ import annotations -from typing import Any, Coroutine, List, Dict, Callable, Iterator, MutableSet -import scrypted_sdk + import asyncio -import urllib.request +import base64 +import concurrent.futures +import json import os -import urllib -import sys import platform +import queue +import struct +import subprocess +import sys +import threading +import traceback +import urllib +import urllib.request +from ctypes import c_int +from typing import Any, Callable, Coroutine, Dict, Iterator, List, MutableSet + +import scrypted_sdk from scrypted_sdk.other import MediaObject +from scrypted_sdk.types import (DeviceProvider, PanTiltZoom, + RequestMediaStreamOptions, + ResponseMediaStreamOptions, ScryptedDeviceType, + ScryptedInterface, Setting, Settings, + VideoCamera) + import wyzecam import wyzecam.api_models -import json -import threading -import queue -import traceback -from ctypes import c_int -import concurrent.futures -import subprocess -import base64 -import struct - -from wyzecam.tutk.tutk import ( - FRAME_SIZE_2K, - FRAME_SIZE_1080P, - FRAME_SIZE_360P, -) - -from scrypted_sdk.types import ( - DeviceProvider, - RequestMediaStreamOptions, - ResponseMediaStreamOptions, - VideoCamera, - ScryptedDeviceType, - ScryptedInterface, - Settings, - Setting, -) +from wyzecam.tutk.tutk import FRAME_SIZE_2K, FRAME_SIZE_360P, FRAME_SIZE_1080P os.environ["TUTK_PROJECT_ROOT"] = os.path.join( os.environ["SCRYPTED_PLUGIN_VOLUME"], "zip/unzipped/fs" ) sdkKey = "AQAAAIZ44fijz5pURQiNw4xpEfV9ZysFH8LYBPDxiONQlbLKaDeb7n26TSOPSGHftbRVo25k3uz5of06iGNB4pSfmvsCvm/tTlmML6HKS0vVxZnzEuK95TPGEGt+aE15m6fjtRXQKnUav59VSRHwRj9Z1Kjm1ClfkSPUF5NfUvsb3IAbai0WlzZE1yYCtks7NFRMbTXUMq3bFtNhEERD/7oc504b" -toThreadExecutor = concurrent.futures.ThreadPoolExecutor( - max_workers=2, thread_name_prefix="image" -) +toThreadExecutor = concurrent.futures.ThreadPoolExecutor(thread_name_prefix="probe") codecMap = { "mulaw": "PCMU", @@ -55,6 +45,15 @@ codecMap = { } +def print_exception(print, e): + for line in traceback.format_exception(e): + print(line) + + +def format_exception(e): + return "\n".join(traceback.format_exception(e)) + + async def to_thread(f): loop = asyncio.get_running_loop() return await loop.run_in_executor(toThreadExecutor, f) @@ -79,12 +78,9 @@ class CodecInfo: self.audioSampleRate = audioSampleRate -class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): +class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings, PanTiltZoom): camera: wyzecam.WyzeCamera plugin: WyzePlugin - streams: MutableSet[wyzecam.WyzeIOTCSession] - activeStream: wyzecam.WyzeIOTCSession - audioQueues: MutableSet[queue.Queue[tuple[bytes, Any]]] main: CodecInfo sub: CodecInfo @@ -101,13 +97,8 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): self.main = None self.sub = None self.mainFrameSize = FRAME_SIZE_2K if camera.is_2k else FRAME_SIZE_1080P - self.subByterate = 30 + self.subByteRate = 30 - self.mainServer = asyncio.ensure_future(self.ensureServer(self.handleClientHD)) - self.subServer = asyncio.ensure_future(self.ensureServer(self.handleClientSD)) - self.audioServer = asyncio.ensure_future( - self.ensureServer(self.handleAudioClient) - ) self.rfcServer = asyncio.ensure_future( self.ensureServer(self.handleMainRfcClient) ) @@ -115,15 +106,27 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): self.ensureServer(self.handleSubRfcClient) ) + if camera.is_pan_cam: + self.ptzCapabilities = { + "pan": True, + "tilt": True, + } + + async def ptzCommand(self, command: scrypted_sdk.PanTiltZoomCommand) -> None: + pass + def safeParseJsonStorage(self, key: str): try: return json.loads(self.storage.getItem(key)) except: return None - + + def getMuted(self): + return False + def getMainByteRate(self, default=False): try: - bit = int(self.safeParseJsonStorage("bitrate")) + bit = int(self.safeParseJsonStorage("byterate")) bit = round(bit / 8) bit = bit if 1 <= bit <= 255 else 0 if not bit: @@ -143,7 +146,7 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): "key": "bitrate", "title": "Main Stream Bitrate", "description": "The bitrate used by the main stream.", - "value": self.safeParseJsonStorage("bitrate"), + "value": self.safeParseJsonStorage("byterate"), "combobox": True, "value": str(self.getMainByteRate(True)), "choices": [ @@ -168,44 +171,6 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): self.nativeId, ScryptedInterface.VideoCamera.value, None ) - async def handleClientHD( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ): - return await self.handleClient( - self.plugin.account.model_copy(), - self.mainFrameSize, - self.getMainByteRate(), - reader, - writer, - ) - - async def handleClientSD( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ): - account = self.plugin.account.model_copy() - # wyze cams will disconnect first stream if the phone id requests a second stream. - # use a different substream phone id, similar to how docker wyze bridge does it. - account.phone_id = account.phone_id[2:] - return await self.handleClient( - account, - FRAME_SIZE_360P, - self.subByterate, - reader, - writer, - ) - - def receiveAudioData(self): - q: queue.Queue[tuple[bytes, Any]] = queue.Queue() - self.audioQueues.add(q) - try: - while True: - b, info = q.get() - if not b: - return - yield b, info - finally: - self.audioQueues.remove(q) - async def handleMainRfcClient( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ): @@ -223,9 +188,11 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): writer: asyncio.StreamWriter, ): info = self.sub if substream else self.main - - port = await self.subServer if substream else await self.mainServer - audioPort = await self.audioServer + try: + ffmpeg = await scrypted_sdk.mediaManager.getFFmpegPath() + except Exception as e: + raise e + loop = asyncio.get_event_loop() class Protocol: def __init__(self, pt: int) -> None: @@ -240,44 +207,41 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): writer.write(len_data) writer.write(data) - ffmpeg = await scrypted_sdk.mediaManager.getFFmpegPath() - loop = asyncio.get_event_loop() - vt, vp = await loop.create_datagram_endpoint( lambda: Protocol(96), local_addr=("127.0.0.1", 0) ) vhost, vport = vt._sock.getsockname() - vprocess = subprocess.Popen( - [ - ffmpeg, - "-analyzeduration", - "0", - "-probesize", - "100k", - "-f", - "h264", - "-i", - f"tcp://127.0.0.1:{port}", - "-vcodec", - "copy", - "-an", - "-f", - "rtp", - "-payload_type", - "96", - f"rtp://127.0.0.1:{vport}?pkt_size=1300", - ] + vprocess = await asyncio.create_subprocess_exec( + ffmpeg, + "-analyzeduration", + "0", + "-probesize", + "100k", + "-f", + "h264", + "-i", + "pipe:0", + "-vcodec", + "copy", + "-an", + "-f", + "rtp", + "-payload_type", + "96", + f"rtp://127.0.0.1:{vport}?pkt_size=1300", + stdin=asyncio.subprocess.PIPE, ) - at, ap = await loop.create_datagram_endpoint( - lambda: Protocol(97), local_addr=("127.0.0.1", 0) - ) + aprocess: asyncio.subprocess.Process = None + if not self.getMuted(): + at, ap = await loop.create_datagram_endpoint( + lambda: Protocol(97), local_addr=("127.0.0.1", 0) + ) - ahost, aport = at._sock.getsockname() + ahost, aport = at._sock.getsockname() - aprocess = subprocess.Popen( - [ + aprocess = await asyncio.create_subprocess_exec( ffmpeg, "-analyzeduration", "0", @@ -288,7 +252,7 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): "-ar", f"{info.audioSampleRate}", "-i", - f"tcp://127.0.0.1:{audioPort}", + "pipe:0", "-acodec", "copy", "-vn", @@ -297,159 +261,29 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): "-payload_type", "97", f"rtp://127.0.0.1:{aport}?pkt_size=1300", - ] - ) + stdin=asyncio.subprocess.PIPE, + ) try: - while True: - buffer = await reader.read() - if not len(buffer): + print("stuffing") + async for audio, data, codec, sampleRate in self.forkAndStream(substream): + if writer.is_closing(): return + + p = aprocess if audio else vprocess + if p: + p.stdin.write(data) + await p.stdin.drain() except Exception as e: - traceback.print_exception(e) + print_exception(self.print, e) finally: - self.print("rfc reader closed") - - # aprocess.stdin.write("q\n") - aprocess.terminate() - - # vprocess.stdin.write("q\n") - vprocess.terminate() - - async def handleAudioClient( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ): - loop = asyncio.get_event_loop() - closed = False - q = queue.Queue() - - async def write(): - nonlocal closed - d = q.get() - if closed: - pass - if not d or closed: - closed = True - writer.close() - else: - writer.write(d) - - def run(): - try: - for frame, frame_info in self.receiveAudioData(): - if closed: - return - q.put(frame) - asyncio.run_coroutine_threadsafe(write(), loop=loop) - - except Exception as e: - traceback.print_exception(e) - finally: - self.print("audio session closed") - q.put(None) - - thread = threading.Thread(target=run) - thread.start() - - try: - while True: - buffer = await reader.read() - if not len(buffer): - return - except Exception as e: - traceback.print_exception(e) - finally: - self.print("audio reader closed") - closed = True - - async def handleClient( - self, - account: wyzecam.WyzeAccount, - frameSize, - bitrate, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - ): - loop = asyncio.get_event_loop() - closed = False - q = queue.Queue() - - async def write(): - nonlocal closed - d = q.get() - if closed: - pass - if not d or closed: - closed = True - writer.close() - else: - writer.write(d) - - s = wyzecam.WyzeIOTCSession( - self.plugin.wyze_iotc.tutk_platform_lib, - account, - self.camera, - frame_size=frameSize, - bitrate=bitrate, - # CONNECTING? - stream_state=c_int(2), - ) - - self.streams.add(s) - startedAudio = False - if not self.activeStream: - self.activeStream = s - - def runAudio(): - for frame, frame_info in s.recv_audio_data(): - for q in self.audioQueues: - q.put((frame, frame_info)) - - def checkStartAudio(): - nonlocal startedAudio - if not startedAudio and self.activeStream == s: - startedAudio = True - thread = threading.Thread(target=runAudio) - thread.start() - - def run(): - try: - with s as sess: - checkStartAudio() - for frame, frame_info in sess.recv_video_data(): - if closed: - return - q.put(frame) - asyncio.run_coroutine_threadsafe(write(), loop=loop) - checkStartAudio() - - except Exception as e: - traceback.print_exception(e) - finally: - self.print("session closed") - q.put(None) - - thread = threading.Thread(target=run) - thread.start() - - try: - while not closed: - buffer = await reader.read() - if not len(buffer): - return - except Exception as e: - traceback.print_exception(e) - finally: - self.streams.remove(s) - if self.activeStream == s: - # promote new audio stream to active - self.activeStream = None - for next in self.streams: - self.activeStream = next - break - self.print("reader closed") - closed = True writer.close() + self.print("rfc reader closed") + vprocess.stdin.write("q\n".encode()) + vprocess.terminate() + if aprocess: + aprocess.stdin.write("q\n".encode()) + aprocess.terminate() async def ensureServer(self, cb) -> int: server = await asyncio.start_server(cb, "127.0.0.1", 0) @@ -458,69 +292,87 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings): asyncio.ensure_future(server.serve_forever()) return port - def probeCodec(self, account, frameSize, bitrate): - with wyzecam.WyzeIOTCSession( - self.plugin.wyze_iotc.tutk_platform_lib, - account, - self.camera, - frame_size=frameSize, - bitrate=bitrate, - # CONNECTING? - stream_state=c_int(2), - ) as sess: - audioCodec = sess.get_audio_codec() - for data, frame_info in sess.recv_video_data(): + async def probeCodec(self, substream: bool): + sps: bytes = None + pps: bytes = None + audioCodec: str = None + audioSampleRate: int = None + async for audio, data, codec, sampleRate in self.forkAndStream(substream): + if not audio and not sps and len(data): nals = data.split(b"\x00\x00\x00\x01") sps = nals[1] pps = nals[2] - return audioCodec + (sps, pps) - def probeMainCodec(self): - return self.probeCodec( - self.plugin.account.model_copy(), - self.mainFrameSize, - self.getMainByteRate(), - ) + if audio and not self.getMuted(): + audioCodec = codec + audioSampleRate = sampleRate - def probeSubCodec(self): + if sps and (audioCodec or self.getMuted()): + return (audioCodec, audioSampleRate, sps, pps) + + async def forkAndStream(self, substream: bool): + frameSize = FRAME_SIZE_360P if substream else self.mainFrameSize + bitrate = self.subByteRate if substream else self.getMainByteRate() account = self.plugin.account.model_copy() - account.phone_id = account.phone_id[2:] - return self.probeCodec( - account, - FRAME_SIZE_360P, - self.subByterate, - ) + if substream: + account.phone_id = account.phone_id[2:] + + forked = scrypted_sdk.fork() + try: + wyzeFork: WyzeFork = await forked.result + async for payload in await wyzeFork.open_stream( + self.plugin.tutk_platform_lib, + account.model_dump(), + self.camera.model_dump(), + frameSize, + bitrate, + self.getMuted(), + ): + audio: bool = payload["audio"] + data: bytes = payload["data"] + codec: bytes = payload["codec"] + sampleRate: bytes = payload["sampleRate"] + yield audio, data, codec, sampleRate + finally: + forked.worker.terminate() async def getVideoStream( self, options: RequestMediaStreamOptions = None ) -> Coroutine[Any, Any, MediaObject]: substream = options and options.get("id") == "substream" - if substream: - if not self.sub: - codec, sampleRate, sps, pps = await to_thread(self.probeSubCodec) - self.sub = CodecInfo("h264", (sps, pps), codec, sampleRate) - info = self.sub + try: + if substream: + if not self.sub: + self.print("fetching sub codec info") + codec, sampleRate, sps, pps = await self.probeCodec(True) + self.sub = CodecInfo("h264", (sps, pps), codec, sampleRate) + self.print("sub codec info", len(sps), len(pps)) + info = self.sub - if not substream: - if not self.main: - codec, sampleRate, sps, pps = await to_thread(self.probeMainCodec) - self.main = CodecInfo("h264", (sps, pps), codec, sampleRate) - info = self.main + else: + if not self.main: + self.print("fetching main codec info") + codec, sampleRate, sps, pps = await self.probeCodec(False) + self.main = CodecInfo("h264", (sps, pps), codec, sampleRate) + self.print("main codec info", len(sps), len(pps)) + info = self.main + except Exception as e: + self.print("Error retrieving codec info") + print_exception(self.print, e) + raise - port = await self.subServer if substream else await self.mainServer - audioPort = await self.audioServer rfcPort = await self.rfcSubServer if substream else await self.rfcServer msos = self.getVideoStreamOptionsInternal() mso = msos[1] if substream else msos[0] - mso["audio"]["sampleRate"] = info.audioSampleRate + if not self.getMuted(): + mso["audio"]["sampleRate"] = info.audioSampleRate - if True: - sps = base64.b64encode(info.videoCodecInfo[0]).decode() - pps = base64.b64encode(info.videoCodecInfo[1]).decode() - audioCodecName = codecMap.get(info.audioCodec) - sdp = f"""v=0 + sps = base64.b64encode(info.videoCodecInfo[0]).decode() + pps = base64.b64encode(info.videoCodecInfo[1]).decode() + audioCodecName = codecMap.get(info.audioCodec) + sdp = f"""v=0 o=- 0 0 IN IP4 0.0.0.0 s=No Name t=0 0 @@ -528,50 +380,23 @@ m=video 0 RTP/AVP 96 c=IN IP4 0.0.0.0 a=rtpmap:96 H264/90000 a=fmtp:96 packetization-mode=1; sprop-parameter-sets={sps},{pps}; profile-level-id=4D0029 +""" + if not self.getMuted(): + sdp += f""" m=audio 0 RTP/AVP 97 c=IN IP4 0.0.0.0 b=AS:128 a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1 """ - rfc = { - "url": f"tcp://127.0.0.1:{rfcPort}", - "sdp": sdp, - "mediaStreamOptions": mso, - } - jsonString = json.dumps(rfc) - mo = await scrypted_sdk.mediaManager.createMediaObject( - jsonString.encode(), - "x-scrypted/x-rfc4571", - { - "sourceId": self.id, - }, - ) - return mo - - ffmpegInput: scrypted_sdk.FFmpegInput = { - "container": "ffmpeg", + rfc = { + "url": f"tcp://127.0.0.1:{rfcPort}", + "sdp": sdp, "mediaStreamOptions": mso, - "inputArguments": [ - "-analyzeduration", - "0", - "-probesize", - "100k", - "-f", - "h264", - "-i", - f"tcp://127.0.0.1:{port}", - "-f", - info.audioCodec, - "-ar", - f"{info.audioBitrate}", - "-ac", - "1", - "-i", - f"tcp://127.0.0.1:{audioPort}", - ], } - mo = await scrypted_sdk.mediaManager.createFFmpegMediaObject( - ffmpegInput, + jsonString = json.dumps(rfc) + mo = await scrypted_sdk.mediaManager.createMediaObject( + jsonString.encode(), + "x-scrypted/x-rfc4571", { "sourceId": self.id, }, @@ -589,7 +414,7 @@ a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1 "width": 2560 if self.camera.is_2k else 1920, "height": 1440 if self.camera.is_2k else 1080, }, - "audio": {}, + "audio": None if self.getMuted() else {}, } ) # not all wyze can substream, need to create an exhaustive list? @@ -604,7 +429,7 @@ a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1 "width": 640, "height": 360, }, - "audio": {}, + "audio": None if self.getMuted() else {}, } ) return ret @@ -617,6 +442,7 @@ class WyzePlugin(scrypted_sdk.ScryptedDeviceBase, DeviceProvider): cameras: Dict[str, wyzecam.WyzeCamera] account: wyzecam.WyzeAccount tutk_platform_lib: str + wyze_iotc: wyzecam.WyzeIOTC def __init__(self): super().__init__() @@ -776,3 +602,123 @@ class WyzePlugin(scrypted_sdk.ScryptedDeviceBase, DeviceProvider): def create_scrypted_plugin(): return WyzePlugin() + + +class WyzeFork: + async def open_stream( + self, + tutk_platform_lib: str, + account_json, + camera_json, + frameSize: int, + bitrate: int, + muted: bool, + ): + account = wyzecam.WyzeAccount(**account_json) + camera = wyzecam.WyzeCamera(**camera_json) + + wyze_iotc = wyzecam.WyzeIOTC( + tutk_platform_lib=tutk_platform_lib, + sdk_key=sdkKey, + max_num_av_channels=32, + ) + wyze_iotc.initialize() + + loop = asyncio.get_event_loop() + aq: asyncio.Queue[tuple[bool, bytes, Any]] = asyncio.Queue() + + closed = False + + def run(): + with wyzecam.WyzeIOTCSession( + wyze_iotc.tutk_platform_lib, + account, + camera, + frame_size=frameSize, + bitrate=bitrate, + enable_audio=not muted, + # CONNECTING? + stream_state=c_int(2), + ) as sess: + nonlocal closed + + if not muted: + + def runAudio(): + nonlocal closed + try: + rate = sess.get_audio_sample_rate() + codec: str = None + for frame, frame_info in sess.recv_audio_data(): + if closed: + return + if not codec: + codec, rate = sess.get_audio_codec_from_codec_id( + frame_info.codec_id + ) + asyncio.run_coroutine_threadsafe( + aq.put((True, frame, codec, rate, frame_info)), + loop=loop, + ) + except Exception as e: + asyncio.run_coroutine_threadsafe( + aq.put((True, None, None, None, format_exception(e))), + loop=loop, + ) + finally: + asyncio.run_coroutine_threadsafe( + aq.put((True, None, None, None, None)), loop=loop + ) + closed = True + + athread = threading.Thread( + target=runAudio, name="audio-" + camera.p2p_id + ) + athread.start() + else: + athread = None + + try: + for frame in sess.recv_bridge_frame(): + if closed: + return + asyncio.run_coroutine_threadsafe( + aq.put((False, frame, None, None, None)), loop=loop + ) + except Exception as e: + asyncio.run_coroutine_threadsafe( + aq.put((False, None, None, None, format_exception(e))), + loop=loop, + ) + finally: + asyncio.run_coroutine_threadsafe( + aq.put((False, None, None, None, None)), loop=loop + ) + closed = True + + if athread: + athread.join() + + vthread = threading.Thread(target=run, name="video-" + camera.p2p_id) + vthread.start() + + try: + while not closed: + payload = await aq.get() + audio, data, codec, sampleRate, info = payload + if data == None: + return + + yield { + "__json_copy_serialize_children": True, + "data": data, + "audio": audio, + "codec": codec, + "sampleRate": sampleRate, + } + finally: + closed = True + + +async def fork(): + return WyzeFork()