Merge branch 'main' of github.com:koush/scrypted

This commit is contained in:
Koushik Dutta
2023-03-05 21:36:54 -08:00
4 changed files with 112 additions and 93 deletions

View File

@@ -1,19 +1,19 @@
{
"name": "@scrypted/arlo",
"version": "0.6.5",
"version": "0.6.7",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/arlo",
"version": "0.6.5",
"version": "0.6.7",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}
},
"../../sdk": {
"name": "@scrypted/sdk",
"version": "0.2.63",
"version": "0.2.78",
"dev": true,
"license": "ISC",
"dependencies": {

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/arlo",
"version": "0.6.5",
"version": "0.6.7",
"description": "Arlo Plugin for Scrypted",
"keywords": [
"scrypted",

View File

@@ -27,7 +27,7 @@ from .request import Request
from .mqtt_stream_async import MQTTStream
from .sse_stream_async import EventStream
from .logging import logger
# Import all of the other stuff.
from datetime import datetime
@@ -227,7 +227,7 @@ class Arlo(object):
when subsequent calls to /notify are made.
"""
async def heartbeat(self, basestations, interval=30):
while self.event_stream and self.event_stream.connected:
while self.event_stream and self.event_stream.active:
for basestation in basestations:
try:
self.Ping(basestation)
@@ -378,7 +378,9 @@ class Arlo(object):
return None
return stop
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'motionDetected')], callbackwrapper)
)
def SubscribeToBatteryEvents(self, basestation, camera, callback):
"""
@@ -403,7 +405,9 @@ class Arlo(object):
return None
return stop
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'batteryLevel')], callbackwrapper)
)
def SubscribeToDoorbellEvents(self, basestation, doorbell, callback):
"""
@@ -437,7 +441,9 @@ class Arlo(object):
return None
return stop
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'buttonPressed')], callbackwrapper)
)
def SubscribeToSDPAnswers(self, basestation, camera, callback):
"""
@@ -456,14 +462,16 @@ class Arlo(object):
def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
stop = None
if properties.get("type") == "answerSdp":
stop = callback(properties.get("data"))
if not stop:
return None
return stop
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)
)
def SubscribeToCandidateAnswers(self, basestation, camera, callback):
"""
@@ -482,14 +490,16 @@ class Arlo(object):
def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
stop = None
if properties.get("type") == "answerCandidate":
stop = callback(properties.get("data"))
if not stop:
return None
return stop
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)
)
async def HandleEvents(self, basestation, resource, actions, callback):
"""
@@ -502,9 +512,17 @@ class Arlo(object):
await self.Subscribe()
async def loop_action_listener(action):
# in this function, action can either be a tuple or a string
# if it is a tuple, we expect there to be a property key in the tuple
property = None
if isinstance(action, tuple):
action, property = action
if not isinstance(action, str):
raise Exception('Actions must be either a tuple or a str')
seen_events = {}
while self.event_stream.active:
event, _ = await self.event_stream.get(resource, [action], seen_events)
event, _ = await self.event_stream.get(resource, action, property, seen_events)
if event is None or self.event_stream is None \
or self.event_stream.event_stream_stop_event.is_set():
@@ -514,7 +532,7 @@ class Arlo(object):
response = callback(self, event.item)
# always requeue so other listeners can see the event too
self.event_stream.requeue(event, resource, action)
self.event_stream.requeue(event, resource, action, property)
if response is not None:
return response
@@ -606,7 +624,13 @@ class Arlo(object):
return nl.stream_url_dict['url'].replace("rtsp://", "rtsps://")
return None
return await self.TriggerAndHandleEvent(basestation, resource, ["is"], trigger, callback)
return await self.TriggerAndHandleEvent(
basestation,
resource,
[("is", "activityState")],
trigger,
callback,
)
def StartPushToTalk(self, basestation, camera):
url = f'https://{self.BASE_URL}/hmsweb/users/devices/{self.user_id}_{camera.get("deviceId")}/pushtotalk'
@@ -644,8 +668,6 @@ class Arlo(object):
async def TriggerFullFrameSnapshot(self, basestation, camera):
"""
This function causes the camera to record a fullframe snapshot.
The presignedFullFrameSnapshotUrl url is returned.
Use DownloadSnapshot() to download the actual image file.
"""
resource = f"cameras/{camera.get('deviceId')}"
@@ -676,4 +698,14 @@ class Arlo(object):
return url
return None
return await self.TriggerAndHandleEvent(basestation, resource, ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"], trigger, callback)
return await self.TriggerAndHandleEvent(
basestation,
resource,
[
(action, property)
for action in ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"]
for property in ["presignedFullFrameSnapshotUrl", "presignedLastImageUrl"]
],
trigger,
callback,
)

View File

@@ -28,7 +28,7 @@ from .logging import logger
class Stream:
"""This class provides a queue-based EventStream object."""
def __init__(self, arlo, expire=10):
def __init__(self, arlo, expire=5):
self.event_stream = None
self.initializing = True
self.connected = False
@@ -43,7 +43,7 @@ class Stream:
self.event_loop = asyncio.get_event_loop()
self.event_loop.create_task(self._clean_queues())
self.event_loop.create_task(self._refresh_interval())
def __del__(self):
self.disconnect()
@@ -83,11 +83,16 @@ class Stream:
self.refresh_loop_signal.put_nowait(object())
async def _clean_queues(self):
interval = self.expire * 2
interval = self.expire * 4
await asyncio.sleep(interval)
while not self.event_stream_stop_event.is_set():
for key, q in self.queues.items():
# since we interrupt the cleanup loop after every queue, there's
# a chance the self.queues dict is modified during iteration.
# so, we first make a copy of all the items of the dict and any
# new queues will be processed on the next loop through
queue_items = [i for i in self.queues.items()]
for key, q in queue_items:
if q.empty():
continue
@@ -114,81 +119,47 @@ class Stream:
if num_dropped > 0:
logger.debug(f"Cleaned {num_dropped} events from queue {key}")
# cleanup is not urgent, so give other tasks a chance
await asyncio.sleep(0.1)
await asyncio.sleep(interval)
async def get(self, resource, actions, skip_uuids={}):
if len(actions) == 1:
action = actions[0]
async def get(self, resource, action, property=None, skip_uuids={}):
if not property:
key = f"{resource}/{action}"
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
first_requeued = None
while True:
event = await q.get()
q.task_done()
if not event:
# exit signal received
return None, action
if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so sleep and give the next
# subscriber a chance
q.put_nowait(event)
await asyncio.sleep(random.uniform(0, 0.01))
continue
if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)
if first_requeued is None:
first_requeued = event
else:
return event, action
else:
while True:
for action in actions:
key = f"{resource}/{action}"
key = f"{resource}/{action}/{property}"
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
if q.empty():
continue
first_requeued = None
while True:
event = await q.get()
q.task_done()
first_requeued = None
while not q.empty():
event = q.get_nowait()
q.task_done()
if not event:
# exit signal received
return None, action
if not event:
# exit signal received
return None, action
if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so go to the next queue
q.put_nowait(event)
break
if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)
if first_requeued is None:
first_requeued = event
else:
return event, action
if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so sleep and give the next
# subscriber a chance
q.put_nowait(event)
await asyncio.sleep(random.uniform(0, 0.01))
continue
if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)
if first_requeued is None:
first_requeued = event
else:
return event, action
async def start(self):
raise NotImplementedError()
@@ -203,15 +174,31 @@ class Stream:
resource = response.get('resource')
action = response.get('action')
key = f"{resource}/{action}"
now = time.time()
event = StreamEvent(response, now, now + self.expire)
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
now = time.time()
q.put_nowait(StreamEvent(response, now, now + self.expire))
q.put_nowait(event)
def requeue(self, event, resource, action):
key = f"{resource}/{action}"
# for optimized lookups, notify listeners of individual properties
properties = response.get('properties', {})
for property in properties.keys():
key = f"{resource}/{action}/{property}"
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
q.put_nowait(event)
def requeue(self, event, resource, action, property=None):
if not property:
key = f"{resource}/{action}"
else:
key = f"{resource}/{action}/{property}"
self.queues[key].put_nowait(event)
def disconnect(self):