mirror of
https://github.com/koush/scrypted.git
synced 2026-02-08 00:12:13 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fae66619fb | ||
|
|
d979b9ec0c | ||
|
|
975319a65d | ||
|
|
7b5aa4ba2d | ||
|
|
670739c82b | ||
|
|
8511bd15a8 |
4
plugins/objectdetector/package-lock.json
generated
4
plugins/objectdetector/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@scrypted/objectdetector",
|
||||
"version": "0.0.107",
|
||||
"version": "0.0.108",
|
||||
"lockfileVersion": 2,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@scrypted/objectdetector",
|
||||
"version": "0.0.107",
|
||||
"version": "0.0.108",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@scrypted/common": "file:../../common",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@scrypted/objectdetector",
|
||||
"version": "0.0.107",
|
||||
"version": "0.0.108",
|
||||
"description": "Scrypted Video Analysis Plugin. Installed alongside a detection service like OpenCV or TensorFlow.",
|
||||
"author": "Scrypted",
|
||||
"license": "Apache-2.0",
|
||||
|
||||
@@ -597,6 +597,9 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
|
||||
// this.console.log('image saved', detected.detected.detections);
|
||||
}
|
||||
this.reportObjectDetections(detected.detected);
|
||||
if (this.hasMotionType) {
|
||||
await sleep(250);
|
||||
}
|
||||
// this.handleDetectionEvent(detected.detected);
|
||||
}
|
||||
}
|
||||
@@ -681,6 +684,19 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
|
||||
}
|
||||
}
|
||||
|
||||
normalizeBox(boundingBox: [number, number, number, number], inputDimensions: [number, number]) {
|
||||
let [x, y, width, height] = boundingBox;
|
||||
let x2 = x + width;
|
||||
let y2 = y + height;
|
||||
// the zones are point paths in percentage format
|
||||
x = x * 100 / inputDimensions[0];
|
||||
y = y * 100 / inputDimensions[1];
|
||||
x2 = x2 * 100 / inputDimensions[0];
|
||||
y2 = y2 * 100 / inputDimensions[1];
|
||||
const box = [[x, y], [x2, y], [x2, y2], [x, y2]];
|
||||
return box;
|
||||
}
|
||||
|
||||
getDetectionDuration() {
|
||||
// when motion type, the detection interval is a keepalive reset.
|
||||
// the duration needs to simply be an arbitrarily longer time.
|
||||
@@ -697,15 +713,7 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
|
||||
continue;
|
||||
|
||||
o.zones = []
|
||||
let [x, y, width, height] = o.boundingBox;
|
||||
let x2 = x + width;
|
||||
let y2 = y + height;
|
||||
// the zones are point paths in percentage format
|
||||
x = x * 100 / detection.inputDimensions[0];
|
||||
y = y * 100 / detection.inputDimensions[1];
|
||||
x2 = x2 * 100 / detection.inputDimensions[0];
|
||||
y2 = y2 * 100 / detection.inputDimensions[1];
|
||||
const box = [[x, y], [x2, y], [x2, y2], [x, y2]];
|
||||
const box = this.normalizeBox(o.boundingBox, detection.inputDimensions);
|
||||
|
||||
let included: boolean;
|
||||
for (const [zone, zoneValue] of Object.entries(this.zones)) {
|
||||
@@ -745,6 +753,14 @@ class ObjectDetectionMixin extends SettingsMixinDeviceBase<VideoCamera & Camera
|
||||
}
|
||||
}
|
||||
|
||||
// if this is a motion sensor and there are no inclusion zones set up,
|
||||
// use a default inclusion zone that crops the top and bottom to
|
||||
// prevents errant motion from the on screen time changing every second.
|
||||
if (this.hasMotionType && included === undefined) {
|
||||
const defaultInclusionZone = [[0, 10], [100, 10], [100, 90], [0, 90]];
|
||||
included = polygonOverlap(box, defaultInclusionZone);
|
||||
}
|
||||
|
||||
// if there are inclusion zones and this object
|
||||
// was not in any of them, filter it out.
|
||||
if (included === false)
|
||||
|
||||
4
plugins/python-codecs/package-lock.json
generated
4
plugins/python-codecs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@scrypted/python-codecs",
|
||||
"version": "0.1.6",
|
||||
"version": "0.1.8",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@scrypted/python-codecs",
|
||||
"version": "0.1.6",
|
||||
"version": "0.1.8",
|
||||
"devDependencies": {
|
||||
"@scrypted/sdk": "file:../../sdk"
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@scrypted/python-codecs",
|
||||
"version": "0.1.6",
|
||||
"version": "0.1.8",
|
||||
"description": "Python Codecs for Scrypted",
|
||||
"keywords": [
|
||||
"scrypted",
|
||||
|
||||
136
plugins/python-codecs/src/gst_generator.py
Normal file
136
plugins/python-codecs/src/gst_generator.py
Normal file
@@ -0,0 +1,136 @@
|
||||
import concurrent.futures
|
||||
import threading
|
||||
import asyncio
|
||||
from queue import Queue
|
||||
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstBase', '1.0')
|
||||
|
||||
from gi.repository import GLib, GObject, Gst
|
||||
GObject.threads_init()
|
||||
Gst.init(None)
|
||||
except:
|
||||
pass
|
||||
|
||||
class Callback:
|
||||
def __init__(self, callback) -> None:
|
||||
if callback:
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.callback = callback
|
||||
else:
|
||||
self.loop = None
|
||||
self.callback = None
|
||||
|
||||
def createPipelineIterator(pipeline: str):
|
||||
pipeline = '{pipeline} ! queue leaky=downstream max-size-buffers=0 ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
|
||||
print(pipeline)
|
||||
gst = Gst.parse_launch(pipeline)
|
||||
bus = gst.get_bus()
|
||||
|
||||
def on_bus_message(bus, message):
|
||||
t = str(message.type)
|
||||
# print(t)
|
||||
if t == str(Gst.MessageType.EOS):
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.WARNING):
|
||||
err, debug = message.parse_warning()
|
||||
print('Warning: %s: %s\n' % (err, debug))
|
||||
elif t == str(Gst.MessageType.ERROR):
|
||||
err, debug = message.parse_error()
|
||||
print('Error: %s: %s\n' % (err, debug))
|
||||
finish()
|
||||
|
||||
def stopGst():
|
||||
bus.remove_signal_watch()
|
||||
bus.disconnect(watchId)
|
||||
gst.set_state(Gst.State.NULL)
|
||||
|
||||
def finish():
|
||||
nonlocal hasFinished
|
||||
hasFinished = True
|
||||
callback = Callback(None)
|
||||
callbackQueue.put(callback)
|
||||
if not asyncFuture.done():
|
||||
asyncFuture.set_result(None)
|
||||
if not finished.done():
|
||||
finished.set_result(None)
|
||||
|
||||
watchId = bus.connect('message', on_bus_message)
|
||||
bus.add_signal_watch()
|
||||
|
||||
finished = concurrent.futures.Future()
|
||||
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
|
||||
hasFinished = False
|
||||
|
||||
appsink = gst.get_by_name('appsink')
|
||||
callbackQueue = Queue()
|
||||
asyncFuture = asyncio.Future()
|
||||
|
||||
async def gen():
|
||||
try:
|
||||
while True:
|
||||
nonlocal asyncFuture
|
||||
asyncFuture = asyncio.Future()
|
||||
yieldFuture = asyncio.Future()
|
||||
async def asyncCallback(sample):
|
||||
asyncFuture.set_result(sample)
|
||||
await yieldFuture
|
||||
callbackQueue.put(Callback(asyncCallback))
|
||||
sample = await asyncFuture
|
||||
if not sample:
|
||||
yieldFuture.set_result(None)
|
||||
break
|
||||
try:
|
||||
yield sample
|
||||
finally:
|
||||
yieldFuture.set_result(None)
|
||||
finally:
|
||||
finish()
|
||||
print('gstreamer finished')
|
||||
|
||||
|
||||
def on_new_sample(sink, preroll):
|
||||
nonlocal hasFinished
|
||||
|
||||
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
|
||||
|
||||
callback: Callback = callbackQueue.get()
|
||||
if not callback.callback or hasFinished:
|
||||
hasFinished = True
|
||||
if callback.callback:
|
||||
asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop)
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
future = asyncio.run_coroutine_threadsafe(callback.callback(sample), loop = callback.loop)
|
||||
try:
|
||||
future.result()
|
||||
except:
|
||||
pass
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
appsink.connect('new-preroll', on_new_sample, True)
|
||||
appsink.connect('new-sample', on_new_sample, False)
|
||||
|
||||
gst.set_state(Gst.State.PLAYING)
|
||||
return gst, gen
|
||||
|
||||
def mainThread():
|
||||
async def asyncMain():
|
||||
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
|
||||
i = 0
|
||||
async for sample in gen():
|
||||
print('sample')
|
||||
i = i + 1
|
||||
if i == 10:
|
||||
break
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.ensure_future(asyncMain(), loop = loop)
|
||||
loop.run_forever()
|
||||
|
||||
if __name__ == "__main__":
|
||||
threading.Thread(target = mainThread).start()
|
||||
mainLoop = GLib.MainLoop()
|
||||
mainLoop.run()
|
||||
@@ -1,136 +1,87 @@
|
||||
import concurrent.futures
|
||||
import threading
|
||||
import asyncio
|
||||
from queue import Queue
|
||||
from gst_generator import createPipelineIterator
|
||||
from util import optional_chain
|
||||
import scrypted_sdk
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
import pyvips
|
||||
from vips import createVipsMediaObject, VipsImage
|
||||
import platform
|
||||
|
||||
Gst = None
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstBase', '1.0')
|
||||
|
||||
from gi.repository import GLib, GObject, Gst
|
||||
GObject.threads_init()
|
||||
Gst.init(None)
|
||||
from gi.repository import Gst
|
||||
except:
|
||||
pass
|
||||
|
||||
class Callback:
|
||||
def __init__(self, callback) -> None:
|
||||
if callback:
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.callback = callback
|
||||
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
container = ffmpegInput.get('container', None)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
|
||||
|
||||
if videosrc.startswith('tcp://'):
|
||||
parsed_url = urlparse(videosrc)
|
||||
videosrc = 'tcpclientsrc port=%s host=%s' % (
|
||||
parsed_url.port, parsed_url.hostname)
|
||||
if container == 'mpegts':
|
||||
videosrc += ' ! tsdemux'
|
||||
elif container == 'sdp':
|
||||
videosrc += ' ! sdpdemux'
|
||||
else:
|
||||
self.loop = None
|
||||
self.callback = None
|
||||
raise Exception('unknown container %s' % container)
|
||||
elif videosrc.startswith('rtsp'):
|
||||
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
|
||||
if videoCodec == 'h264':
|
||||
videosrc += ' ! rtph264depay ! h264parse'
|
||||
|
||||
def createPipelineIterator(pipeline: str):
|
||||
pipeline = '{pipeline} ! queue leaky=downstream max-size-buffers=0 ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
|
||||
print(pipeline)
|
||||
gst = Gst.parse_launch(pipeline)
|
||||
bus = gst.get_bus()
|
||||
videocaps = 'video/x-raw'
|
||||
# if options and options.get('resize'):
|
||||
# videocaps = 'videoscale ! video/x-raw,width={width},height={height}'.format(width=options['resize']['width'], height=options['resize']['height'])
|
||||
|
||||
def on_bus_message(bus, message):
|
||||
t = str(message.type)
|
||||
# print(t)
|
||||
if t == str(Gst.MessageType.EOS):
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.WARNING):
|
||||
err, debug = message.parse_warning()
|
||||
print('Warning: %s: %s\n' % (err, debug))
|
||||
elif t == str(Gst.MessageType.ERROR):
|
||||
err, debug = message.parse_error()
|
||||
print('Error: %s: %s\n' % (err, debug))
|
||||
finish()
|
||||
format = options and options.get('format')
|
||||
# I420 is a cheap way to get gray out of an h264 stream without color conversion.
|
||||
if format == 'gray':
|
||||
format = 'I420'
|
||||
bands = 1
|
||||
else:
|
||||
format = 'RGB'
|
||||
bands = 3
|
||||
|
||||
videocaps += ',format={format}'.format(format=format)
|
||||
|
||||
def stopGst():
|
||||
bus.remove_signal_watch()
|
||||
bus.disconnect(watchId)
|
||||
gst.set_state(Gst.State.NULL)
|
||||
decoder = 'decodebin'
|
||||
if videoCodec == 'h264':
|
||||
decoder = h264Decoder or 'Default'
|
||||
if decoder == 'Default':
|
||||
if platform.system() == 'Darwin':
|
||||
decoder = 'vtdec_hw'
|
||||
else:
|
||||
decoder = 'decodebin'
|
||||
|
||||
def finish():
|
||||
nonlocal hasFinished
|
||||
hasFinished = True
|
||||
callback = Callback(None)
|
||||
callbackQueue.put(callback)
|
||||
if not asyncFuture.done():
|
||||
asyncFuture.set_result(None)
|
||||
if not finished.done():
|
||||
finished.set_result(None)
|
||||
videosrc += ' ! {decoder} ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! {videocaps}'.format(decoder=decoder, videocaps=videocaps)
|
||||
|
||||
watchId = bus.connect('message', on_bus_message)
|
||||
bus.add_signal_watch()
|
||||
gst, gen = createPipelineIterator(videosrc)
|
||||
async for gstsample in gen():
|
||||
caps = gstsample.get_caps()
|
||||
height = caps.get_structure(0).get_value('height')
|
||||
width = caps.get_structure(0).get_value('width')
|
||||
gst_buffer = gstsample.get_buffer()
|
||||
result, info = gst_buffer.map(Gst.MapFlags.READ)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
finished = concurrent.futures.Future()
|
||||
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
|
||||
hasFinished = False
|
||||
|
||||
appsink = gst.get_by_name('appsink')
|
||||
callbackQueue = Queue()
|
||||
asyncFuture = asyncio.Future()
|
||||
|
||||
async def gen():
|
||||
try:
|
||||
while True:
|
||||
nonlocal asyncFuture
|
||||
asyncFuture = asyncio.Future()
|
||||
yieldFuture = asyncio.Future()
|
||||
async def asyncCallback(sample):
|
||||
asyncFuture.set_result(sample)
|
||||
await yieldFuture
|
||||
callbackQueue.put(Callback(asyncCallback))
|
||||
sample = await asyncFuture
|
||||
if not sample:
|
||||
yieldFuture.set_result(None)
|
||||
break
|
||||
try:
|
||||
yield sample
|
||||
finally:
|
||||
yieldFuture.set_result(None)
|
||||
finally:
|
||||
finish()
|
||||
print('gstreamer finished')
|
||||
|
||||
|
||||
def on_new_sample(sink, preroll):
|
||||
nonlocal hasFinished
|
||||
|
||||
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
|
||||
|
||||
callback: Callback = callbackQueue.get()
|
||||
if not callback.callback or hasFinished:
|
||||
hasFinished = True
|
||||
if callback.callback:
|
||||
asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop)
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
future = asyncio.run_coroutine_threadsafe(callback.callback(sample), loop = callback.loop)
|
||||
try:
|
||||
future.result()
|
||||
except:
|
||||
pass
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
appsink.connect('new-preroll', on_new_sample, True)
|
||||
appsink.connect('new-sample', on_new_sample, False)
|
||||
|
||||
gst.set_state(Gst.State.PLAYING)
|
||||
return gst, gen
|
||||
|
||||
def mainThread():
|
||||
async def asyncMain():
|
||||
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
|
||||
i = 0
|
||||
async for sample in gen():
|
||||
print('sample')
|
||||
i = i + 1
|
||||
if i == 10:
|
||||
break
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.ensure_future(asyncMain(), loop = loop)
|
||||
loop.run_forever()
|
||||
|
||||
if __name__ == "__main__":
|
||||
threading.Thread(target = mainThread).start()
|
||||
mainLoop = GLib.MainLoop()
|
||||
mainLoop.run()
|
||||
vips = pyvips.Image.new_from_memory(info.data, width, height, bands, pyvips.BandFormat.UCHAR)
|
||||
vipsImage = VipsImage(vips)
|
||||
try:
|
||||
mo = await createVipsMediaObject(VipsImage(vips))
|
||||
yield mo
|
||||
finally:
|
||||
vipsImage.vipsImage.invalidate()
|
||||
vipsImage.vipsImage = None
|
||||
finally:
|
||||
gst_buffer.unmap(info)
|
||||
|
||||
51
plugins/python-codecs/src/libav.py
Normal file
51
plugins/python-codecs/src/libav.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import time
|
||||
from gst_generator import createPipelineIterator
|
||||
import scrypted_sdk
|
||||
from typing import Any
|
||||
import pyvips
|
||||
from vips import createVipsMediaObject, VipsImage
|
||||
|
||||
av = None
|
||||
try:
|
||||
import av
|
||||
av.logging.set_level(av.logging.PANIC)
|
||||
except:
|
||||
pass
|
||||
|
||||
async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
container = av.open(videosrc)
|
||||
# none of this stuff seems to work. might be libav being slow with rtsp.
|
||||
# container.no_buffer = True
|
||||
# container.gen_pts = False
|
||||
# container.options['-analyzeduration'] = '0'
|
||||
# container.options['-probesize'] = '500000'
|
||||
stream = container.streams.video[0]
|
||||
# stream.codec_context.thread_count = 1
|
||||
# stream.codec_context.low_delay = True
|
||||
# stream.codec_context.options['-analyzeduration'] = '0'
|
||||
# stream.codec_context.options['-probesize'] = '500000'
|
||||
|
||||
start = 0
|
||||
try:
|
||||
for idx, frame in enumerate(container.decode(stream)):
|
||||
now = time.time()
|
||||
if not start:
|
||||
start = now
|
||||
elapsed = now - start
|
||||
if (frame.time or 0) < elapsed - 0.500:
|
||||
# print('too slow, skipping frame')
|
||||
continue
|
||||
# print(frame)
|
||||
vips = pyvips.Image.new_from_array(frame.to_ndarray(format='rgb24'))
|
||||
vipsImage = VipsImage(vips)
|
||||
try:
|
||||
mo = await createVipsMediaObject(VipsImage(vips))
|
||||
yield mo
|
||||
finally:
|
||||
vipsImage.vipsImage.invalidate()
|
||||
vipsImage.vipsImage = None
|
||||
|
||||
finally:
|
||||
container.close()
|
||||
@@ -1,19 +1,12 @@
|
||||
import time
|
||||
from gstreamer import createPipelineIterator
|
||||
import asyncio
|
||||
from util import optional_chain
|
||||
import scrypted_sdk
|
||||
from scrypted_sdk import Setting, SettingValue
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
import pyvips
|
||||
import concurrent.futures
|
||||
import gstreamer
|
||||
import libav
|
||||
|
||||
Gst = None
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstBase', '1.0')
|
||||
|
||||
from gi.repository import Gst
|
||||
except:
|
||||
pass
|
||||
@@ -21,101 +14,42 @@ except:
|
||||
av = None
|
||||
try:
|
||||
import av
|
||||
av.logging.set_level(av.logging.PANIC)
|
||||
except:
|
||||
pass
|
||||
|
||||
# vips is already multithreaded, but needs to be kicked off the python asyncio thread.
|
||||
vipsExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="vips")
|
||||
|
||||
async def to_thread(f):
|
||||
loop = asyncio.get_running_loop()
|
||||
return await loop.run_in_executor(vipsExecutor, f)
|
||||
|
||||
class VipsImage(scrypted_sdk.VideoFrame):
|
||||
def __init__(self, vipsImage: pyvips.Image) -> None:
|
||||
super().__init__()
|
||||
self.vipsImage = vipsImage
|
||||
self.width = vipsImage.width
|
||||
self.height = vipsImage.height
|
||||
|
||||
async def toBuffer(self, options: scrypted_sdk.ImageOptions = None) -> bytearray:
|
||||
vipsImage: VipsImage = await self.toVipsImage(options)
|
||||
|
||||
if not options or not options.get('format', None):
|
||||
def format():
|
||||
return memoryview(vipsImage.vipsImage.write_to_memory())
|
||||
return await to_thread(format)
|
||||
elif options['format'] == 'rgb':
|
||||
def format():
|
||||
if vipsImage.vipsImage.hasalpha():
|
||||
rgb = vipsImage.vipsImage.extract_band(0, vipsImage.vipsImage.bands - 1)
|
||||
else:
|
||||
rgb = vipsImage.vipsImage
|
||||
mem = memoryview(rgb.write_to_memory())
|
||||
return mem
|
||||
return await to_thread(format)
|
||||
|
||||
return await to_thread(lambda: vipsImage.vipsImage.write_to_buffer('.' + options['format']))
|
||||
|
||||
async def toVipsImage(self, options: scrypted_sdk.ImageOptions = None):
|
||||
return await to_thread(lambda: toVipsImage(self, options))
|
||||
|
||||
async def toImage(self, options: scrypted_sdk.ImageOptions = None) -> Any:
|
||||
if options and options.get('format', None):
|
||||
raise Exception('format can only be used with toBuffer')
|
||||
newVipsImage = await self.toVipsImage(options)
|
||||
return await createVipsMediaObject(newVipsImage)
|
||||
|
||||
def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage:
|
||||
vipsImage = vipsImageWrapper.vipsImage
|
||||
if not vipsImage:
|
||||
raise Exception('Video Frame has been invalidated')
|
||||
options = options or {}
|
||||
crop = options.get('crop')
|
||||
if crop:
|
||||
vipsImage = vipsImage.crop(int(crop['left']), int(crop['top']), int(crop['width']), int(crop['height']))
|
||||
|
||||
resize = options.get('resize')
|
||||
if resize:
|
||||
xscale = None
|
||||
if resize.get('width'):
|
||||
xscale = resize['width'] / vipsImage.width
|
||||
scale = xscale
|
||||
yscale = None
|
||||
if resize.get('height'):
|
||||
yscale = resize['height'] / vipsImage.height
|
||||
scale = yscale
|
||||
|
||||
if xscale and yscale:
|
||||
scale = min(yscale, xscale)
|
||||
|
||||
xscale = xscale or yscale
|
||||
yscale = yscale or xscale
|
||||
vipsImage = vipsImage.resize(xscale, vscale=yscale, kernel='linear')
|
||||
|
||||
return VipsImage(vipsImage)
|
||||
|
||||
async def createVipsMediaObject(image: VipsImage):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
|
||||
class LibavGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator):
|
||||
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
worker = scrypted_sdk.fork()
|
||||
forked: CodecFork = await worker.result
|
||||
return await forked.generateVideoFramesLibav(mediaObject, options, filter)
|
||||
|
||||
class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator):
|
||||
class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator, scrypted_sdk.Settings):
|
||||
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
worker = scrypted_sdk.fork()
|
||||
forked: CodecFork = await worker.result
|
||||
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter)
|
||||
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter, self.storage.getItem('h264Decoder'))
|
||||
|
||||
async def getSettings(self) -> list[Setting]:
|
||||
return [
|
||||
{
|
||||
'key': 'h264Decoder',
|
||||
'title': 'H264 Decoder',
|
||||
'description': 'The Gstreamer pipeline to use to decode H264 video.',
|
||||
'value': self.storage.getItem('h264Decoder') or 'Default',
|
||||
'choices': [
|
||||
'Default',
|
||||
'decodebin',
|
||||
'vtdec_hw',
|
||||
'nvh264dec',
|
||||
'vaapih264dec',
|
||||
],
|
||||
'combobox': True,
|
||||
}
|
||||
]
|
||||
|
||||
async def putSetting(self, key: str, value: SettingValue) -> None:
|
||||
self.storage.setItem(key, value)
|
||||
await scrypted_sdk.deviceManager.onDeviceEvent(self.nativeId, scrypted_sdk.ScryptedInterface.Settings.value, None)
|
||||
|
||||
class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider):
|
||||
def __init__(self, nativeId = None):
|
||||
@@ -133,6 +67,7 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider)
|
||||
'nativeId': 'gstreamer',
|
||||
'interfaces': [
|
||||
scrypted_sdk.ScryptedInterface.VideoFrameGenerator.value,
|
||||
scrypted_sdk.ScryptedInterface.Settings.value,
|
||||
],
|
||||
'type': scrypted_sdk.ScryptedDeviceType.API.value,
|
||||
}
|
||||
@@ -160,108 +95,10 @@ class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.DeviceProvider)
|
||||
def create_scrypted_plugin():
|
||||
return PythonCodecs()
|
||||
|
||||
async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
container = av.open(videosrc, options = options)
|
||||
# none of this stuff seems to work. might be libav being slow with rtsp.
|
||||
# container.no_buffer = True
|
||||
# container.options['-analyzeduration'] = '0'
|
||||
# container.options['-probesize'] = '500000'
|
||||
stream = container.streams.video[0]
|
||||
# stream.codec_context.thread_count = 1
|
||||
# stream.codec_context.low_delay = True
|
||||
# stream.codec_context.options['-analyzeduration'] = '0'
|
||||
# stream.codec_context.options['-probesize'] = '500000'
|
||||
|
||||
start = 0
|
||||
try:
|
||||
for idx, frame in enumerate(container.decode(stream)):
|
||||
now = time.time()
|
||||
if not start:
|
||||
start = now
|
||||
elapsed = now - start
|
||||
if (frame.time or 0) < elapsed - 0.500:
|
||||
# print('too slow, skipping frame')
|
||||
continue
|
||||
# print(frame)
|
||||
vips = pyvips.Image.new_from_array(frame.to_ndarray(format='rgb24'))
|
||||
vipsImage = VipsImage(vips)
|
||||
try:
|
||||
mo = await createVipsMediaObject(VipsImage(vips))
|
||||
yield mo
|
||||
finally:
|
||||
vipsImage.vipsImage.invalidate()
|
||||
vipsImage.vipsImage = None
|
||||
|
||||
finally:
|
||||
container.close()
|
||||
|
||||
|
||||
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
container = ffmpegInput.get('container', None)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
|
||||
|
||||
if videosrc.startswith('tcp://'):
|
||||
parsed_url = urlparse(videosrc)
|
||||
videosrc = 'tcpclientsrc port=%s host=%s' % (
|
||||
parsed_url.port, parsed_url.hostname)
|
||||
if container == 'mpegts':
|
||||
videosrc += ' ! tsdemux'
|
||||
elif container == 'sdp':
|
||||
videosrc += ' ! sdpdemux'
|
||||
else:
|
||||
raise Exception('unknown container %s' % container)
|
||||
elif videosrc.startswith('rtsp'):
|
||||
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
|
||||
if videoCodec == 'h264':
|
||||
videosrc += ' ! rtph264depay ! h264parse'
|
||||
|
||||
videocaps = 'video/x-raw'
|
||||
# if options and options.get('resize'):
|
||||
# videocaps = 'videoscale ! video/x-raw,width={width},height={height}'.format(width=options['resize']['width'], height=options['resize']['height'])
|
||||
|
||||
format = options and options.get('format')
|
||||
# I420 is a cheap way to get gray out of an h264 stream without color conversion.
|
||||
if format == 'gray':
|
||||
format = 'I420'
|
||||
bands = 1
|
||||
else:
|
||||
format = 'RGB'
|
||||
bands = 3
|
||||
|
||||
videocaps += ',format={format}'.format(format=format)
|
||||
|
||||
videosrc += ' ! decodebin ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! ' + videocaps
|
||||
|
||||
gst, gen = createPipelineIterator(videosrc)
|
||||
async for gstsample in gen():
|
||||
caps = gstsample.get_caps()
|
||||
height = caps.get_structure(0).get_value('height')
|
||||
width = caps.get_structure(0).get_value('width')
|
||||
gst_buffer = gstsample.get_buffer()
|
||||
result, info = gst_buffer.map(Gst.MapFlags.READ)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
try:
|
||||
vips = pyvips.Image.new_from_memory(info.data, width, height, bands, pyvips.BandFormat.UCHAR)
|
||||
vipsImage = VipsImage(vips)
|
||||
try:
|
||||
mo = await createVipsMediaObject(VipsImage(vips))
|
||||
yield mo
|
||||
finally:
|
||||
vipsImage.vipsImage.invalidate()
|
||||
vipsImage.vipsImage = None
|
||||
finally:
|
||||
gst_buffer.unmap(info)
|
||||
|
||||
class CodecFork:
|
||||
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
|
||||
try:
|
||||
async for data in generateVideoFramesGstreamer(mediaObject, options, filter):
|
||||
async for data in gstreamer.generateVideoFramesGstreamer(mediaObject, options, filter, h264Decoder):
|
||||
yield data
|
||||
finally:
|
||||
import os
|
||||
@@ -270,7 +107,7 @@ class CodecFork:
|
||||
|
||||
async def generateVideoFramesLibav(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
try:
|
||||
async for data in generateVideoFramesLibav(mediaObject, options, filter):
|
||||
async for data in libav.generateVideoFramesLibav(mediaObject, options, filter):
|
||||
yield data
|
||||
finally:
|
||||
import os
|
||||
|
||||
89
plugins/python-codecs/src/vips.py
Normal file
89
plugins/python-codecs/src/vips.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import time
|
||||
from gst_generator import createPipelineIterator
|
||||
import asyncio
|
||||
from util import optional_chain
|
||||
import scrypted_sdk
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
import pyvips
|
||||
import concurrent.futures
|
||||
|
||||
# vips is already multithreaded, but needs to be kicked off the python asyncio thread.
|
||||
vipsExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="vips")
|
||||
|
||||
async def to_thread(f):
|
||||
loop = asyncio.get_running_loop()
|
||||
return await loop.run_in_executor(vipsExecutor, f)
|
||||
|
||||
class VipsImage(scrypted_sdk.VideoFrame):
|
||||
def __init__(self, vipsImage: pyvips.Image) -> None:
|
||||
super().__init__()
|
||||
self.vipsImage = vipsImage
|
||||
self.width = vipsImage.width
|
||||
self.height = vipsImage.height
|
||||
|
||||
async def toBuffer(self, options: scrypted_sdk.ImageOptions = None) -> bytearray:
|
||||
vipsImage: VipsImage = await self.toVipsImage(options)
|
||||
|
||||
if not options or not options.get('format', None):
|
||||
def format():
|
||||
return memoryview(vipsImage.vipsImage.write_to_memory())
|
||||
return await to_thread(format)
|
||||
elif options['format'] == 'rgb':
|
||||
def format():
|
||||
if vipsImage.vipsImage.hasalpha():
|
||||
rgb = vipsImage.vipsImage.extract_band(0, vipsImage.vipsImage.bands - 1)
|
||||
else:
|
||||
rgb = vipsImage.vipsImage
|
||||
mem = memoryview(rgb.write_to_memory())
|
||||
return mem
|
||||
return await to_thread(format)
|
||||
|
||||
return await to_thread(lambda: vipsImage.vipsImage.write_to_buffer('.' + options['format']))
|
||||
|
||||
async def toVipsImage(self, options: scrypted_sdk.ImageOptions = None):
|
||||
return await to_thread(lambda: toVipsImage(self, options))
|
||||
|
||||
async def toImage(self, options: scrypted_sdk.ImageOptions = None) -> Any:
|
||||
if options and options.get('format', None):
|
||||
raise Exception('format can only be used with toBuffer')
|
||||
newVipsImage = await self.toVipsImage(options)
|
||||
return await createVipsMediaObject(newVipsImage)
|
||||
|
||||
def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage:
|
||||
vipsImage = vipsImageWrapper.vipsImage
|
||||
if not vipsImage:
|
||||
raise Exception('Video Frame has been invalidated')
|
||||
options = options or {}
|
||||
crop = options.get('crop')
|
||||
if crop:
|
||||
vipsImage = vipsImage.crop(int(crop['left']), int(crop['top']), int(crop['width']), int(crop['height']))
|
||||
|
||||
resize = options.get('resize')
|
||||
if resize:
|
||||
xscale = None
|
||||
if resize.get('width'):
|
||||
xscale = resize['width'] / vipsImage.width
|
||||
scale = xscale
|
||||
yscale = None
|
||||
if resize.get('height'):
|
||||
yscale = resize['height'] / vipsImage.height
|
||||
scale = yscale
|
||||
|
||||
if xscale and yscale:
|
||||
scale = min(yscale, xscale)
|
||||
|
||||
xscale = xscale or yscale
|
||||
yscale = yscale or xscale
|
||||
vipsImage = vipsImage.resize(xscale, vscale=yscale, kernel='linear')
|
||||
|
||||
return VipsImage(vipsImage)
|
||||
|
||||
async def createVipsMediaObject(image: VipsImage):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
@@ -302,7 +302,6 @@ class DetectPlugin(scrypted_sdk.ScryptedDeviceBase, ObjectDetection):
|
||||
'detected': detected,
|
||||
'videoFrame': videoFrame,
|
||||
}
|
||||
await self.detection_event_notified(detection_session.settings)
|
||||
finally:
|
||||
try:
|
||||
await videoFrames.aclose()
|
||||
|
||||
4
server/package-lock.json
generated
4
server/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@scrypted/server",
|
||||
"version": "0.7.11",
|
||||
"version": "0.7.12",
|
||||
"lockfileVersion": 2,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@scrypted/server",
|
||||
"version": "0.7.11",
|
||||
"version": "0.7.12",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@mapbox/node-pre-gyp": "^1.0.10",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@scrypted/server",
|
||||
"version": "0.7.12",
|
||||
"version": "0.7.13",
|
||||
"description": "",
|
||||
"dependencies": {
|
||||
"@mapbox/node-pre-gyp": "^1.0.10",
|
||||
|
||||
@@ -44,7 +44,7 @@ class StreamPipeReader:
|
||||
def readBlocking(self, n):
|
||||
b = bytes(0)
|
||||
while len(b) < n:
|
||||
self.conn.poll()
|
||||
self.conn.poll(None)
|
||||
add = os.read(self.conn.fileno(), n - len(b))
|
||||
if not len(add):
|
||||
raise Exception('unable to read requested bytes')
|
||||
|
||||
Reference in New Issue
Block a user