wyze: refactor for multiprocessing

This commit is contained in:
Koushik Dutta
2023-12-21 21:55:34 -08:00
parent 5cfcfafc00
commit c2d86237d6
4 changed files with 298 additions and 352 deletions

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/wyze",
"version": "0.0.11",
"version": "0.0.31",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/wyze",
"version": "0.0.11",
"version": "0.0.31",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}

View File

@@ -33,5 +33,5 @@
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
},
"version": "0.0.11"
"version": "0.0.31"
}

View File

@@ -1,50 +1,40 @@
from __future__ import annotations
from typing import Any, Coroutine, List, Dict, Callable, Iterator, MutableSet
import scrypted_sdk
import asyncio
import urllib.request
import base64
import concurrent.futures
import json
import os
import urllib
import sys
import platform
import queue
import struct
import subprocess
import sys
import threading
import traceback
import urllib
import urllib.request
from ctypes import c_int
from typing import Any, Callable, Coroutine, Dict, Iterator, List, MutableSet
import scrypted_sdk
from scrypted_sdk.other import MediaObject
from scrypted_sdk.types import (DeviceProvider, PanTiltZoom,
RequestMediaStreamOptions,
ResponseMediaStreamOptions, ScryptedDeviceType,
ScryptedInterface, Setting, Settings,
VideoCamera)
import wyzecam
import wyzecam.api_models
import json
import threading
import queue
import traceback
from ctypes import c_int
import concurrent.futures
import subprocess
import base64
import struct
from wyzecam.tutk.tutk import (
FRAME_SIZE_2K,
FRAME_SIZE_1080P,
FRAME_SIZE_360P,
)
from scrypted_sdk.types import (
DeviceProvider,
RequestMediaStreamOptions,
ResponseMediaStreamOptions,
VideoCamera,
ScryptedDeviceType,
ScryptedInterface,
Settings,
Setting,
)
from wyzecam.tutk.tutk import FRAME_SIZE_2K, FRAME_SIZE_360P, FRAME_SIZE_1080P
os.environ["TUTK_PROJECT_ROOT"] = os.path.join(
os.environ["SCRYPTED_PLUGIN_VOLUME"], "zip/unzipped/fs"
)
sdkKey = "AQAAAIZ44fijz5pURQiNw4xpEfV9ZysFH8LYBPDxiONQlbLKaDeb7n26TSOPSGHftbRVo25k3uz5of06iGNB4pSfmvsCvm/tTlmML6HKS0vVxZnzEuK95TPGEGt+aE15m6fjtRXQKnUav59VSRHwRj9Z1Kjm1ClfkSPUF5NfUvsb3IAbai0WlzZE1yYCtks7NFRMbTXUMq3bFtNhEERD/7oc504b"
toThreadExecutor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix="image"
)
toThreadExecutor = concurrent.futures.ThreadPoolExecutor(thread_name_prefix="probe")
codecMap = {
"mulaw": "PCMU",
@@ -55,6 +45,15 @@ codecMap = {
}
def print_exception(print, e):
for line in traceback.format_exception(e):
print(line)
def format_exception(e):
return "\n".join(traceback.format_exception(e))
async def to_thread(f):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(toThreadExecutor, f)
@@ -79,12 +78,9 @@ class CodecInfo:
self.audioSampleRate = audioSampleRate
class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings, PanTiltZoom):
camera: wyzecam.WyzeCamera
plugin: WyzePlugin
streams: MutableSet[wyzecam.WyzeIOTCSession]
activeStream: wyzecam.WyzeIOTCSession
audioQueues: MutableSet[queue.Queue[tuple[bytes, Any]]]
main: CodecInfo
sub: CodecInfo
@@ -101,13 +97,8 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
self.main = None
self.sub = None
self.mainFrameSize = FRAME_SIZE_2K if camera.is_2k else FRAME_SIZE_1080P
self.subByterate = 30
self.subByteRate = 30
self.mainServer = asyncio.ensure_future(self.ensureServer(self.handleClientHD))
self.subServer = asyncio.ensure_future(self.ensureServer(self.handleClientSD))
self.audioServer = asyncio.ensure_future(
self.ensureServer(self.handleAudioClient)
)
self.rfcServer = asyncio.ensure_future(
self.ensureServer(self.handleMainRfcClient)
)
@@ -115,15 +106,27 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
self.ensureServer(self.handleSubRfcClient)
)
if camera.is_pan_cam:
self.ptzCapabilities = {
"pan": True,
"tilt": True,
}
async def ptzCommand(self, command: scrypted_sdk.PanTiltZoomCommand) -> None:
pass
def safeParseJsonStorage(self, key: str):
try:
return json.loads(self.storage.getItem(key))
except:
return None
def getMuted(self):
return False
def getMainByteRate(self, default=False):
try:
bit = int(self.safeParseJsonStorage("bitrate"))
bit = int(self.safeParseJsonStorage("byterate"))
bit = round(bit / 8)
bit = bit if 1 <= bit <= 255 else 0
if not bit:
@@ -143,7 +146,7 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
"key": "bitrate",
"title": "Main Stream Bitrate",
"description": "The bitrate used by the main stream.",
"value": self.safeParseJsonStorage("bitrate"),
"value": self.safeParseJsonStorage("byterate"),
"combobox": True,
"value": str(self.getMainByteRate(True)),
"choices": [
@@ -168,44 +171,6 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
self.nativeId, ScryptedInterface.VideoCamera.value, None
)
async def handleClientHD(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
return await self.handleClient(
self.plugin.account.model_copy(),
self.mainFrameSize,
self.getMainByteRate(),
reader,
writer,
)
async def handleClientSD(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
account = self.plugin.account.model_copy()
# wyze cams will disconnect first stream if the phone id requests a second stream.
# use a different substream phone id, similar to how docker wyze bridge does it.
account.phone_id = account.phone_id[2:]
return await self.handleClient(
account,
FRAME_SIZE_360P,
self.subByterate,
reader,
writer,
)
def receiveAudioData(self):
q: queue.Queue[tuple[bytes, Any]] = queue.Queue()
self.audioQueues.add(q)
try:
while True:
b, info = q.get()
if not b:
return
yield b, info
finally:
self.audioQueues.remove(q)
async def handleMainRfcClient(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
@@ -223,9 +188,11 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
writer: asyncio.StreamWriter,
):
info = self.sub if substream else self.main
port = await self.subServer if substream else await self.mainServer
audioPort = await self.audioServer
try:
ffmpeg = await scrypted_sdk.mediaManager.getFFmpegPath()
except Exception as e:
raise e
loop = asyncio.get_event_loop()
class Protocol:
def __init__(self, pt: int) -> None:
@@ -240,44 +207,41 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
writer.write(len_data)
writer.write(data)
ffmpeg = await scrypted_sdk.mediaManager.getFFmpegPath()
loop = asyncio.get_event_loop()
vt, vp = await loop.create_datagram_endpoint(
lambda: Protocol(96), local_addr=("127.0.0.1", 0)
)
vhost, vport = vt._sock.getsockname()
vprocess = subprocess.Popen(
[
ffmpeg,
"-analyzeduration",
"0",
"-probesize",
"100k",
"-f",
"h264",
"-i",
f"tcp://127.0.0.1:{port}",
"-vcodec",
"copy",
"-an",
"-f",
"rtp",
"-payload_type",
"96",
f"rtp://127.0.0.1:{vport}?pkt_size=1300",
]
vprocess = await asyncio.create_subprocess_exec(
ffmpeg,
"-analyzeduration",
"0",
"-probesize",
"100k",
"-f",
"h264",
"-i",
"pipe:0",
"-vcodec",
"copy",
"-an",
"-f",
"rtp",
"-payload_type",
"96",
f"rtp://127.0.0.1:{vport}?pkt_size=1300",
stdin=asyncio.subprocess.PIPE,
)
at, ap = await loop.create_datagram_endpoint(
lambda: Protocol(97), local_addr=("127.0.0.1", 0)
)
aprocess: asyncio.subprocess.Process = None
if not self.getMuted():
at, ap = await loop.create_datagram_endpoint(
lambda: Protocol(97), local_addr=("127.0.0.1", 0)
)
ahost, aport = at._sock.getsockname()
ahost, aport = at._sock.getsockname()
aprocess = subprocess.Popen(
[
aprocess = await asyncio.create_subprocess_exec(
ffmpeg,
"-analyzeduration",
"0",
@@ -288,7 +252,7 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
"-ar",
f"{info.audioSampleRate}",
"-i",
f"tcp://127.0.0.1:{audioPort}",
"pipe:0",
"-acodec",
"copy",
"-vn",
@@ -297,159 +261,29 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
"-payload_type",
"97",
f"rtp://127.0.0.1:{aport}?pkt_size=1300",
]
)
stdin=asyncio.subprocess.PIPE,
)
try:
while True:
buffer = await reader.read()
if not len(buffer):
print("stuffing")
async for audio, data, codec, sampleRate in self.forkAndStream(substream):
if writer.is_closing():
return
p = aprocess if audio else vprocess
if p:
p.stdin.write(data)
await p.stdin.drain()
except Exception as e:
traceback.print_exception(e)
print_exception(self.print, e)
finally:
self.print("rfc reader closed")
# aprocess.stdin.write("q\n")
aprocess.terminate()
# vprocess.stdin.write("q\n")
vprocess.terminate()
async def handleAudioClient(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
loop = asyncio.get_event_loop()
closed = False
q = queue.Queue()
async def write():
nonlocal closed
d = q.get()
if closed:
pass
if not d or closed:
closed = True
writer.close()
else:
writer.write(d)
def run():
try:
for frame, frame_info in self.receiveAudioData():
if closed:
return
q.put(frame)
asyncio.run_coroutine_threadsafe(write(), loop=loop)
except Exception as e:
traceback.print_exception(e)
finally:
self.print("audio session closed")
q.put(None)
thread = threading.Thread(target=run)
thread.start()
try:
while True:
buffer = await reader.read()
if not len(buffer):
return
except Exception as e:
traceback.print_exception(e)
finally:
self.print("audio reader closed")
closed = True
async def handleClient(
self,
account: wyzecam.WyzeAccount,
frameSize,
bitrate,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
loop = asyncio.get_event_loop()
closed = False
q = queue.Queue()
async def write():
nonlocal closed
d = q.get()
if closed:
pass
if not d or closed:
closed = True
writer.close()
else:
writer.write(d)
s = wyzecam.WyzeIOTCSession(
self.plugin.wyze_iotc.tutk_platform_lib,
account,
self.camera,
frame_size=frameSize,
bitrate=bitrate,
# CONNECTING?
stream_state=c_int(2),
)
self.streams.add(s)
startedAudio = False
if not self.activeStream:
self.activeStream = s
def runAudio():
for frame, frame_info in s.recv_audio_data():
for q in self.audioQueues:
q.put((frame, frame_info))
def checkStartAudio():
nonlocal startedAudio
if not startedAudio and self.activeStream == s:
startedAudio = True
thread = threading.Thread(target=runAudio)
thread.start()
def run():
try:
with s as sess:
checkStartAudio()
for frame, frame_info in sess.recv_video_data():
if closed:
return
q.put(frame)
asyncio.run_coroutine_threadsafe(write(), loop=loop)
checkStartAudio()
except Exception as e:
traceback.print_exception(e)
finally:
self.print("session closed")
q.put(None)
thread = threading.Thread(target=run)
thread.start()
try:
while not closed:
buffer = await reader.read()
if not len(buffer):
return
except Exception as e:
traceback.print_exception(e)
finally:
self.streams.remove(s)
if self.activeStream == s:
# promote new audio stream to active
self.activeStream = None
for next in self.streams:
self.activeStream = next
break
self.print("reader closed")
closed = True
writer.close()
self.print("rfc reader closed")
vprocess.stdin.write("q\n".encode())
vprocess.terminate()
if aprocess:
aprocess.stdin.write("q\n".encode())
aprocess.terminate()
async def ensureServer(self, cb) -> int:
server = await asyncio.start_server(cb, "127.0.0.1", 0)
@@ -458,69 +292,87 @@ class WyzeCamera(scrypted_sdk.ScryptedDeviceBase, VideoCamera, Settings):
asyncio.ensure_future(server.serve_forever())
return port
def probeCodec(self, account, frameSize, bitrate):
with wyzecam.WyzeIOTCSession(
self.plugin.wyze_iotc.tutk_platform_lib,
account,
self.camera,
frame_size=frameSize,
bitrate=bitrate,
# CONNECTING?
stream_state=c_int(2),
) as sess:
audioCodec = sess.get_audio_codec()
for data, frame_info in sess.recv_video_data():
async def probeCodec(self, substream: bool):
sps: bytes = None
pps: bytes = None
audioCodec: str = None
audioSampleRate: int = None
async for audio, data, codec, sampleRate in self.forkAndStream(substream):
if not audio and not sps and len(data):
nals = data.split(b"\x00\x00\x00\x01")
sps = nals[1]
pps = nals[2]
return audioCodec + (sps, pps)
def probeMainCodec(self):
return self.probeCodec(
self.plugin.account.model_copy(),
self.mainFrameSize,
self.getMainByteRate(),
)
if audio and not self.getMuted():
audioCodec = codec
audioSampleRate = sampleRate
def probeSubCodec(self):
if sps and (audioCodec or self.getMuted()):
return (audioCodec, audioSampleRate, sps, pps)
async def forkAndStream(self, substream: bool):
frameSize = FRAME_SIZE_360P if substream else self.mainFrameSize
bitrate = self.subByteRate if substream else self.getMainByteRate()
account = self.plugin.account.model_copy()
account.phone_id = account.phone_id[2:]
return self.probeCodec(
account,
FRAME_SIZE_360P,
self.subByterate,
)
if substream:
account.phone_id = account.phone_id[2:]
forked = scrypted_sdk.fork()
try:
wyzeFork: WyzeFork = await forked.result
async for payload in await wyzeFork.open_stream(
self.plugin.tutk_platform_lib,
account.model_dump(),
self.camera.model_dump(),
frameSize,
bitrate,
self.getMuted(),
):
audio: bool = payload["audio"]
data: bytes = payload["data"]
codec: bytes = payload["codec"]
sampleRate: bytes = payload["sampleRate"]
yield audio, data, codec, sampleRate
finally:
forked.worker.terminate()
async def getVideoStream(
self, options: RequestMediaStreamOptions = None
) -> Coroutine[Any, Any, MediaObject]:
substream = options and options.get("id") == "substream"
if substream:
if not self.sub:
codec, sampleRate, sps, pps = await to_thread(self.probeSubCodec)
self.sub = CodecInfo("h264", (sps, pps), codec, sampleRate)
info = self.sub
try:
if substream:
if not self.sub:
self.print("fetching sub codec info")
codec, sampleRate, sps, pps = await self.probeCodec(True)
self.sub = CodecInfo("h264", (sps, pps), codec, sampleRate)
self.print("sub codec info", len(sps), len(pps))
info = self.sub
if not substream:
if not self.main:
codec, sampleRate, sps, pps = await to_thread(self.probeMainCodec)
self.main = CodecInfo("h264", (sps, pps), codec, sampleRate)
info = self.main
else:
if not self.main:
self.print("fetching main codec info")
codec, sampleRate, sps, pps = await self.probeCodec(False)
self.main = CodecInfo("h264", (sps, pps), codec, sampleRate)
self.print("main codec info", len(sps), len(pps))
info = self.main
except Exception as e:
self.print("Error retrieving codec info")
print_exception(self.print, e)
raise
port = await self.subServer if substream else await self.mainServer
audioPort = await self.audioServer
rfcPort = await self.rfcSubServer if substream else await self.rfcServer
msos = self.getVideoStreamOptionsInternal()
mso = msos[1] if substream else msos[0]
mso["audio"]["sampleRate"] = info.audioSampleRate
if not self.getMuted():
mso["audio"]["sampleRate"] = info.audioSampleRate
if True:
sps = base64.b64encode(info.videoCodecInfo[0]).decode()
pps = base64.b64encode(info.videoCodecInfo[1]).decode()
audioCodecName = codecMap.get(info.audioCodec)
sdp = f"""v=0
sps = base64.b64encode(info.videoCodecInfo[0]).decode()
pps = base64.b64encode(info.videoCodecInfo[1]).decode()
audioCodecName = codecMap.get(info.audioCodec)
sdp = f"""v=0
o=- 0 0 IN IP4 0.0.0.0
s=No Name
t=0 0
@@ -528,50 +380,23 @@ m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets={sps},{pps}; profile-level-id=4D0029
"""
if not self.getMuted():
sdp += f"""
m=audio 0 RTP/AVP 97
c=IN IP4 0.0.0.0
b=AS:128
a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1
"""
rfc = {
"url": f"tcp://127.0.0.1:{rfcPort}",
"sdp": sdp,
"mediaStreamOptions": mso,
}
jsonString = json.dumps(rfc)
mo = await scrypted_sdk.mediaManager.createMediaObject(
jsonString.encode(),
"x-scrypted/x-rfc4571",
{
"sourceId": self.id,
},
)
return mo
ffmpegInput: scrypted_sdk.FFmpegInput = {
"container": "ffmpeg",
rfc = {
"url": f"tcp://127.0.0.1:{rfcPort}",
"sdp": sdp,
"mediaStreamOptions": mso,
"inputArguments": [
"-analyzeduration",
"0",
"-probesize",
"100k",
"-f",
"h264",
"-i",
f"tcp://127.0.0.1:{port}",
"-f",
info.audioCodec,
"-ar",
f"{info.audioBitrate}",
"-ac",
"1",
"-i",
f"tcp://127.0.0.1:{audioPort}",
],
}
mo = await scrypted_sdk.mediaManager.createFFmpegMediaObject(
ffmpegInput,
jsonString = json.dumps(rfc)
mo = await scrypted_sdk.mediaManager.createMediaObject(
jsonString.encode(),
"x-scrypted/x-rfc4571",
{
"sourceId": self.id,
},
@@ -589,7 +414,7 @@ a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1
"width": 2560 if self.camera.is_2k else 1920,
"height": 1440 if self.camera.is_2k else 1080,
},
"audio": {},
"audio": None if self.getMuted() else {},
}
)
# not all wyze can substream, need to create an exhaustive list?
@@ -604,7 +429,7 @@ a=rtpmap:97 {audioCodecName}/{info.audioSampleRate}/1
"width": 640,
"height": 360,
},
"audio": {},
"audio": None if self.getMuted() else {},
}
)
return ret
@@ -617,6 +442,7 @@ class WyzePlugin(scrypted_sdk.ScryptedDeviceBase, DeviceProvider):
cameras: Dict[str, wyzecam.WyzeCamera]
account: wyzecam.WyzeAccount
tutk_platform_lib: str
wyze_iotc: wyzecam.WyzeIOTC
def __init__(self):
super().__init__()
@@ -776,3 +602,123 @@ class WyzePlugin(scrypted_sdk.ScryptedDeviceBase, DeviceProvider):
def create_scrypted_plugin():
return WyzePlugin()
class WyzeFork:
async def open_stream(
self,
tutk_platform_lib: str,
account_json,
camera_json,
frameSize: int,
bitrate: int,
muted: bool,
):
account = wyzecam.WyzeAccount(**account_json)
camera = wyzecam.WyzeCamera(**camera_json)
wyze_iotc = wyzecam.WyzeIOTC(
tutk_platform_lib=tutk_platform_lib,
sdk_key=sdkKey,
max_num_av_channels=32,
)
wyze_iotc.initialize()
loop = asyncio.get_event_loop()
aq: asyncio.Queue[tuple[bool, bytes, Any]] = asyncio.Queue()
closed = False
def run():
with wyzecam.WyzeIOTCSession(
wyze_iotc.tutk_platform_lib,
account,
camera,
frame_size=frameSize,
bitrate=bitrate,
enable_audio=not muted,
# CONNECTING?
stream_state=c_int(2),
) as sess:
nonlocal closed
if not muted:
def runAudio():
nonlocal closed
try:
rate = sess.get_audio_sample_rate()
codec: str = None
for frame, frame_info in sess.recv_audio_data():
if closed:
return
if not codec:
codec, rate = sess.get_audio_codec_from_codec_id(
frame_info.codec_id
)
asyncio.run_coroutine_threadsafe(
aq.put((True, frame, codec, rate, frame_info)),
loop=loop,
)
except Exception as e:
asyncio.run_coroutine_threadsafe(
aq.put((True, None, None, None, format_exception(e))),
loop=loop,
)
finally:
asyncio.run_coroutine_threadsafe(
aq.put((True, None, None, None, None)), loop=loop
)
closed = True
athread = threading.Thread(
target=runAudio, name="audio-" + camera.p2p_id
)
athread.start()
else:
athread = None
try:
for frame in sess.recv_bridge_frame():
if closed:
return
asyncio.run_coroutine_threadsafe(
aq.put((False, frame, None, None, None)), loop=loop
)
except Exception as e:
asyncio.run_coroutine_threadsafe(
aq.put((False, None, None, None, format_exception(e))),
loop=loop,
)
finally:
asyncio.run_coroutine_threadsafe(
aq.put((False, None, None, None, None)), loop=loop
)
closed = True
if athread:
athread.join()
vthread = threading.Thread(target=run, name="video-" + camera.p2p_id)
vthread.start()
try:
while not closed:
payload = await aq.get()
audio, data, codec, sampleRate, info = payload
if data == None:
return
yield {
"__json_copy_serialize_children": True,
"data": data,
"audio": audio,
"codec": codec,
"sampleRate": sampleRate,
}
finally:
closed = True
async def fork():
return WyzeFork()