Compare commits

...

3 Commits

Author          SHA1        Message                                                                    Date
Koushik Dutta   56b2ab9c4f  prerelease                                                                 2023-03-27 11:53:24 -07:00
Koushik Dutta   d330e2eb9d  server: remove os machine usage which only exists in recent node builds   2023-03-27 11:53:19 -07:00
Koushik Dutta   b55e7cacb3  predict: remove old pipline code                                           2023-03-27 11:14:53 -07:00
14 changed files with 49 additions and 1109 deletions

View File

@@ -1 +0,0 @@
../../tensorflow-lite/src/pipeline

View File

@@ -1,10 +1,5 @@
# plugin
Pillow>=5.4.1
PyGObject>=3.30.4
coremltools~=6.1
av>=10.0.0; sys_platform != 'linux' or platform_machine == 'x86_64' or platform_machine == 'aarch64'
# sort_oh
scipy
filterpy
numpy
# pillow for anything not intel linux, pillow-simd is available on x64 linux
Pillow>=5.4.1; sys_platform != 'linux' or platform_machine != 'x86_64'
pillow-simd; sys_platform == 'linux' and platform_machine == 'x86_64'
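The platform selectors above are PEP 508 environment markers, evaluated by pip at install time. A minimal sketch of the same evaluation using the `packaging` library (the marker engine pip vendors), with the x86_64 Linux marker from the pillow-simd line:

```python
# Evaluate the environment marker that gates pillow-simd above.
from packaging.markers import Marker

marker = Marker("sys_platform == 'linux' and platform_machine == 'x86_64'")
print(marker.evaluate())  # True only on x86_64 Linux, where pillow-simd is installed
```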

View File

@@ -1,5 +1,4 @@
import asyncio
from typing import Any
import concurrent.futures
# vips is already multithreaded, but needs to be kicked off the python asyncio thread.
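The comment above motivates a small helper: pyvips does its own threading, but its blocking calls must still be kicked off the asyncio event loop. A minimal sketch of what the `to_thread` helper imported in the next file might look like (thread.py itself is not shown in this diff, so the shape is an assumption):

```python
# Hypothetical sketch of thread.to_thread: offload a blocking callable to a
# ThreadPoolExecutor so pyvips work does not stall the asyncio event loop.
import asyncio
import concurrent.futures

_executor = concurrent.futures.ThreadPoolExecutor(thread_name_prefix='vips')

async def to_thread(fn):
    # run_in_executor returns an awaitable future; awaiting yields fn's result.
    return await asyncio.get_event_loop().run_in_executor(_executor, fn)
```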

View File

@@ -6,7 +6,6 @@ try:
except:
Image = None
pyvips = None
pass
from thread import to_thread
class VipsImage(scrypted_sdk.VideoFrame):

View File

@@ -1,118 +1,21 @@
from __future__ import annotations
from asyncio.events import AbstractEventLoop, TimerHandle
from asyncio.futures import Future
from typing import Any, Mapping, Tuple
from typing_extensions import TypedDict
from pipeline import GstPipeline, GstPipelineBase, create_pipeline_sink, safe_set_result
import scrypted_sdk
import json
import asyncio
import time
import os
import binascii
from urllib.parse import urlparse
import threading
from pipeline import run_pipeline
import platform
from .corohelper import run_coro_threadsafe
from PIL import Image
import math
import io
from typing import Any, Tuple
Gst = None
try:
from gi.repository import Gst
except:
pass
av = None
try:
import av
av.logging.set_level(av.logging.PANIC)
except:
pass
from scrypted_sdk.types import ObjectDetectionGeneratorSession, ObjectDetectionModel, Setting, FFmpegInput, MediaObject, ObjectDetection, ObjectDetectionCallbacks, ObjectDetectionSession, ObjectsDetected, ScryptedInterface, ScryptedMimeTypes
def optional_chain(root, *keys):
result = root
for k in keys:
if isinstance(result, dict):
result = result.get(k, None)
else:
result = getattr(result, k, None)
if result is None:
break
return result
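Usage of optional_chain, mirroring how the plugin later reads the codec out of an FFmpegInput dict:

```python
j = {'mediaStreamOptions': {'video': {'codec': 'h264'}}}
assert optional_chain(j, 'mediaStreamOptions', 'video', 'codec') == 'h264'
# a missing key anywhere along the path short-circuits to None
assert optional_chain(j, 'mediaStreamOptions', 'audio', 'codec') is None
```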
class DetectionSession:
id: str
timerHandle: TimerHandle
future: Future
loop: AbstractEventLoop
settings: Any
running: bool
plugin: DetectPlugin
callbacks: ObjectDetectionCallbacks
user_callback: Any
def __init__(self) -> None:
self.timerHandle = None
self.future = Future()
self.running = False
self.mutex = threading.Lock()
self.last_sample = time.time()
self.user_callback = None
def clearTimeoutLocked(self):
if self.timerHandle:
self.timerHandle.cancel()
self.timerHandle = None
def clearTimeout(self):
with self.mutex:
self.clearTimeoutLocked()
def timedOut(self):
self.plugin.end_session(self)
def setTimeout(self, duration: float):
with self.mutex:
self.clearTimeoutLocked()
self.timerHandle = self.loop.call_later(
duration, lambda: self.timedOut())
class DetectionSink(TypedDict):
pipeline: str
input_size: Tuple[float, float]
import scrypted_sdk
from scrypted_sdk.types import (MediaObject, ObjectDetection,
ObjectDetectionCallbacks,
ObjectDetectionGeneratorSession,
ObjectDetectionModel, ObjectDetectionSession,
ObjectsDetected, ScryptedMimeTypes, Setting)
class DetectPlugin(scrypted_sdk.ScryptedDeviceBase, ObjectDetection):
def __init__(self, nativeId: str | None = None):
super().__init__(nativeId=nativeId)
self.detection_sessions: Mapping[str, DetectionSession] = {}
self.session_mutex = threading.Lock()
self.crop = False
self.loop = asyncio.get_event_loop()
async def getSettings(self) -> list[Setting]:
activeSessions: Setting = {
'key': 'activeSessions',
'readonly': True,
'title': 'Active Detection Sessions',
'value': len(self.detection_sessions),
}
return [
activeSessions
]
async def putSetting(self, key: str, value: scrypted_sdk.SettingValue) -> None:
pass
def getClasses(self) -> list[str]:
pass
@@ -138,165 +41,21 @@ class DetectPlugin(scrypted_sdk.ScryptedDeviceBase, ObjectDetection):
'settings': [],
}
decoderSetting: Setting = {
'title': "Decoder",
'description': "The tool used to decode the stream. The may be libav or a gstreamer element.",
'combobox': True,
'value': 'Default',
'placeholder': 'Default',
'key': 'decoder',
'subgroup': 'Advanced',
'choices': [
'Default',
'libav',
'decodebin',
'vtdec_hw',
'nvh264dec',
'vaapih264dec',
],
}
d['settings'] += self.getModelSettings(settings)
d['settings'].append(decoderSetting)
return d
async def detection_event(self, detection_session: DetectionSession, detection_result: ObjectsDetected, redetect: Any = None, mediaObject = None):
if not detection_session.running and detection_result.get('running'):
return
detection_result['timestamp'] = int(time.time() * 1000)
if detection_session.callbacks:
if detection_session.running:
return await detection_session.callbacks.onDetection(detection_result, redetect, mediaObject)
else:
await detection_session.callbacks.onDetectionEnded(detection_result)
else:
# legacy path, nuke this pattern in opencv, pam diff, and full tensorflow.
detection_result['detectionId'] = detection_session.id
await self.onDeviceEvent(ScryptedInterface.ObjectDetection.value, detection_result)
def end_session(self, detection_session: DetectionSession):
print('detection ended', detection_session.id)
detection_session.clearTimeout()
# leave detection_session.running as True to avoid race conditions.
# the removal from detection_sessions will restart it.
safe_set_result(detection_session.loop, detection_session.future)
with self.session_mutex:
self.detection_sessions.pop(detection_session.id, None)
detection_result: ObjectsDetected = {}
detection_result['running'] = False
asyncio.run_coroutine_threadsafe(self.detection_event(detection_session, detection_result), loop=detection_session.loop)
def create_detection_result_status(self, detection_id: str, running: bool):
detection_result: ObjectsDetected = {}
detection_result['detectionId'] = detection_id
detection_result['running'] = running
detection_result['timestamp'] = int(time.time() * 1000)
return detection_result
def run_detection_jpeg(self, detection_session: DetectionSession, image_bytes: bytes, settings: Any) -> ObjectsDetected:
pass
def get_detection_input_size(self, src_size):
pass
def create_detection_session(self):
return DetectionSession()
def run_detection_gstsample(self, detection_session: DetectionSession, gst_sample, settings: Any, src_size, convert_to_src_size) -> Tuple[ObjectsDetected, Any]:
async def run_detection_videoframe(self, videoFrame: scrypted_sdk.VideoFrame, detection_session: ObjectDetectionSession) -> ObjectsDetected:
pass
async def run_detection_videoframe(self, videoFrame: scrypted_sdk.VideoFrame, detection_session: DetectionSession) -> ObjectsDetected:
pass
async def run_detection_avframe(self, detection_session: DetectionSession, avframe, settings: Any, src_size, convert_to_src_size) -> Tuple[ObjectsDetected, Any]:
pil: Image.Image = avframe.to_image()
return await self.run_detection_image(detection_session, pil, settings, src_size, convert_to_src_size)
async def run_detection_image(self, detection_session: DetectionSession, image: Image.Image, settings: Any, src_size, convert_to_src_size) -> Tuple[ObjectsDetected, Any]:
pass
def run_detection_crop(self, detection_session: DetectionSession, sample: Any, settings: Any, src_size, convert_to_src_size, bounding_box: Tuple[float, float, float, float]) -> ObjectsDetected:
print("not implemented")
pass
def ensure_session(self, mediaObjectMimeType: str, session: ObjectDetectionSession) -> Tuple[bool, DetectionSession, ObjectsDetected]:
settings = None
duration = None
detection_id = None
detection_session = None
if session:
detection_id = session.get('detectionId', None)
duration = session.get('duration', None)
settings = session.get('settings', None)
is_image = mediaObjectMimeType and mediaObjectMimeType.startswith(
'image/')
ending = False
new_session = False
with self.session_mutex:
if not is_image and not detection_id:
detection_id = binascii.b2a_hex(os.urandom(15)).decode('utf8')
if detection_id:
detection_session = self.detection_sessions.get(
detection_id, None)
if duration == None and not is_image:
ending = True
elif detection_id and not detection_session:
if not mediaObjectMimeType:
return (False, None, self.create_detection_result_status(detection_id, False))
new_session = True
detection_session = self.create_detection_session()
detection_session.plugin = self
detection_session.id = detection_id
detection_session.settings = settings
loop = asyncio.get_event_loop()
detection_session.loop = loop
self.detection_sessions[detection_id] = detection_session
detection_session.future.add_done_callback(
lambda _: self.end_session(detection_session))
if not ending and detection_session and time.time() - detection_session.last_sample > 30 and not mediaObjectMimeType:
print('detection session has not received a sample in 30 seconds, terminating',
detection_session.id)
ending = True
if ending:
if detection_session:
self.end_session(detection_session)
return (False, None, self.create_detection_result_status(detection_id, False))
if is_image:
return (False, detection_session, None)
detection_session.setTimeout(duration / 1000)
if settings != None:
detection_session.settings = settings
if not new_session:
print("existing session", detection_session.id)
return (False, detection_session, self.create_detection_result_status(detection_id, detection_session.running))
return (True, detection_session, None)
async def generateObjectDetections(self, videoFrames: Any, session: ObjectDetectionGeneratorSession = None) -> Any:
try:
videoFrames = await scrypted_sdk.sdk.connectRPCObject(videoFrames)
detection_session = self.create_detection_session()
detection_session.plugin = self
detection_session.settings = session and session.get('settings')
async for videoFrame in videoFrames:
detected = await self.run_detection_videoframe(videoFrame, detection_session)
detected = await self.run_detection_videoframe(videoFrame, session)
yield {
'__json_copy_serialize_children': True,
'detected': detected,
@@ -309,261 +68,13 @@ class DetectPlugin(scrypted_sdk.ScryptedDeviceBase, ObjectDetection):
pass
async def detectObjects(self, mediaObject: MediaObject, session: ObjectDetectionSession = None, callbacks: ObjectDetectionCallbacks = None) -> ObjectsDetected:
is_image = mediaObject and (mediaObject.mimeType.startswith('image/') or mediaObject.mimeType.endswith('/x-raw-image'))
settings = None
duration = None
if session:
duration = session.get('duration', None)
settings = session.get('settings', None)
vf: scrypted_sdk.VideoFrame
if mediaObject and mediaObject.mimeType == ScryptedMimeTypes.Image.value:
vf: scrypted_sdk.VideoFrame = mediaObject
return await self.run_detection_videoframe(vf, settings)
vf = mediaObject
else:
vf = await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(mediaObject, ScryptedMimeTypes.Image.value)
create, detection_session, objects_detected = self.ensure_session(
mediaObject and mediaObject.mimeType, session)
if detection_session:
detection_session.callbacks = callbacks
if is_image:
stream = io.BytesIO(bytes(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(mediaObject, 'image/jpeg')))
image = Image.open(stream)
if detection_session:
if not detection_session.user_callback:
detection_session.user_callback = self.create_user_callback(self.run_detection_image, detection_session, duration)
def convert_to_src_size(point, normalize = False):
x, y = point
return (int(math.ceil(x)), int(math.ceil(y)), True)
detection_session.running = True
try:
return await detection_session.user_callback(image, image.size, convert_to_src_size)
finally:
detection_session.running = False
else:
return await self.run_detection_jpeg(detection_session, bytes(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(mediaObject, 'image/jpeg')), settings)
if not create:
# a detection session may have been created, but not started
# if the initial request was for an image.
# however, attached sessions should be unchoked, as the pipeline
# is not managed here.
if not detection_session or detection_session.running or not mediaObject:
return objects_detected
detection_id = detection_session.id
detection_session.running = True
print('detection starting', detection_id)
b = await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(mediaObject, ScryptedMimeTypes.FFmpegInput.value)
s = b.decode('utf8')
j: FFmpegInput = json.loads(s)
container = j.get('container', None)
videosrc = j['url']
videoCodec = optional_chain(j, 'mediaStreamOptions', 'video', 'codec')
decoder = settings and settings.get('decoder')
if decoder == 'Default':
decoder = None
if decoder == 'libav' and not av:
decoder = None
elif decoder != 'libav' and not Gst:
decoder = None
if not decoder:
if Gst:
if videoCodec == 'h264':
# hw acceleration is "safe" to use on mac, but not
# on other hosts where it may crash.
# defaults must be safe.
if platform.system() == 'Darwin':
decoder = 'vtdec_hw'
else:
decoder = 'avdec_h264'
else:
# decodebin may pick a hardware accelerated decoder, which isn't ideal
# so use a known software decoder for h264 and decodebin for anything else.
decoder = 'decodebin'
elif av:
decoder = 'libav'
if decoder == 'libav':
user_callback = self.create_user_callback(self.run_detection_avframe, detection_session, duration)
async def inference_loop():
options = {
'analyzeduration': '0',
'probesize': '500000',
'reorder_queue_size': '0',
}
container = av.open(videosrc, options = options)
stream = container.streams.video[0]
start = 0
for idx, frame in enumerate(container.decode(stream)):
if detection_session.future.done():
container.close()
break
now = time.time()
if not start:
start = now
elapsed = now - start
if (frame.time or 0) < elapsed - 0.500:
# print('too slow, skipping frame')
continue
# print(frame)
size = (frame.width, frame.height)
def convert_to_src_size(point, normalize = False):
x, y = point
return (int(math.ceil(x)), int(math.ceil(y)), True)
await user_callback(frame, size, convert_to_src_size)
def thread_main():
loop = asyncio.new_event_loop()
loop.run_until_complete(inference_loop())
thread = threading.Thread(target=thread_main)
thread.start()
return self.create_detection_result_status(detection_id, True)
if not Gst:
raise Exception('Gstreamer is unavailable')
if videosrc.startswith('tcp://'):
parsed_url = urlparse(videosrc)
videosrc = 'tcpclientsrc port=%s host=%s' % (
parsed_url.port, parsed_url.hostname)
if container == 'mpegts':
videosrc += ' ! tsdemux'
elif container == 'sdp':
videosrc += ' ! sdpdemux'
else:
raise Exception('unknown container %s' % container)
elif videosrc.startswith('rtsp'):
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
if videoCodec == 'h264':
videosrc += ' ! rtph264depay ! h264parse'
videosrc += " ! %s" % decoder
width = optional_chain(j, 'mediaStreamOptions',
'video', 'width') or 1920
height = optional_chain(j, 'mediaStreamOptions',
'video', 'height') or 1080
src_size = (width, height)
self.run_pipeline(detection_session, duration, src_size, videosrc)
return self.create_detection_result_status(detection_id, True)
return await self.run_detection_videoframe(vf, session)
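For reference, the source/decoder assembly above yields a pipeline prefix like the following for an h264 RTSP stream on a non-Mac host with GStreamer available (the URL is a placeholder; the defaults shown select avdec_h264):

```python
# Illustrative result of the videosrc assembly for an RTSP h264 source.
videosrc = ('rtspsrc buffer-mode=0 location=rtsp://camera.example/stream '
            'protocols=tcp latency=0 is-live=false'
            ' ! rtph264depay ! h264parse'
            ' ! avdec_h264')
```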
def get_pixel_format(self):
return 'RGB'
def create_pipeline_sink(self, src_size) -> DetectionSink:
inference_size = self.get_detection_input_size(src_size)
ret: DetectionSink = {}
ret['input_size'] = inference_size
ret['pipeline'] = create_pipeline_sink(
type(self).__name__, inference_size, self.get_pixel_format())
return ret
async def detection_event_notified(self, settings: Any):
pass
async def createMedia(self, data: Any) -> MediaObject:
pass
def invalidateMedia(self, detection_session: DetectionSession, data: Any):
pass
def create_user_callback(self, run_detection: Any, detection_session: DetectionSession, duration: float):
first_frame = True
current_data = None
current_src_size = None
current_convert_to_src_size = None
async def redetect(boundingBox: Tuple[float, float, float, float]):
nonlocal current_data
nonlocal current_src_size
nonlocal current_convert_to_src_size
if not current_data:
raise Exception('no sample')
detection_result = await self.run_detection_crop(
detection_session, current_data, detection_session.settings, current_src_size, current_convert_to_src_size, boundingBox)
return detection_result['detections']
async def user_callback(sample, src_size, convert_to_src_size):
try:
detection_session.last_sample = time.time()
nonlocal first_frame
if first_frame:
first_frame = False
print("first frame received", detection_session.id)
detection_result, data = await run_detection(
detection_session, sample, detection_session.settings, src_size, convert_to_src_size)
if detection_result:
detection_result['running'] = True
mo = None
retain = False
def maybeInvalidate():
if not retain:
self.invalidateMedia(detection_session, data)
# else:
# print('retaining')
mo = await self.createMedia(data)
try:
nonlocal current_data
nonlocal current_src_size
nonlocal current_convert_to_src_size
try:
current_data = data
current_src_size = src_size
current_convert_to_src_size = convert_to_src_size
retain = await run_coro_threadsafe(self.detection_event(detection_session, detection_result, redetect, mo), other_loop=detection_session.loop)
finally:
current_data = None
current_convert_to_src_size = None
current_src_size = None
maybeInvalidate()
except Exception as e:
print(e)
self.invalidateMedia(detection_session, data)
# asyncio.run_coroutine_threadsafe(, loop = self.loop).result()
await self.detection_event_notified(detection_session.settings)
if not detection_session or duration == None:
safe_set_result(detection_session.loop,
detection_session.future)
return detection_result
finally:
pass
return user_callback
def run_pipeline(self, detection_session: DetectionSession, duration, src_size, video_input):
inference_size = self.get_detection_input_size(src_size)
pipeline = run_pipeline(detection_session.loop, detection_session.future, self.create_user_callback(self.run_detection_gstsample, detection_session, duration),
appsink_name=type(self).__name__,
appsink_size=inference_size,
video_input=video_input,
pixel_format=self.get_pixel_format(),
crop=self.crop,
)
task = pipeline.run()
asyncio.ensure_future(task)

View File

@@ -1,315 +0,0 @@
from asyncio.events import AbstractEventLoop
from asyncio.futures import Future
import threading
from .safe_set_result import safe_set_result
import math
import asyncio
try:
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
except:
pass
class GstPipelineBase:
def __init__(self, loop: AbstractEventLoop, finished: Future) -> None:
self.loop = loop
self.finished = finished
self.gst = None
def attach_launch(self, gst):
self.gst = gst
def parse_launch(self, pipeline: str):
self.attach_launch(Gst.parse_launch(pipeline))
# Set up a pipeline bus watch to catch errors.
self.bus = self.gst.get_bus()
self.watchId = self.bus.connect('message', self.on_bus_message)
self.bus.add_signal_watch()
def on_bus_message(self, bus, message):
# seeing the following error on pi 32 bit
# OverflowError: Python int too large to convert to C long
t = str(message.type)
if t == str(Gst.MessageType.EOS):
safe_set_result(self.loop, self.finished)
elif t == str(Gst.MessageType.WARNING):
err, debug = message.parse_warning()
print('Warning: %s: %s\n' % (err, debug))
elif t == str(Gst.MessageType.ERROR):
err, debug = message.parse_error()
print('Error: %s: %s\n' % (err, debug))
safe_set_result(self.loop, self.finished)
return True
async def run_attached(self):
try:
await self.finished
except:
pass
async def attach(self):
pass
async def detach(self):
pass
async def run(self):
await self.attach()
# Run pipeline.
self.gst.set_state(Gst.State.PLAYING)
try:
await self.run_attached()
finally:
# Clean up.
self.bus.remove_signal_watch()
self.bus.disconnect(self.watchId)
self.gst.set_state(Gst.State.NULL)
self.bus = None
self.watchId = None
self.gst = None
await self.detach()
class GstPipeline(GstPipelineBase):
def __init__(self, loop: AbstractEventLoop, finished: Future, appsink_name: str, user_callback, crop=False):
super().__init__(loop, finished)
self.appsink_name = appsink_name
self.user_callback = user_callback
self.running = False
self.gstsample = None
self.sink_size = None
self.src_size = None
self.dst_size = None
self.pad_size = None
self.scale_size = None
self.crop = crop
self.condition = None
def attach_launch(self, gst):
super().attach_launch(gst)
appsink = self.gst.get_by_name(self.appsink_name)
appsink.connect('new-preroll', self.on_new_sample, True)
appsink.connect('new-sample', self.on_new_sample, False)
async def attach(self):
# Start inference worker.
self.running = True
worker = threading.Thread(target=self.inference_main)
worker.start()
while not self.condition:
await asyncio.sleep(.1)
async def detach(self):
async def notifier():
async with self.condition:
self.condition.notify_all()
self.running = False
asyncio.run_coroutine_threadsafe(notifier(), loop = self.selfLoop)
def on_new_sample(self, sink, preroll):
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
if not self.sink_size:
s = sample.get_caps().get_structure(0)
self.sink_size = (s.get_value('width'), s.get_value('height'))
self.gstsample = sample
async def notifier():
async with self.condition:
self.condition.notify_all()
try:
if self.running:
asyncio.run_coroutine_threadsafe(notifier(), loop = self.selfLoop).result()
except Exception as e:
# now what?
# print('sample error')
# print(e)
pass
return Gst.FlowReturn.OK
def get_src_size(self):
if not self.src_size:
videoconvert = self.gst.get_by_name('videoconvert')
structure = videoconvert.srcpads[0].get_current_caps(
).get_structure(0)
_, w = structure.get_int('width')
_, h = structure.get_int('height')
self.src_size = (w, h)
videoscale = self.gst.get_by_name('videoscale')
structure = videoscale.srcpads[0].get_current_caps(
).get_structure(0)
_, w = structure.get_int('width')
_, h = structure.get_int('height')
self.dst_size = (w, h)
appsink = self.gst.get_by_name(self.appsink_name)
structure = appsink.sinkpads[0].get_current_caps().get_structure(0)
_, w = structure.get_int('width')
_, h = structure.get_int('height')
self.dst_size = (w, h)
# the dimension with the higher scale value got cropped or boxed.
# use the other dimension to figure out the crop/box amount.
scales = (self.dst_size[0] / self.src_size[0],
self.dst_size[1] / self.src_size[1])
if self.crop:
scale = max(scales[0], scales[1])
else:
scale = min(scales[0], scales[1])
self.scale_size = scale
dx = self.src_size[0] * scale
dy = self.src_size[1] * scale
px = math.ceil((self.dst_size[0] - dx) / 2)
py = math.ceil((self.dst_size[1] - dy) / 2)
self.pad_size = (px, py)
return self.src_size
def convert_to_src_size(self, point, normalize=False):
valid = True
px, py = self.pad_size
x, y = point
if normalize:
x = max(0, x)
x = min(x, self.src_size[0] - 1)
y = max(0, y)
y = min(y, self.src_size[1] - 1)
x = (x - px) / self.scale_size
if x < 0:
x = 0
valid = False
if x >= self.src_size[0]:
x = self.src_size[0] - 1
valid = False
y = (y - py) / self.scale_size
if y < 0:
y = 0
valid = False
if y >= self.src_size[1]:
y = self.src_size[1] - 1
valid = False
return (int(math.ceil(x)), int(math.ceil(y)), valid)
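A worked example of the scale/pad math above: a 1920x1080 source letterboxed (crop=False) into a 320x320 inference frame.

```python
import math

src, dst = (1920, 1080), (320, 320)
scale = min(dst[0] / src[0], dst[1] / src[1])       # 1/6: the frame becomes 320x180
pad = (math.ceil((dst[0] - src[0] * scale) / 2),    # (0, 70): 70px bars top and bottom
       math.ceil((dst[1] - src[1] * scale) / 2))
x, y = 160, 160                                     # center of the inference frame
print((x - pad[0]) / scale, (y - pad[1]) / scale)   # 960.0 540.0: center of the source
```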
def inference_main(self):
loop = asyncio.new_event_loop()
self.selfLoop = loop
try:
loop.run_until_complete(self.inference_loop())
finally:
loop.close()
async def inference_loop(self):
self.condition = asyncio.Condition()
while self.running:
async with self.condition:
while not self.gstsample and self.running:
await self.condition.wait()
if not self.running:
return
gstsample = self.gstsample
self.gstsample = None
try:
await self.user_callback(gstsample, self.get_src_size(
), lambda p, normalize=False: self.convert_to_src_size(p, normalize))
except Exception as e:
print("callback failure")
print(e)
raise
def get_dev_board_model():
try:
model = open('/sys/firmware/devicetree/base/model').read().lower()
if 'mx8mq' in model:
return 'mx8mq'
if 'mt8167' in model:
return 'mt8167'
except:
pass
return None
def create_pipeline_sink(
appsink_name,
appsink_size,
pixel_format,
crop=False):
SINK_ELEMENT = 'appsink name={appsink_name} emit-signals=true max-buffers=-1 drop=true sync=false'.format(
appsink_name=appsink_name)
(width, height) = appsink_size
SINK_CAPS = 'video/x-raw,format={pixel_format}'
if width and height:
SINK_CAPS += ',width={width},height={height},pixel-aspect-ratio=1/1'
sink_caps = SINK_CAPS.format(
width=width, height=height, pixel_format=pixel_format)
pipeline = " {sink_caps} ! {sink_element}".format(
sink_caps=sink_caps,
sink_element=SINK_ELEMENT)
return pipeline
def create_pipeline(
appsink_name,
appsink_size,
video_input,
pixel_format,
crop=False,
parse_only=False):
if parse_only:
sink = 'appsink name={appsink_name} emit-signals=true sync=false'.format(
appsink_name=appsink_name)
PIPELINE = """ {video_input}
! {sink}
"""
else:
sink = create_pipeline_sink(
appsink_name, appsink_size, pixel_format, crop=crop)
if crop:
PIPELINE = """ {video_input} ! queue leaky=downstream max-size-buffers=0 ! videoconvert name=videoconvert ! aspectratiocrop aspect-ratio=1/1 ! videoscale name=videoscale ! queue leaky=downstream max-size-buffers=0
! {sink}
"""
else:
PIPELINE = """ {video_input} ! queue leaky=downstream max-size-buffers=0 ! videoconvert name=videoconvert ! videoscale name=videoscale ! queue leaky=downstream max-size-buffers=0
! {sink}
"""
pipeline = PIPELINE.format(video_input=video_input, sink=sink)
print('Gstreamer pipeline:\n', pipeline)
return pipeline
def run_pipeline(loop, finished,
user_callback,
appsink_name,
appsink_size,
video_input,
pixel_format,
crop=False,
parse_only=False):
gst = GstPipeline(loop, finished, appsink_name, user_callback, crop=crop)
pipeline = create_pipeline(
appsink_name, appsink_size, video_input, pixel_format, crop=crop, parse_only=parse_only)
gst.parse_launch(pipeline)
return gst
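For reference, a sketch of what create_pipeline assembles for a 320x240 RGB appsink named 'Demo' with a hypothetical test source (no crop, not parse_only); modulo whitespace, this matches the string the function prints:

```python
print(create_pipeline('Demo', (320, 240), 'videotestsrc', 'RGB'))
# videotestsrc ! queue leaky=downstream max-size-buffers=0
#   ! videoconvert name=videoconvert ! videoscale name=videoscale
#   ! queue leaky=downstream max-size-buffers=0
#   ! video/x-raw,format=RGB,width=320,height=240,pixel-aspect-ratio=1/1
#   ! appsink name=Demo emit-signals=true max-buffers=-1 drop=true sync=false
```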

View File

@@ -1,11 +0,0 @@
from asyncio.futures import Future
from asyncio import AbstractEventLoop
def safe_set_result(loop: AbstractEventLoop, future: Future):
def loop_set_result():
try:
if not future.done():
future.set_result(None)
except:
pass
loop.call_soon_threadsafe(loop_set_result)
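Usage sketch: resolving a future owned by one event loop from another thread. call_soon_threadsafe hops onto the loop's thread, and the done() check plus the bare except make redundant calls harmless.

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_event_loop()
    fut = loop.create_future()
    # a worker thread may finish (or fail) at any time; this is always safe
    threading.Thread(target=safe_set_result, args=(loop, fut)).start()
    await fut

asyncio.run(main())
```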

View File

@@ -1,34 +1,30 @@
from __future__ import annotations
from scrypted_sdk.types import ObjectDetectionResult, ObjectsDetected, Setting
import io
from PIL import Image
import re
import scrypted_sdk
from typing import Any, List, Tuple, Mapping
import asyncio
import time
from .rectangle import Rectangle, intersect_area, intersect_rect, to_bounding_box, from_bounding_box, combine_rect
import urllib.request
import os
import re
import urllib.request
from typing import Any, List, Tuple
from detect import DetectionSession, DetectPlugin
import scrypted_sdk
from PIL import Image
from scrypted_sdk.types import (ObjectDetectionResult, ObjectDetectionSession,
ObjectsDetected, Setting)
import numpy as np
import traceback
from detect import DetectPlugin
try:
from gi.repository import Gst
except:
pass
from .rectangle import (Rectangle, combine_rect, from_bounding_box,
intersect_area, intersect_rect, to_bounding_box)
class PredictSession(DetectionSession):
image: Image.Image
def __init__(self, start_time: float) -> None:
super().__init__()
self.image = None
self.processed = 0
self.start_time = start_time
def ensureRGBData(data: bytes, size: Tuple[int, int], format: str):
if format == 'rgba':
rgba = Image.frombuffer('RGBA', size, data)
try:
return rgba.convert('RGB')
finally:
rgba.close()
else:
return Image.frombuffer('RGB', size, data)
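Usage of ensureRGBData, which replaces the repeated Image.frombuffer branches later in this file:

```python
rgba = bytes(2 * 2 * 4)                   # hypothetical 2x2 RGBA frame, all zeros
img = ensureRGBData(rgba, (2, 2), 'rgba')
assert img.mode == 'RGB'                  # RGBA input is converted, RGB passes through
```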
def parse_label_contents(contents: str):
lines = contents.splitlines()
@@ -144,42 +140,6 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
mo = await scrypted_sdk.mediaManager.createMediaObject(data, self.fromMimeType)
return mo
def end_session(self, detection_session: PredictSession):
image = detection_session.image
if image:
detection_session.image = None
image.close()
dps = detection_session.processed / (time.time() - detection_session.start_time)
print("Detections per second %s" % dps)
return super().end_session(detection_session)
def invalidateMedia(self, detection_session: PredictSession, data: RawImage):
if not data:
return
image = data.image
data.image = None
if image:
if not detection_session.image:
detection_session.image = image
else:
image.close()
data.jpegMediaObject = None
async def convert(self, data: RawImage, fromMimeType: str, toMimeType: str, options: scrypted_sdk.BufferConvertorOptions = None) -> Any:
mo = data.jpegMediaObject
if not mo:
image = data.image
if not image:
raise Exception('data is no longer valid')
bio = io.BytesIO()
image.save(bio, format='JPEG')
jpegBytes = bio.getvalue()
mo = await scrypted_sdk.mediaManager.createMediaObject(jpegBytes, 'image/jpeg')
data.jpegMediaObject = mo
return mo
def requestRestart(self):
asyncio.ensure_future(scrypted_sdk.deviceManager.requestRestart())
@@ -242,15 +202,6 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
# print(detection_result)
return detection_result
async def run_detection_jpeg(self, detection_session: PredictSession, image_bytes: bytes, settings: Any) -> ObjectsDetected:
stream = io.BytesIO(image_bytes)
image = Image.open(stream)
if image.mode == 'RGBA':
image = image.convert('RGB')
detections, _ = await self.run_detection_image(detection_session, image, settings, image.size)
return detections
def get_detection_input_size(self, src_size):
# signals to pipeline that any input size is fine
# previous code used to resize to correct size and run detection that way.
@@ -264,8 +215,8 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -> ObjectsDetected:
pass
async def run_detection_videoframe(self, videoFrame: scrypted_sdk.VideoFrame, detection_session: PredictSession) -> ObjectsDetected:
settings = detection_session and detection_session.settings
async def run_detection_videoframe(self, videoFrame: scrypted_sdk.VideoFrame, detection_session: ObjectDetectionSession) -> ObjectsDetected:
settings = detection_session and detection_session.get('settings')
src_size = videoFrame.width, videoFrame.height
w, h = self.get_input_size()
iw, ih = src_size
@@ -279,10 +230,7 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
data = await videoFrame.toBuffer({
'format': videoFrame.format or 'rgb',
})
if videoFrame.format == 'rgba':
image = Image.frombuffer('RGBA', (w, h), data).convert('RGB')
else:
image = Image.frombuffer('RGB', (w, h), data)
image = ensureRGBData(data, (w, h), videoFrame.format)
try:
ret = await self.detect_once(image, settings, src_size, cvss)
return ret
@@ -327,14 +275,8 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
})
)
if videoFrame.format == 'rgba':
first = Image.frombuffer('RGBA', (w, h), firstData).convert('RGB')
else:
first = Image.frombuffer('RGB', (w, h), firstData)
if videoFrame.format == 'rgba':
second = Image.frombuffer('RGBA', (w, h), secondData).convert('RGB')
else:
second = Image.frombuffer('RGB', (w, h), secondData)
first = ensureRGBData(firstData, (w, h), videoFrame.format)
second = ensureRGBData(secondData, (w, h), videoFrame.format)
def cvss1(point, normalize=False):
return point[0] / s, point[1] / s, True
@@ -375,174 +317,3 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
ret = ret1
ret['detections'] = dedupe_detections(ret1['detections'] + ret2['detections'], is_same_detection=is_same_detection_middle)
return ret
async def run_detection_image(self, detection_session: PredictSession, image: Image.Image, settings: Any, src_size, convert_to_src_size: Any = None, multipass_crop: Tuple[float, float, float, float] = None):
(w, h) = self.get_input_size() or image.size
(iw, ih) = image.size
# this a single pass or the second pass. detect once and return results.
if multipass_crop:
(l, t, dx, dy) = multipass_crop
# find center
cx = l + dx / 2
cy = t + dy / 2
# fix aspect ratio on box
if dx / w > dy / h:
dy = dx / w * h
else:
dx = dy / h * w
if dx > image.width:
s = image.width / dx
dx = image.width
dy *= s
if dy > image.height:
s = image.height / dy
dy = image.height
dx *= s
# crop size to fit input size
if dx < w:
dx = w
if dy < h:
dy = h
l = cx - dx / 2
t = cy - dy / 2
if l < 0:
l = 0
if t < 0:
t = 0
if l + dx > iw:
l = iw - dx
if t + dy > ih:
t = ih - dy
crop_box = (l, t, l + dx, t + dy)
if dx == w and dy == h:
input = image.crop(crop_box)
else:
input = image.resize((w, h), Image.ANTIALIAS, crop_box)
def cvss(point, normalize=False):
unscaled = ((point[0] / w) * dx + l, (point[1] / h) * dy + t)
converted = convert_to_src_size(unscaled, normalize) if convert_to_src_size else (unscaled[0], unscaled[1], True)
return converted
ret = await self.detect_once(input, settings, src_size, cvss)
input.close()
detection_session.processed = detection_session.processed + 1
return ret, RawImage(image)
ws = w / iw
hs = h / ih
s = max(ws, hs)
if ws == 1 and hs == 1:
def cvss(point, normalize=False):
converted = convert_to_src_size(point, normalize) if convert_to_src_size else (point[0], point[1], True)
return converted
ret = await self.detect_once(image, settings, src_size, cvss)
if detection_session:
detection_session.processed = detection_session.processed + 1
else:
sw = int(w / s)
sh = int(h / s)
first_crop = (0, 0, sw, sh)
first = image.resize((w, h), Image.ANTIALIAS, first_crop)
ow = iw - sw
oh = ih - sh
second_crop = (ow, oh, ow + sw, oh + sh)
second = image.resize((w, h), Image.ANTIALIAS, second_crop)
def cvss1(point, normalize=False):
unscaled = (point[0] / s, point[1] / s)
converted = convert_to_src_size(unscaled, normalize) if convert_to_src_size else (unscaled[0], unscaled[1], True)
return converted
def cvss2(point, normalize=False):
unscaled = (point[0] / s + ow, point[1] / s + oh)
converted = convert_to_src_size(unscaled, normalize) if convert_to_src_size else (unscaled[0], unscaled[1], True)
return converted
ret1 = await self.detect_once(first, settings, src_size, cvss1)
first.close()
if detection_session:
detection_session.processed = detection_session.processed + 1
ret2 = await self.detect_once(second, settings, src_size, cvss2)
if detection_session:
detection_session.processed = detection_session.processed + 1
second.close()
two_intersect = intersect_rect(Rectangle(*first_crop), Rectangle(*second_crop))
def is_same_detection_middle(d1: ObjectDetectionResult, d2: ObjectDetectionResult):
same, ret = is_same_detection(d1, d2)
if same:
return same, ret
if d1['className'] != d2['className']:
return False, None
r1 = from_bounding_box(d1['boundingBox'])
m1 = intersect_rect(two_intersect, r1)
if not m1:
return False, None
r2 = from_bounding_box(d2['boundingBox'])
m2 = intersect_rect(two_intersect, r2)
if not m2:
return False, None
same, ret = is_same_box(to_bounding_box(m1), to_bounding_box(m2))
if not same:
return False, None
c = to_bounding_box(combine_rect(r1, r2))
return True, c
ret = ret1
ret['detections'] = dedupe_detections(ret1['detections'] + ret2['detections'], is_same_detection=is_same_detection_middle)
if not len(ret['detections']):
return ret, RawImage(image)
return ret, RawImage(image)
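A worked example of the two-crop branch above: a 320x320 model with a 1280x720 frame.

```python
w = h = 320
iw, ih = 1280, 720
s = max(w / iw, h / ih)                   # 4/9: height is the binding dimension
sw, sh = int(w / s), int(h / s)           # 720, 720: source pixels per square tile
first_crop = (0, 0, sw, sh)               # left tile
second_crop = (iw - sw, ih - sh, iw, ih)  # right tile: (560, 0, 1280, 720)
# the tiles overlap on (560, 0)-(720, 720); duplicates there are merged by
# is_same_detection_middle before the results are combined.
```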
async def run_detection_crop(self, detection_session: DetectionSession, sample: RawImage, settings: Any, src_size, convert_to_src_size, bounding_box: Tuple[float, float, float, float]) -> ObjectsDetected:
(ret, _) = await self.run_detection_image(detection_session, sample.image, settings, src_size, convert_to_src_size, bounding_box)
return ret
async def run_detection_gstsample(self, detection_session: PredictSession, gstsample, settings: Any, src_size, convert_to_src_size) -> Tuple[ObjectsDetected, Image.Image]:
caps = gstsample.get_caps()
# can't trust the width value, compute the stride
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')
gst_buffer = gstsample.get_buffer()
result, info = gst_buffer.map(Gst.MapFlags.READ)
if not result:
return
try:
image = detection_session.image
detection_session.image = None
if image and (image.width != width or image.height != height):
image.close()
image = None
if image:
image.frombytes(bytes(info.data))
else:
image = Image.frombuffer('RGB', (width, height), bytes(info.data))
finally:
gst_buffer.unmap(info)
try:
return await self.run_detection_image(detection_session, image, settings, src_size, convert_to_src_size)
except:
image.close()
traceback.print_exc()
raise
def create_detection_session(self):
return PredictSession(start_time=time.time())

View File

@@ -1 +0,0 @@
../../../sort-tracker/sort_oh/libs

View File

@@ -1,10 +1,7 @@
--extra-index-url https://google-coral.github.io/py-repo/
pycoral~=2.0
tflite-runtime==2.5.0.post1
# plugin
numpy>=1.16.2
# pillow for anything not intel linux
# pillow for anything not intel linux, pillow-simd is available on x64 linux
Pillow>=5.4.1; sys_platform != 'linux' or platform_machine != 'x86_64'
pillow-simd; sys_platform == 'linux' and platform_machine == 'x86_64'
pycoral~=2.0
PyGObject>=3.30.4; sys_platform != 'win32'
tflite-runtime==2.5.0.post1

View File

@@ -1,5 +1,4 @@
from __future__ import annotations
import threading
from .common import *
from PIL import Image
from pycoral.adapters import detect
@@ -21,7 +20,6 @@ from predict import PredictPlugin
import concurrent.futures
import queue
import asyncio
from time import time
def parse_label_contents(contents: str):
lines = contents.splitlines()
@@ -117,7 +115,6 @@ class TensorFlowLitePlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted
def predict():
interpreter = self.interpreters.get()
try:
print('predict s %s' % time())
common.set_input(
interpreter, input)
scale = (1, 1)
@@ -133,7 +130,6 @@ class TensorFlowLitePlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted
raise e
finally:
self.interpreters.put(interpreter)
print('predict e %s' % time())
objs = await asyncio.get_event_loop().run_in_executor(self.executor, predict)
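A minimal standalone sketch of the interpreter-pool pattern used above (names assumed from the surrounding diff): tflite interpreters are not safe to share across threads, so each prediction checks one out of a queue, runs inside the executor, and returns it.

```python
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

interpreters: queue.Queue = queue.Queue()   # pre-filled with tflite interpreters
executor = ThreadPoolExecutor(max_workers=2)

def predict(run_inference):
    interpreter = interpreters.get()        # block until an interpreter is free
    try:
        return run_inference(interpreter)
    finally:
        interpreters.put(interpreter)       # always return it to the pool

async def detect(run_inference):
    return await asyncio.get_event_loop().run_in_executor(
        executor, predict, run_inference)
```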

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/server",
"version": "0.7.35",
"version": "0.7.36",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/server",
"version": "0.7.35",
"version": "0.7.36",
"license": "ISC",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/server",
"version": "0.7.36",
"version": "0.7.37",
"description": "",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",

View File

@@ -311,7 +311,7 @@ export class PluginHost {
this.worker.stdout.on('data', data => console.log(data.toString()));
this.worker.stderr.on('data', data => console.error(data.toString()));
const consoleHeader = `${os.platform()} ${os.arch()} ${os.machine()} ${os.version()}\nserver version: ${serverVersion}\nplugin version: ${this.pluginId} ${this.packageJson.version}\n`;
const consoleHeader = `${os.platform()} ${os.arch()} ${os.version()}\nserver version: ${serverVersion}\nplugin version: ${this.pluginId} ${this.packageJson.version}\n`;
this.consoleServer = createConsoleServer(this.worker.stdout, this.worker.stderr, consoleHeader);
const disconnect = () => {