Compare commits

..

14 Commits

Author SHA1 Message Date
Koushik Dutta
fae66619fb prepublish 2023-03-17 19:16:50 -07:00
Koushik Dutta
d979b9ec0c server: connection.poll should provide None to block forever 2023-03-17 19:16:15 -07:00
Koushik Dutta
975319a65d motion: implement a default inclusion zone that prevents on screen clocks from triggering motion 2023-03-17 16:19:50 -07:00
Koushik Dutta
7b5aa4ba2d python-codecs: remove erroneous libav from gstreamer settings 2023-03-17 16:19:20 -07:00
Koushik Dutta
670739c82b python-codecs: restructure, add gstreamer decoder option 2023-03-17 10:28:41 -07:00
Koushik Dutta
8511bd15a8 server: update package lock 2023-03-16 23:59:19 -07:00
Koushik Dutta
06d3c89274 prepublish 2023-03-16 23:59:10 -07:00
Koushik Dutta
e13f3eb2f1 server: add python forked processes to stats 2023-03-16 23:59:01 -07:00
Koushik Dutta
001918d613 predict: fix detections from webui 2023-03-16 23:58:45 -07:00
Koushik Dutta
c859c3aa40 detect: publish plugins with new video pipeline support 2023-03-16 23:40:33 -07:00
Koushik Dutta
2bce019677 predict: make models a separate download 2023-03-16 23:29:02 -07:00
Koushik Dutta
6ba3386157 detect: fix peer kill causing exception inside finally handler 2023-03-16 22:10:25 -07:00
Koushik Dutta
51e66d98f9 videoanalysis: changing motion detect mode should restart motion detection 2023-03-16 22:09:56 -07:00
Koushik Dutta
6484804649 server: update package lock 2023-03-16 20:37:54 -07:00
32 changed files with 476 additions and 405 deletions

View File

@@ -1,20 +0,0 @@
#!/bin/sh
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rm -rf all_models
mkdir -p all_models
cd all_models
wget https://github.com/koush/coreml-survival-guide/raw/master/MobileNetV2%2BSSDLite/ObjectDetection/ObjectDetection/MobileNetV2_SSDLite.mlmodel
wget https://raw.githubusercontent.com/koush/coreml-survival-guide/master/MobileNetV2%2BSSDLite/coco_labels.txt

View File

@@ -1 +0,0 @@
../all_models/MobileNetV2_SSDLite.mlmodel

View File

@@ -1 +0,0 @@
../all_models/coco_labels.txt

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/coreml",
"version": "0.0.27",
"version": "0.1.2",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/coreml",
"version": "0.0.27",
"version": "0.1.2",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}

View File

@@ -41,5 +41,5 @@
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
},
"version": "0.0.27"
"version": "0.1.2"
}

View File

@@ -29,16 +29,17 @@ class CoreMLPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
def __init__(self, nativeId: str | None = None):
super().__init__(MIME_TYPE, nativeId=nativeId)
modelPath = os.path.join(os.environ['SCRYPTED_PLUGIN_VOLUME'], 'zip', 'unzipped', 'fs', 'MobileNetV2_SSDLite.mlmodel')
self.model = ct.models.MLModel(modelPath)
labelsFile = self.downloadFile('https://raw.githubusercontent.com/koush/coreml-survival-guide/master/MobileNetV2%2BSSDLite/coco_labels.txt', 'coco_labels.txt')
modelFile = self.downloadFile('https://github.com/koush/coreml-survival-guide/raw/master/MobileNetV2%2BSSDLite/ObjectDetection/ObjectDetection/MobileNetV2_SSDLite.mlmodel', 'MobileNetV2_SSDLite.mlmodel')
self.model = ct.models.MLModel(modelFile)
self.modelspec = self.model.get_spec()
self.inputdesc = self.modelspec.description.input[0]
self.inputheight = self.inputdesc.type.imageType.height
self.inputwidth = self.inputdesc.type.imageType.width
labels_contents = scrypted_sdk.zip.open(
'fs/coco_labels.txt').read().decode('utf8')
labels_contents = open(labelsFile, 'r').read()
self.labels = parse_label_contents(labels_contents)
self.loop = asyncio.get_event_loop()

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/objectdetector",
"version": "0.0.106",
"version": "0.0.108",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/objectdetector",
"version": "0.0.106",
"version": "0.0.108",
"license": "Apache-2.0",
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/objectdetector",
"version": "0.0.106",
"version": "0.0.108",
"description": "Scrypted Video Analysis Plugin. Installed alongside a detection service like OpenCV or TensorFlow.",
"author": "Scrypted",
"license": "Apache-2.0",
@@ -38,7 +38,10 @@
"Settings",
"MixinProvider"
],
"realfs": true
"realfs": true,
"pluginDependencies": [
"@scrypted/python-codecs"
]
},
"dependencies": {
"@scrypted/common": "file:../../common",

View File

@@ -75,6 +75,10 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
BUILTIN_MOTION_SENSOR_REPLACE,
],
defaultValue: "Default",
onPut: () => {
this.endObjectDetection();
this.maybeStartMotionDetection();
}
},
captureMode: {
title: 'Capture Mode',
@@ -593,6 +597,9 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
// this.console.log('image saved', detected.detected.detections);
}
this.reportObjectDetections(detected.detected);
if (this.hasMotionType) {
await sleep(250);
}
// this.handleDetectionEvent(detected.detected);
}
}
@@ -677,6 +684,19 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
}
}
normalizeBox(boundingBox: [number, number, number, number], inputDimensions: [number, number]) {
let [x, y, width, height] = boundingBox;
let x2 = x + width;
let y2 = y + height;
// the zones are point paths in percentage format
x = x * 100 / inputDimensions[0];
y = y * 100 / inputDimensions[1];
x2 = x2 * 100 / inputDimensions[0];
y2 = y2 * 100 / inputDimensions[1];
const box = [[x, y], [x2, y], [x2, y2], [x, y2]];
return box;
}
getDetectionDuration() {
// when motion type, the detection interval is a keepalive reset.
// the duration needs to simply be an arbitrarily longer time.
@@ -693,15 +713,7 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
continue;
o.zones = []
let [x, y, width, height] = o.boundingBox;
let x2 = x + width;
let y2 = y + height;
// the zones are point paths in percentage format
x = x * 100 / detection.inputDimensions[0];
y = y * 100 / detection.inputDimensions[1];
x2 = x2 * 100 / detection.inputDimensions[0];
y2 = y2 * 100 / detection.inputDimensions[1];
const box = [[x, y], [x2, y], [x2, y2], [x, y2]];
const box = this.normalizeBox(o.boundingBox, detection.inputDimensions);
let included: boolean;
for (const [zone, zoneValue] of Object.entries(this.zones)) {
@@ -741,6 +753,14 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
}
}
// if this is a motion sensor and there are no inclusion zones set up,
// use a default inclusion zone that crops the top and bottom to
// prevents errant motion from the on screen time changing every second.
if (this.hasMotionType && included === undefined) {
const defaultInclusionZone = [[0, 10], [100, 10], [100, 90], [0, 90]];
included = polygonOverlap(box, defaultInclusionZone);
}
// if there are inclusion zones and this object
// was not in any of them, filter it out.
if (included === false)

View File

@@ -9,3 +9,4 @@ dist/*.js
dist/*.txt
__pycache__
all_models
.venv

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/opencv",
"version": "0.0.64",
"version": "0.0.66",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/opencv",
"version": "0.0.64",
"version": "0.0.66",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}

View File

@@ -36,5 +36,5 @@
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
},
"version": "0.0.64"
"version": "0.0.66"
}

View File

@@ -273,7 +273,7 @@ class OpenCVPlugin(DetectPlugin):
buffer=info.data,
dtype=np.uint8)
detections = self.detect(
detection_session, mat, settings, src_size, convert_to_src_size)
detection_session, mat, src_size, convert_to_src_size)
# no point in triggering empty events.
finally:
buf.unmap(info)

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/python-codecs",
"version": "0.1.5",
"version": "0.1.8",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@scrypted/python-codecs",
"version": "0.1.5",
"version": "0.1.8",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/python-codecs",
"version": "0.1.5",
"version": "0.1.8",
"description": "Python Codecs for Scrypted",
"keywords": [
"scrypted",

View File

@@ -0,0 +1,136 @@
import concurrent.futures
import threading
import asyncio
from queue import Queue
try:
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import GLib, GObject, Gst
GObject.threads_init()
Gst.init(None)
except:
pass
class Callback:
def __init__(self, callback) -> None:
if callback:
self.loop = asyncio.get_running_loop()
self.callback = callback
else:
self.loop = None
self.callback = None
def createPipelineIterator(pipeline: str):
pipeline = '{pipeline} ! queue leaky=downstream max-size-buffers=0 ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
print(pipeline)
gst = Gst.parse_launch(pipeline)
bus = gst.get_bus()
def on_bus_message(bus, message):
t = str(message.type)
# print(t)
if t == str(Gst.MessageType.EOS):
finish()
elif t == str(Gst.MessageType.WARNING):
err, debug = message.parse_warning()
print('Warning: %s: %s\n' % (err, debug))
elif t == str(Gst.MessageType.ERROR):
err, debug = message.parse_error()
print('Error: %s: %s\n' % (err, debug))
finish()
def stopGst():
bus.remove_signal_watch()
bus.disconnect(watchId)
gst.set_state(Gst.State.NULL)
def finish():
nonlocal hasFinished
hasFinished = True
callback = Callback(None)
callbackQueue.put(callback)
if not asyncFuture.done():
asyncFuture.set_result(None)
if not finished.done():
finished.set_result(None)
watchId = bus.connect('message', on_bus_message)
bus.add_signal_watch()
finished = concurrent.futures.Future()
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
hasFinished = False
appsink = gst.get_by_name('appsink')
callbackQueue = Queue()
asyncFuture = asyncio.Future()
async def gen():
try:
while True:
nonlocal asyncFuture
asyncFuture = asyncio.Future()
yieldFuture = asyncio.Future()
async def asyncCallback(sample):
asyncFuture.set_result(sample)
await yieldFuture
callbackQueue.put(Callback(asyncCallback))
sample = await asyncFuture
if not sample:
yieldFuture.set_result(None)
break
try:
yield sample
finally:
yieldFuture.set_result(None)
finally:
finish()
print('gstreamer finished')
def on_new_sample(sink, preroll):
nonlocal hasFinished
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
callback: Callback = callbackQueue.get()
if not callback.callback or hasFinished:
hasFinished = True
if callback.callback:
asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop)
return Gst.FlowReturn.OK
future = asyncio.run_coroutine_threadsafe(callback.callback(sample), loop = callback.loop)
try:
future.result()
except:
pass
return Gst.FlowReturn.OK
appsink.connect('new-preroll', on_new_sample, True)
appsink.connect('new-sample', on_new_sample, False)
gst.set_state(Gst.State.PLAYING)
return gst, gen
def mainThread():
async def asyncMain():
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
i = 0
async for sample in gen():
print('sample')
i = i + 1
if i == 10:
break
loop = asyncio.new_event_loop()
asyncio.ensure_future(asyncMain(), loop = loop)
loop.run_forever()
if __name__ == "__main__":
threading.Thread(target = mainThread).start()
mainLoop = GLib.MainLoop()
mainLoop.run()

View File

@@ -1,136 +1,87 @@
import concurrent.futures
import threading
import asyncio
from queue import Queue
from gst_generator import createPipelineIterator
from util import optional_chain
import scrypted_sdk
from typing import Any
from urllib.parse import urlparse
import pyvips
from vips import createVipsMediaObject, VipsImage
import platform
Gst = None
try:
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import GLib, GObject, Gst
GObject.threads_init()
Gst.init(None)
from gi.repository import Gst
except:
pass
class Callback:
def __init__(self, callback) -> None:
if callback:
self.loop = asyncio.get_running_loop()
self.callback = callback
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
container = ffmpegInput.get('container', None)
videosrc = ffmpegInput.get('url')
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
if videosrc.startswith('tcp://'):
parsed_url = urlparse(videosrc)
videosrc = 'tcpclientsrc port=%s host=%s' % (
parsed_url.port, parsed_url.hostname)
if container == 'mpegts':
videosrc += ' ! tsdemux'
elif container == 'sdp':
videosrc += ' ! sdpdemux'
else:
self.loop = None
self.callback = None
raise Exception('unknown container %s' % container)
elif videosrc.startswith('rtsp'):
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
if videoCodec == 'h264':
videosrc += ' ! rtph264depay ! h264parse'
def createPipelineIterator(pipeline: str):
pipeline = '{pipeline} ! queue leaky=downstream max-size-buffers=0 ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
print(pipeline)
gst = Gst.parse_launch(pipeline)
bus = gst.get_bus()
videocaps = 'video/x-raw'
# if options and options.get('resize'):
# videocaps = 'videoscale ! video/x-raw,width={width},height={height}'.format(width=options['resize']['width'], height=options['resize']['height'])
def on_bus_message(bus, message):
t = str(message.type)
# print(t)
if t == str(Gst.MessageType.EOS):
finish()
elif t == str(Gst.MessageType.WARNING):
err, debug = message.parse_warning()
print('Warning: %s: %s\n' % (err, debug))
elif t == str(Gst.MessageType.ERROR):
err, debug = message.parse_error()
print('Error: %s: %s\n' % (err, debug))
finish()
format = options and options.get('format')
# I420 is a cheap way to get gray out of an h264 stream without color conversion.
if format == 'gray':
format = 'I420'
bands = 1
else:
format = 'RGB'
bands = 3
videocaps += ',format={format}'.format(format=format)
def stopGst():
bus.remove_signal_watch()
bus.disconnect(watchId)
gst.set_state(Gst.State.NULL)
decoder = 'decodebin'
if videoCodec == 'h264':
decoder = h264Decoder or 'Default'
if decoder == 'Default':
if platform.system() == 'Darwin':
decoder = 'vtdec_hw'
else:
decoder = 'decodebin'
def finish():
nonlocal hasFinished
hasFinished = True
callback = Callback(None)
callbackQueue.put(callback)
if not asyncFuture.done():
asyncFuture.set_result(None)
if not finished.done():
finished.set_result(None)
videosrc += ' ! {decoder} ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! {videocaps}'.format(decoder=decoder, videocaps=videocaps)
watchId = bus.connect('message', on_bus_message)
bus.add_signal_watch()
gst, gen = createPipelineIterator(videosrc)
async for gstsample in gen():
caps = gstsample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')
gst_buffer = gstsample.get_buffer()
result, info = gst_buffer.map(Gst.MapFlags.READ)
if not result:
continue
finished = concurrent.futures.Future()
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
hasFinished = False
appsink = gst.get_by_name('appsink')
callbackQueue = Queue()
asyncFuture = asyncio.Future()
async def gen():
try:
while True:
nonlocal asyncFuture
asyncFuture = asyncio.Future()
yieldFuture = asyncio.Future()
async def asyncCallback(sample):
asyncFuture.set_result(sample)
await yieldFuture
callbackQueue.put(Callback(asyncCallback))
sample = await asyncFuture
if not sample:
yieldFuture.set_result(None)
break
try:
yield sample
finally:
yieldFuture.set_result(None)
finally:
finish()
print('gstreamer finished')
def on_new_sample(sink, preroll):
nonlocal hasFinished
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
callback: Callback = callbackQueue.get()
if not callback.callback or hasFinished:
hasFinished = True
if callback.callback:
asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop)
return Gst.FlowReturn.OK
future = asyncio.run_coroutine_threadsafe(callback.callback(sample), loop = callback.loop)
try:
future.result()
except:
pass
return Gst.FlowReturn.OK
appsink.connect('new-preroll', on_new_sample, True)
appsink.connect('new-sample', on_new_sample, False)
gst.set_state(Gst.State.PLAYING)
return gst, gen
def mainThread():
async def asyncMain():
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
i = 0
async for sample in gen():
print('sample')
i = i + 1
if i == 10:
break
loop = asyncio.new_event_loop()
asyncio.ensure_future(asyncMain(), loop = loop)
loop.run_forever()
if __name__ == "__main__":
threading.Thread(target = mainThread).start()
mainLoop = GLib.MainLoop()
mainLoop.run()
vips = pyvips.Image.new_from_memory(info.data, width, height, bands, pyvips.BandFormat.UCHAR)
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
gst_buffer.unmap(info)

View File

@@ -0,0 +1,51 @@
import time
from gst_generator import createPipelineIterator
import scrypted_sdk
from typing import Any
import pyvips
from vips import createVipsMediaObject, VipsImage
av = None
try:
import av
av.logging.set_level(av.logging.PANIC)
except:
pass
async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
videosrc = ffmpegInput.get('url')
container = av.open(videosrc)
# none of this stuff seems to work. might be libav being slow with rtsp.
# container.no_buffer = True
# container.gen_pts = False
# container.options['-analyzeduration'] = '0'
# container.options['-probesize'] = '500000'
stream = container.streams.video[0]
# stream.codec_context.thread_count = 1
# stream.codec_context.low_delay = True
# stream.codec_context.options['-analyzeduration'] = '0'
# stream.codec_context.options['-probesize'] = '500000'
start = 0
try:
for idx, frame in enumerate(container.decode(stream)):
now = time.time()
if not start:
start = now
elapsed = now - start
if (frame.time or 0) < elapsed - 0.500:
# print('too slow, skipping frame')
continue
# print(frame)
vips = pyvips.Image.new_from_array(frame.to_ndarray(format='rgb24'))
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
container.close()

View File

@@ -1,19 +1,12 @@
import time
from gstreamer import createPipelineIterator
import asyncio
from util import optional_chain
import scrypted_sdk
from scrypted_sdk import Setting, SettingValue
from typing import Any
from urllib.parse import urlparse
import pyvips
import concurrent.futures
import gstreamer
import libav
Gst = None
try:
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst
except:
pass
@@ -21,101 +14,42 @@ except:
av = None
try:
import av
av.logging.set_level(av.logging.PANIC)
except:
pass
# vips is already multithreaded, but needs to be kicked off the python asyncio thread.
vipsExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="vips")
async def to_thread(f):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(vipsExecutor, f)
class VipsImage(scrypted_sdk.VideoFrame):
def __init__(self, vipsImage: pyvips.Image) -> None:
super().__init__()
self.vipsImage = vipsImage
self.width = vipsImage.width
self.height = vipsImage.height
async def toBuffer(self, options: scrypted_sdk.ImageOptions = None) -> bytearray:
vipsImage: VipsImage = await self.toVipsImage(options)
if not options or not options.get('format', None):
def format():
return memoryview(vipsImage.vipsImage.write_to_memory())
return await to_thread(format)
elif options['format'] == 'rgb':
def format():
if vipsImage.vipsImage.hasalpha():
rgb = vipsImage.vipsImage.extract_band(0, vipsImage.vipsImage.bands - 1)
else:
rgb = vipsImage.vipsImage
mem = memoryview(rgb.write_to_memory())
return mem
return await to_thread(format)
return await to_thread(lambda: vipsImage.vipsImage.write_to_buffer('.' + options['format']))
async def toVipsImage(self, options: scrypted_sdk.ImageOptions = None):
return await to_thread(lambda: toVipsImage(self, options))
async def toImage(self, options: scrypted_sdk.ImageOptions = None) -> Any:
if options and options.get('format', None):
raise Exception('format can only be used with toBuffer')
newVipsImage = await self.toVipsImage(options)
return await createVipsMediaObject(newVipsImage)
def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage:
vipsImage = vipsImageWrapper.vipsImage
if not vipsImage:
raise Exception('Video Frame has been invalidated')
options = options or {}
crop = options.get('crop')
if crop:
vipsImage = vipsImage.crop(int(crop['left']), int(crop['top']), int(crop['width']), int(crop['height']))
resize = options.get('resize')
if resize:
xscale = None
if resize.get('width'):
xscale = resize['width'] / vipsImage.width
scale = xscale
yscale = None
if resize.get('height'):
yscale = resize['height'] / vipsImage.height
scale = yscale
if xscale and yscale:
scale = min(yscale, xscale)
xscale = xscale or yscale
yscale = yscale or xscale
vipsImage = vipsImage.resize(xscale, vscale=yscale, kernel='linear')
return VipsImage(vipsImage)
async def createVipsMediaObject(image: VipsImage):
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
'width': image.width,
'height': image.height,
'toBuffer': lambda options = None: image.toBuffer(options),
'toImage': lambda options = None: image.toImage(options),
})
return ret
class LibavGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator):
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
worker = scrypted_sdk.fork()
forked: CodecFork = await worker.result
return await forked.generateVideoFramesLibav(mediaObject, options, filter)
class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator):
class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator, scrypted_sdk.Settings):
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
worker = scrypted_sdk.fork()
forked: CodecFork = await worker.result
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter)
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter, self.storage.getItem('h264Decoder'))
async def getSettings(self) -> list[Setting]:
return [
{
'key': 'h264Decoder',
'title': 'H264 Decoder',
'description': 'The Gstreamer pipeline to use to decode H264 video.',
'value': self.storage.getItem('h264Decoder') or 'Default',
'choices': [
'Default',
'decodebin',
'vtdec_hw',
'nvh264dec',
'vaapih264dec',
],
'combobox': True,
}
]
async def putSetting(self, key: str, value: SettingValue) -> None:
self.storage.setItem(key, value)
await scrypted_sdk.deviceManager.onDeviceEvent(self.nativeId, scrypted_sdk.ScryptedInterface.Settings.value, None)
class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider):
def __init__(self, nativeId = None):
@@ -133,6 +67,7 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider)
'nativeId': 'gstreamer',
'interfaces': [
scrypted_sdk.ScryptedInterface.VideoFrameGenerator.value,
scrypted_sdk.ScryptedInterface.Settings.value,
],
'type': scrypted_sdk.ScryptedDeviceType.API.value,
}
@@ -160,108 +95,10 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider)
def create_scrypted_plugin():
return PythonCodecs()
async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
videosrc = ffmpegInput.get('url')
container = av.open(videosrc, options = options)
# none of this stuff seems to work. might be libav being slow with rtsp.
# container.no_buffer = True
# container.options['-analyzeduration'] = '0'
# container.options['-probesize'] = '500000'
stream = container.streams.video[0]
# stream.codec_context.thread_count = 1
# stream.codec_context.low_delay = True
# stream.codec_context.options['-analyzeduration'] = '0'
# stream.codec_context.options['-probesize'] = '500000'
start = 0
try:
for idx, frame in enumerate(container.decode(stream)):
now = time.time()
if not start:
start = now
elapsed = now - start
if (frame.time or 0) < elapsed - 0.500:
# print('too slow, skipping frame')
continue
# print(frame)
vips = pyvips.Image.new_from_array(frame.to_ndarray(format='rgb24'))
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
container.close()
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
container = ffmpegInput.get('container', None)
videosrc = ffmpegInput.get('url')
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
if videosrc.startswith('tcp://'):
parsed_url = urlparse(videosrc)
videosrc = 'tcpclientsrc port=%s host=%s' % (
parsed_url.port, parsed_url.hostname)
if container == 'mpegts':
videosrc += ' ! tsdemux'
elif container == 'sdp':
videosrc += ' ! sdpdemux'
else:
raise Exception('unknown container %s' % container)
elif videosrc.startswith('rtsp'):
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
if videoCodec == 'h264':
videosrc += ' ! rtph264depay ! h264parse'
videocaps = 'video/x-raw'
# if options and options.get('resize'):
# videocaps = 'videoscale ! video/x-raw,width={width},height={height}'.format(width=options['resize']['width'], height=options['resize']['height'])
format = options and options.get('format')
# I420 is a cheap way to get gray out of an h264 stream without color conversion.
if format == 'gray':
format = 'I420'
bands = 1
else:
format = 'RGB'
bands = 3
videocaps += ',format={format}'.format(format=format)
videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! ' + videocaps
gst, gen = createPipelineIterator(videosrc)
async for gstsample in gen():
caps = gstsample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')
gst_buffer = gstsample.get_buffer()
result, info = gst_buffer.map(Gst.MapFlags.READ)
if not result:
continue
try:
vips = pyvips.Image.new_from_memory(info.data, width, height, bands, pyvips.BandFormat.UCHAR)
vipsImage = VipsImage(vips)
try:
mo = await createVipsMediaObject(VipsImage(vips))
yield mo
finally:
vipsImage.vipsImage.invalidate()
vipsImage.vipsImage = None
finally:
gst_buffer.unmap(info)
class CodecFork:
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
try:
async for data in generateVideoFramesGstreamer(mediaObject, options, filter):
async for data in gstreamer.generateVideoFramesGstreamer(mediaObject, options, filter, h264Decoder):
yield data
finally:
import os
@@ -270,7 +107,7 @@ class CodecFork:
async def generateVideoFramesLibav(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
try:
async for data in generateVideoFramesLibav(mediaObject, options, filter):
async for data in libav.generateVideoFramesLibav(mediaObject, options, filter):
yield data
finally:
import os

View File

@@ -0,0 +1,89 @@
import time
from gst_generator import createPipelineIterator
import asyncio
from util import optional_chain
import scrypted_sdk
from typing import Any
from urllib.parse import urlparse
import pyvips
import concurrent.futures
# vips is already multithreaded, but needs to be kicked off the python asyncio thread.
vipsExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="vips")
async def to_thread(f):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(vipsExecutor, f)
class VipsImage(scrypted_sdk.VideoFrame):
def __init__(self, vipsImage: pyvips.Image) -> None:
super().__init__()
self.vipsImage = vipsImage
self.width = vipsImage.width
self.height = vipsImage.height
async def toBuffer(self, options: scrypted_sdk.ImageOptions = None) -> bytearray:
vipsImage: VipsImage = await self.toVipsImage(options)
if not options or not options.get('format', None):
def format():
return memoryview(vipsImage.vipsImage.write_to_memory())
return await to_thread(format)
elif options['format'] == 'rgb':
def format():
if vipsImage.vipsImage.hasalpha():
rgb = vipsImage.vipsImage.extract_band(0, vipsImage.vipsImage.bands - 1)
else:
rgb = vipsImage.vipsImage
mem = memoryview(rgb.write_to_memory())
return mem
return await to_thread(format)
return await to_thread(lambda: vipsImage.vipsImage.write_to_buffer('.' + options['format']))
async def toVipsImage(self, options: scrypted_sdk.ImageOptions = None):
return await to_thread(lambda: toVipsImage(self, options))
async def toImage(self, options: scrypted_sdk.ImageOptions = None) -> Any:
if options and options.get('format', None):
raise Exception('format can only be used with toBuffer')
newVipsImage = await self.toVipsImage(options)
return await createVipsMediaObject(newVipsImage)
def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage:
vipsImage = vipsImageWrapper.vipsImage
if not vipsImage:
raise Exception('Video Frame has been invalidated')
options = options or {}
crop = options.get('crop')
if crop:
vipsImage = vipsImage.crop(int(crop['left']), int(crop['top']), int(crop['width']), int(crop['height']))
resize = options.get('resize')
if resize:
xscale = None
if resize.get('width'):
xscale = resize['width'] / vipsImage.width
scale = xscale
yscale = None
if resize.get('height'):
yscale = resize['height'] / vipsImage.height
scale = yscale
if xscale and yscale:
scale = min(yscale, xscale)
xscale = xscale or yscale
yscale = yscale or xscale
vipsImage = vipsImage.resize(xscale, vscale=yscale, kernel='linear')
return VipsImage(vipsImage)
async def createVipsMediaObject(image: VipsImage):
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
'width': image.width,
'height': image.height,
'toBuffer': lambda options = None: image.toBuffer(options),
'toImage': lambda options = None: image.toImage(options),
})
return ret

View File

@@ -1,19 +0,0 @@
#!/bin/sh
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
mkdir -p all_models
wget https://dl.google.com/coral/canned_models/all_models.tar.gz
tar -C all_models -xvzf all_models.tar.gz
rm -f all_models.tar.gz

View File

@@ -1 +0,0 @@
../all_models/coco_labels.txt

View File

@@ -1 +0,0 @@
../all_models/mobilenet_ssd_v2_coco_quant_postprocess.tflite

View File

@@ -1 +0,0 @@
../all_models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/tensorflow-lite",
"version": "0.0.113",
"version": "0.1.2",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/tensorflow-lite",
"version": "0.0.113",
"version": "0.1.2",
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
}

View File

@@ -44,5 +44,5 @@
"devDependencies": {
"@scrypted/sdk": "file:../../sdk"
},
"version": "0.0.113"
"version": "0.1.2"
}

View File

@@ -302,11 +302,11 @@ class DetectPlugin(scrypted_sdk.ScryptedDeviceBase, ObjectDetection):
'detected': detected,
'videoFrame': videoFrame,
}
await self.detection_event_notified(detection_session.settings)
except:
raise
finally:
await videoFrames.aclose()
try:
await videoFrames.aclose()
except:
pass
async def detectObjects(self, mediaObject: MediaObject, session: ObjectDetectionSession = None, callbacks: ObjectDetectionCallbacks = None) -> ObjectsDetected:
is_image = mediaObject and (mediaObject.mimeType.startswith('image/') or mediaObject.mimeType.endswith('/x-raw-image'))

View File

@@ -8,6 +8,8 @@ from typing import Any, List, Tuple, Mapping
import asyncio
import time
from .rectangle import Rectangle, intersect_area, intersect_rect, to_bounding_box, from_bounding_box, combine_rect
import urllib.request
import os
from detect import DetectionSession, DetectPlugin
@@ -126,6 +128,17 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
loop = asyncio.get_event_loop()
loop.call_later(4 * 60 * 60, lambda: self.requestRestart())
def downloadFile(self, url: str, filename: str):
filesPath = os.path.join(os.environ['SCRYPTED_PLUGIN_VOLUME'], 'files')
fullpath = os.path.join(filesPath, filename)
if os.path.isfile(fullpath):
return fullpath
os.makedirs(filesPath, exist_ok=True)
tmp = fullpath + '.tmp'
urllib.request.urlretrieve(url, tmp)
os.rename(tmp, fullpath)
return fullpath
def getClasses(self) -> list[str]:
return list(self.labels.values())
@@ -273,7 +286,7 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Set
pass
async def run_detection_videoframe(self, videoFrame: scrypted_sdk.VideoFrame, detection_session: PredictSession) -> ObjectsDetected:
settings = detection_session.settings
settings = detection_session and detection_session.settings
src_size = videoFrame.width, videoFrame.height
w, h = self.get_input_size()
iw, ih = src_size

View File

@@ -40,8 +40,11 @@ class TensorFlowLitePlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted
def __init__(self, nativeId: str | None = None):
super().__init__(MIME_TYPE, nativeId=nativeId)
labels_contents = scrypted_sdk.zip.open(
'fs/coco_labels.txt').read().decode('utf8')
tfliteFile = self.downloadFile('https://raw.githubusercontent.com/google-coral/test_data/master/ssd_mobilenet_v2_coco_quant_postprocess.tflite', 'ssd_mobilenet_v2_coco_quant_postprocess.tflite')
edgetpuFile = self.downloadFile('https://raw.githubusercontent.com/google-coral/test_data/master/ssd_mobilenet_v2_coco_quant_postprocess_edgetpu.tflite', 'ssd_mobilenet_v2_coco_quant_postprocess_edgetpu.tflite')
labelsFile = self.downloadFile('https://raw.githubusercontent.com/google-coral/test_data/master/coco_labels.txt', 'coco_labels.txt')
labels_contents = open(labelsFile, 'r').read()
self.labels = parse_label_contents(labels_contents)
self.interpreters = queue.Queue()
self.interpreter_count = 0
@@ -54,13 +57,11 @@ class TensorFlowLitePlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted
self.edge_tpu_found = str(edge_tpus)
# todo co-compile
# https://coral.ai/docs/edgetpu/compiler/#co-compiling-multiple-models
model = scrypted_sdk.zip.open(
'fs/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite').read()
# face_model = scrypted_sdk.zip.open(
# 'fs/mobilenet_ssd_v2_face_quant_postprocess.tflite').read()
for idx, edge_tpu in enumerate(edge_tpus):
try:
interpreter = make_interpreter(model, ":%s" % idx)
interpreter = make_interpreter(edgetpuFile, ":%s" % idx)
interpreter.allocate_tensors()
_, height, width, channels = interpreter.get_input_details()[
0]['shape']
@@ -77,11 +78,9 @@ class TensorFlowLitePlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted
except Exception as e:
print('unable to use Coral Edge TPU', e)
self.edge_tpu_found = 'Edge TPU not found'
model = scrypted_sdk.zip.open(
'fs/mobilenet_ssd_v2_coco_quant_postprocess.tflite').read()
# face_model = scrypted_sdk.zip.open(
# 'fs/mobilenet_ssd_v2_face_quant_postprocess.tflite').read()
interpreter = tflite.Interpreter(model_content=model)
interpreter = tflite.Interpreter(model_path=tfliteFile)
interpreter.allocate_tensors()
_, height, width, channels = interpreter.get_input_details()[
0]['shape']

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/server",
"version": "0.7.10",
"version": "0.7.12",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/server",
"version": "0.7.10",
"version": "0.7.12",
"license": "ISC",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/server",
"version": "0.7.11",
"version": "0.7.13",
"description": "",
"dependencies": {
"@mapbox/node-pre-gyp": "^1.0.10",

View File

@@ -44,7 +44,7 @@ class StreamPipeReader:
def readBlocking(self, n):
b = bytes(0)
while len(b) < n:
self.conn.poll()
self.conn.poll(None)
add = os.read(self.conn.fileno(), n - len(b))
if not len(add):
raise Exception('unable to read requested bytes')
@@ -488,6 +488,11 @@ class PluginRemote:
reader = StreamPipeReader(parent_conn)
forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writeFd = parent_conn.fileno())
forkPeer.peerName = 'thread'
async def updateStats(stats):
allMemoryStats[forkPeer] = stats
forkPeer.params['updateStats'] = updateStats
async def forkReadLoop():
try:
await readLoop()
@@ -495,6 +500,7 @@ class PluginRemote:
# traceback.print_exc()
print('fork read loop exited')
finally:
allMemoryStats.pop(forkPeer)
parent_conn.close()
reader.executor.shutdown()
asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop)
@@ -585,6 +591,9 @@ class PluginRemote:
async def getServicePort(self, name):
pass
allMemoryStats = {}
async def plugin_async_main(loop: AbstractEventLoop, readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd=readFd, writeFd=writeFd, reader=reader, writer=writer)
peer.params['print'] = print
@@ -593,6 +602,7 @@ async def plugin_async_main(loop: AbstractEventLoop, readFd: int = None, writeFd
async def get_update_stats():
update_stats = await peer.getParam('updateStats')
if not update_stats:
print('host did not provide update_stats')
return
def stats_runner():
@@ -608,8 +618,12 @@ async def plugin_async_main(loop: AbstractEventLoop, readFd: int = None, writeFd
resource.RUSAGE_SELF).ru_maxrss
except:
heapTotal = 0
for _, stats in allMemoryStats.items():
ptime += stats['cpu']['user']
heapTotal += stats['memoryUsage']['heapTotal']
stats = {
'type': 'stats',
'cpu': {
'user': ptime,
'system': 0,