mirror of
https://github.com/koush/scrypted.git
synced 2026-05-04 21:30:30 +01:00
* fix doorbell device type * bump 0.7.1 for beta * standalone camera fixes * bump 0.7.2 for beta * more type annotations + trickle discover all devices * fetch arlo library clips * log options * cache library at lower level and fetch clips on demand * move library timedelta range lower in stack * wip siren as security system * virtual security system and tweaks * vss documentation and settings * expand vss usage docs * more docs changes * force homekit and scrypted to update given vss and siren state * RE-ENABLING SIREN!!! * bump 0.7.3 for beta * bump 0.7.3 for release
28 lines
858 B
Python
28 lines
858 B
Python
import asyncio
|
|
|
|
|
|
class BackgroundTaskMixin:
|
|
def create_task(self, coroutine) -> asyncio.Task:
|
|
task = asyncio.get_event_loop().create_task(coroutine)
|
|
self.register_task(task)
|
|
return task
|
|
|
|
def register_task(self, task) -> None:
|
|
if not hasattr(self, "background_tasks"):
|
|
self.background_tasks = set()
|
|
|
|
assert task is not None
|
|
|
|
def print_exception(task):
|
|
if task.exception():
|
|
self.logger.error(f"task exception: {task.exception()}")
|
|
|
|
self.background_tasks.add(task)
|
|
task.add_done_callback(print_exception)
|
|
task.add_done_callback(self.background_tasks.discard)
|
|
|
|
def cancel_pending_tasks(self) -> None:
|
|
if not hasattr(self, "background_tasks"):
|
|
return
|
|
for task in self.background_tasks:
|
|
task.cancel() |