mirror of
https://github.com/koush/scrypted.git
synced 2026-02-28 08:22:29 +00:00
python-codecs: major refactor to support hw acceleration and on demand color space conversion
This commit is contained in:
@@ -12,3 +12,13 @@ def createVideoFrame(image) -> scrypted_sdk.VideoFrame:
|
||||
'timestamp': time.time() * 1000,
|
||||
'flush': flush,
|
||||
}
|
||||
|
||||
async def createImageMediaObject(image: scrypted_sdk.Image):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'format': None,
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
|
||||
@@ -12,36 +12,54 @@ try:
|
||||
GObject.threads_init()
|
||||
Gst.init(None)
|
||||
except:
|
||||
pass
|
||||
Gst = None
|
||||
|
||||
async def createPipelineIterator(pipeline: str):
|
||||
async def createPipelineIterator(pipeline: str, gst = None):
|
||||
loop = asyncio.get_running_loop()
|
||||
pipeline = '{pipeline} ! queue leaky=downstream max-size-buffers=0 ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
|
||||
pipeline = '{pipeline} ! appsink name=appsink emit-signals=true sync=false max-buffers=-1 drop=true'.format(pipeline=pipeline)
|
||||
print(pipeline)
|
||||
gst = Gst.parse_launch(pipeline)
|
||||
bus = gst.get_bus()
|
||||
finished = concurrent.futures.Future()
|
||||
|
||||
def on_bus_message(bus, message):
|
||||
t = str(message.type)
|
||||
# print(t)
|
||||
if t == str(Gst.MessageType.EOS):
|
||||
print('EOS: Stream ended.')
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.WARNING):
|
||||
err, debug = message.parse_warning()
|
||||
print('Warning: %s: %s\n' % (err, debug))
|
||||
print('Ending stream due to warning. If this camera is causing errors, switch to the libav decoder.');
|
||||
finish();
|
||||
elif t == str(Gst.MessageType.ERROR):
|
||||
err, debug = message.parse_error()
|
||||
print('Error: %s: %s\n' % (err, debug))
|
||||
finish()
|
||||
newGst = not gst
|
||||
if gst:
|
||||
bin = Gst.parse_bin_from_description(pipeline, False)
|
||||
gst.add(bin)
|
||||
gst = bin
|
||||
|
||||
def stopGst():
|
||||
bus.remove_signal_watch()
|
||||
bus.disconnect(watchId)
|
||||
gst.set_state(Gst.State.NULL)
|
||||
def stopGst():
|
||||
gst.set_state(Gst.State.NULL)
|
||||
|
||||
else:
|
||||
gst = Gst.parse_launch(pipeline)
|
||||
|
||||
def on_bus_message(bus, message):
|
||||
t = str(message.type)
|
||||
# print(t)
|
||||
if t == str(Gst.MessageType.EOS):
|
||||
print('EOS: Stream ended.')
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.WARNING):
|
||||
err, debug = message.parse_warning()
|
||||
print('Warning: %s: %s\n' % (err, debug))
|
||||
print('Ending stream due to warning. If this camera is causing errors, switch to the libav decoder.');
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.ERROR):
|
||||
err, debug = message.parse_error()
|
||||
print('Error: %s: %s\n' % (err, debug))
|
||||
finish()
|
||||
|
||||
bus = gst.get_bus()
|
||||
watchId = bus.connect('message', on_bus_message)
|
||||
bus.add_signal_watch()
|
||||
|
||||
def stopGst():
|
||||
bus.remove_signal_watch()
|
||||
bus.disconnect(watchId)
|
||||
gst.set_state(Gst.State.NULL)
|
||||
|
||||
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
|
||||
|
||||
hasFinished = False
|
||||
def finish():
|
||||
nonlocal hasFinished
|
||||
hasFinished = True
|
||||
@@ -50,12 +68,6 @@ async def createPipelineIterator(pipeline: str):
|
||||
if not finished.done():
|
||||
finished.set_result(None)
|
||||
|
||||
watchId = bus.connect('message', on_bus_message)
|
||||
bus.add_signal_watch()
|
||||
|
||||
finished = concurrent.futures.Future()
|
||||
finished.add_done_callback(lambda _: threading.Thread(target=stopGst, name="StopGst").start())
|
||||
hasFinished = False
|
||||
|
||||
appsink = gst.get_by_name('appsink')
|
||||
yieldQueue = Queue()
|
||||
@@ -98,10 +110,33 @@ async def createPipelineIterator(pipeline: str):
|
||||
|
||||
def mainThread():
|
||||
async def asyncMain():
|
||||
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
|
||||
gst, gen = await createPipelineIterator('rtspsrc location=rtsp://localhost:63876/674e895e04ddfd15 ! rtph264depay ! h264parse ! vtdec_hw ! video/x-raw(memory:GLMemory)')
|
||||
i = 0
|
||||
first = True
|
||||
async for sample in gen():
|
||||
print('sample')
|
||||
import time
|
||||
print(time.time())
|
||||
if first:
|
||||
first = False
|
||||
|
||||
for i in range(1, 10):
|
||||
caps = sample.get_caps()
|
||||
p = "appsrc name=appsrc emit-signals=True is-live=True \
|
||||
caps={caps} ! videocrop left=0 top=0 right=10 bottom=10 ! gldownload".format(caps = caps.to_string().replace(' ', ''))
|
||||
# p = "appsrc name=appsrc emit-signals=True is-live=True \
|
||||
# caps={caps} ! gldownload !\
|
||||
# videoconvert ! videoscale name=videoscale ! video/x-raw,format=RGB,width=640,height=480".format(caps = caps.to_string().replace(' ', ''))
|
||||
gst2, gen2 = await createPipelineIterator(p)
|
||||
appsrc = gst2.get_by_name('appsrc')
|
||||
vs = gst2.get_by_name('videoscale')
|
||||
g2 = gen2()
|
||||
|
||||
buffer = sample.get_buffer()
|
||||
appsrc.emit("push-buffer", buffer)
|
||||
s2 = await g2.__anext__()
|
||||
print(time.time())
|
||||
await g2.aclose()
|
||||
|
||||
i = i + 1
|
||||
if i == 10:
|
||||
break
|
||||
@@ -111,6 +146,8 @@ def mainThread():
|
||||
loop.run_forever()
|
||||
|
||||
if __name__ == "__main__":
|
||||
test = 334
|
||||
foo = f"{test}"
|
||||
threading.Thread(target = mainThread).start()
|
||||
mainLoop = GLib.MainLoop()
|
||||
mainLoop.run()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from gst_generator import createPipelineIterator
|
||||
from gst_generator import createPipelineIterator, Gst
|
||||
from gstreamer_postprocess import GstreamerPostProcess, AppleMediaPostProcess, VaapiPostProcess
|
||||
from util import optional_chain
|
||||
import scrypted_sdk
|
||||
from typing import Any
|
||||
@@ -6,53 +7,287 @@ from urllib.parse import urlparse
|
||||
import vipsimage
|
||||
import pilimage
|
||||
import platform
|
||||
from generator_common import createVideoFrame
|
||||
from generator_common import createVideoFrame, createImageMediaObject
|
||||
from typing import Tuple
|
||||
import copy
|
||||
|
||||
Gst = None
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstBase', '1.0')
|
||||
def getBands(caps):
|
||||
capsFormat = caps.get_structure(0).get_value('format')
|
||||
|
||||
from gi.repository import Gst
|
||||
except:
|
||||
pass
|
||||
if capsFormat == 'RGB':
|
||||
return 3
|
||||
elif capsFormat == 'RGBA':
|
||||
return 4
|
||||
elif capsFormat == 'NV12' or capsFormat == 'I420':
|
||||
return 1
|
||||
|
||||
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
|
||||
raise Exception(f'unknown pixel format, please report this bug to @koush on Discord {capsFormat}')
|
||||
|
||||
class GstSession:
|
||||
def __init__(self, gst) -> None:
|
||||
self.gst = gst
|
||||
self.reuse = []
|
||||
|
||||
class GstImage(scrypted_sdk.Image):
|
||||
def __init__(self, gst: GstSession, sample, postProcessPipeline: str):
|
||||
super().__init__()
|
||||
caps = sample.get_caps()
|
||||
self.width = caps.get_structure(0).get_value('width')
|
||||
self.height = caps.get_structure(0).get_value('height')
|
||||
self.gst = gst
|
||||
self.sample = sample
|
||||
self.postProcessPipeline = postProcessPipeline
|
||||
|
||||
async def close(self):
|
||||
self.sample = None
|
||||
|
||||
async def toImage(self, options: scrypted_sdk.ImageOptions = None):
|
||||
copyOptions: scrypted_sdk.ImageOptions = None
|
||||
needPostProcess = False
|
||||
if not self.postProcessPipeline:
|
||||
copyOptions = copy.deepcopy(options)
|
||||
if options:
|
||||
if options.get('crop') or options.get('resize'):
|
||||
needPostProcess = True
|
||||
options['crop'] = None
|
||||
options['resize'] = None
|
||||
|
||||
gstsample = await toGstSample(self.gst, self.sample, options, self.postProcessPipeline)
|
||||
caps = gstsample.get_caps()
|
||||
capsBands = getBands(caps)
|
||||
height = caps.get_structure(0).get_value('height')
|
||||
width = caps.get_structure(0).get_value('width')
|
||||
|
||||
gst_buffer = gstsample.get_buffer()
|
||||
result, info = gst_buffer.map(Gst.MapFlags.READ)
|
||||
if not result:
|
||||
raise Exception('unable to map gst buffer')
|
||||
|
||||
try:
|
||||
if vipsimage.pyvips:
|
||||
vips = vipsimage.new_from_memory(bytes(info.data), width, height, capsBands)
|
||||
image = vipsimage.VipsImage(vips)
|
||||
else:
|
||||
pil = pilimage.new_from_memory(bytes(info.data), width, height, capsBands)
|
||||
image = pilimage.PILImage(pil)
|
||||
|
||||
if needPostProcess:
|
||||
image = await image.toImage(copyOptions)
|
||||
return await createImageMediaObject(image)
|
||||
finally:
|
||||
gst_buffer.unmap(info)
|
||||
|
||||
async def toBuffer(self, options: scrypted_sdk.ImageOptions = None):
|
||||
format = options and options.get('format')
|
||||
if format == 'rgb':
|
||||
bands = 3
|
||||
elif format == 'rgba':
|
||||
bands = 4
|
||||
elif format == 'gray':
|
||||
bands = 1
|
||||
elif format == 'jpg':
|
||||
bands = 0
|
||||
else:
|
||||
raise Exception(f'invalid output format {format}')
|
||||
|
||||
copyOptions: scrypted_sdk.ImageOptions = None
|
||||
needPostProcess = False
|
||||
if not self.postProcessPipeline:
|
||||
copyOptions = copy.deepcopy(options)
|
||||
if options:
|
||||
if options.get('crop') or options.get('resize'):
|
||||
needPostProcess = True
|
||||
options['crop'] = None
|
||||
options['resize'] = None
|
||||
|
||||
gstsample = await toGstSample(self.gst, self.sample, options, self.postProcessPipeline)
|
||||
caps = gstsample.get_caps()
|
||||
height = caps.get_structure(0).get_value('height')
|
||||
width = caps.get_structure(0).get_value('width')
|
||||
capsFormat = caps.get_structure(0).get_value('format')
|
||||
|
||||
if capsFormat == 'RGB':
|
||||
capsBands = 3
|
||||
elif capsFormat == 'RGBA':
|
||||
capsBands = 4
|
||||
elif capsFormat == 'NV12' or capsFormat == 'I420':
|
||||
capsBands = 1
|
||||
else:
|
||||
raise Exception(f'unknown pixel format, please report this bug to @koush on Discord {capsFormat}')
|
||||
|
||||
gst_buffer = gstsample.get_buffer()
|
||||
result, info = gst_buffer.map(Gst.MapFlags.READ)
|
||||
if not result:
|
||||
raise Exception('unable to map gst buffer')
|
||||
|
||||
try:
|
||||
# print("~~~~~~~~~SAMPLE", width, height)
|
||||
# pil = pilimage.new_from_memory(info.data, width, height, capsBands)
|
||||
# pil.convert('RGB').save('/server/volume/test.jpg')
|
||||
|
||||
# format may have been previously specified and known to caller?
|
||||
|
||||
if not needPostProcess:
|
||||
if not format:
|
||||
return bytes(info.data)
|
||||
|
||||
if format == 'gray' and capsBands == 1:
|
||||
buffer = bytes(info.data)
|
||||
return buffer[0:width * height]
|
||||
|
||||
if bands == capsBands:
|
||||
buffer = bytes(info.data)
|
||||
return buffer
|
||||
|
||||
if vipsimage.pyvips:
|
||||
vips = vipsimage.new_from_memory(info.data, width, height, capsBands)
|
||||
image = vipsimage.VipsImage(vips)
|
||||
else:
|
||||
pil = pilimage.new_from_memory(info.data, width, height, capsBands)
|
||||
image = pilimage.PILImage(pil)
|
||||
|
||||
try:
|
||||
if not self.postProcessPipeline:
|
||||
return await image.toBuffer(copyOptions)
|
||||
else:
|
||||
return await image.toBuffer({
|
||||
'format': options and options.get('format'),
|
||||
})
|
||||
finally:
|
||||
await image.close()
|
||||
finally:
|
||||
gst_buffer.unmap(info)
|
||||
|
||||
async def createResamplerPipeline(sample, gst: GstSession, options: scrypted_sdk.ImageOptions, postProcessPipeline: str):
|
||||
if not sample:
|
||||
raise Exception('Video Frame has been invalidated')
|
||||
|
||||
resize = None
|
||||
if options:
|
||||
resize = options.get('resize')
|
||||
if resize:
|
||||
resize = (resize.get('width'), resize.get('height'))
|
||||
|
||||
for check in gst.reuse:
|
||||
if check.resize == resize:
|
||||
gst.reuse.remove(check)
|
||||
return check
|
||||
|
||||
if postProcessPipeline == 'VAAPI':
|
||||
pp = VaapiPostProcess()
|
||||
elif postProcessPipeline == 'OpenGL (GPU memory)':
|
||||
pp = AppleMediaPostProcess()
|
||||
elif postProcessPipeline == 'OpenGL (system memory)':
|
||||
pp = AppleMediaPostProcess()
|
||||
else:
|
||||
# trap the pipeline before it gets here. videocrop
|
||||
# in the pipeline seems to spam the stdout??
|
||||
# use the legacy vips/pil post process.
|
||||
pp = GstreamerPostProcess()
|
||||
|
||||
caps = sample.get_caps()
|
||||
|
||||
srcCaps = caps.to_string().replace(' ', '')
|
||||
pipeline = f"appsrc name=appsrc emit-signals=True is-live=True caps={srcCaps}"
|
||||
await pp.create(gst.gst, pipeline)
|
||||
pp.resize = resize
|
||||
|
||||
return pp
|
||||
|
||||
async def toGstSample(gst: GstSession, sample, options: scrypted_sdk.ImageOptions, postProcessPipeline: str) -> GstImage:
|
||||
if not sample:
|
||||
raise Exception('Video Frame has been invalidated')
|
||||
if not options:
|
||||
return sample
|
||||
|
||||
crop = options.get('crop')
|
||||
resize = options.get('resize')
|
||||
format = options.get('format')
|
||||
|
||||
caps = sample.get_caps()
|
||||
sampleWidth = caps.get_structure(0).get_value('width')
|
||||
sampleHeight = caps.get_structure(0).get_value('height')
|
||||
capsFormat = caps.get_structure(0).get_value('format')
|
||||
|
||||
# normalize format, eliminating it if possible
|
||||
if format == 'jpg':
|
||||
# get into a format suitable to be be handled by vips/pil
|
||||
if capsFormat == 'RGB' or capsFormat == 'RGBA':
|
||||
format = None
|
||||
else:
|
||||
format = 'RGBA'
|
||||
elif format == 'rgb':
|
||||
if capsFormat == 'RGB':
|
||||
format = None
|
||||
else:
|
||||
format = 'RGB'
|
||||
elif format == 'rgba':
|
||||
if capsFormat == 'RGBA':
|
||||
format = None
|
||||
else:
|
||||
format = 'RGBA'
|
||||
elif format == 'gray':
|
||||
# are there others? does the output format depend on GPU?
|
||||
# have only ever seen NV12
|
||||
if capsFormat == 'NV12' or capsFormat == 'I420':
|
||||
format = None
|
||||
else:
|
||||
format = 'NV12'
|
||||
elif format:
|
||||
raise Exception(f'invalid output format {format}')
|
||||
|
||||
if not crop and not resize and not format:
|
||||
return sample
|
||||
|
||||
pp = await createResamplerPipeline(sample, gst, options, postProcessPipeline)
|
||||
try:
|
||||
pp.update(caps, (sampleWidth, sampleHeight), options, format)
|
||||
|
||||
appsrc = pp.gst.get_by_name('appsrc')
|
||||
srcCaps = caps.to_string().replace(' ', '')
|
||||
appsrc.set_property('caps', caps.from_string(srcCaps))
|
||||
|
||||
appsrc.emit("push-sample", sample)
|
||||
|
||||
newSample = await pp.g.__anext__()
|
||||
|
||||
gst.reuse.append(pp)
|
||||
except:
|
||||
await pp.g.aclose()
|
||||
raise
|
||||
|
||||
return newSample
|
||||
|
||||
async def createGstMediaObject(image: GstImage):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'format': None,
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
|
||||
async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None, postProcessPipeline: str = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
container = ffmpegInput.get('container', None)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
pipeline = ffmpegInput.get('url')
|
||||
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
|
||||
|
||||
if videosrc.startswith('tcp://'):
|
||||
parsed_url = urlparse(videosrc)
|
||||
videosrc = 'tcpclientsrc port=%s host=%s' % (
|
||||
if pipeline.startswith('tcp://'):
|
||||
parsed_url = urlparse(pipeline)
|
||||
pipeline = 'tcpclientsrc port=%s host=%s' % (
|
||||
parsed_url.port, parsed_url.hostname)
|
||||
if container == 'mpegts':
|
||||
videosrc += ' ! tsdemux'
|
||||
pipeline += ' ! tsdemux'
|
||||
elif container == 'sdp':
|
||||
videosrc += ' ! sdpdemux'
|
||||
pipeline += ' ! sdpdemux'
|
||||
else:
|
||||
raise Exception('unknown container %s' % container)
|
||||
elif videosrc.startswith('rtsp'):
|
||||
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0' % videosrc
|
||||
elif pipeline.startswith('rtsp'):
|
||||
pipeline = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0' % pipeline
|
||||
if videoCodec == 'h264':
|
||||
videosrc += ' ! rtph264depay ! h264parse'
|
||||
|
||||
videocaps = 'video/x-raw'
|
||||
# if options and options.get('resize'):
|
||||
# videocaps = 'videoscale ! video/x-raw,width={width},height={height}'.format(width=options['resize']['width'], height=options['resize']['height'])
|
||||
|
||||
format = options and options.get('format')
|
||||
# I420 is a cheap way to get gray out of an h264 stream without color conversion.
|
||||
if format == 'gray':
|
||||
format = 'I420'
|
||||
bands = 1
|
||||
else:
|
||||
format = 'RGB'
|
||||
bands = 3
|
||||
|
||||
videocaps += ',format={format}'.format(format=format)
|
||||
pipeline += ' ! rtph264depay ! h264parse'
|
||||
|
||||
decoder = None
|
||||
def setDecoderClearDefault(value: str):
|
||||
@@ -82,58 +317,31 @@ async def generateVideoFramesGstreamer(mediaObject: scrypted_sdk.MediaObject, op
|
||||
fps = options and options.get('fps', None)
|
||||
videorate = ''
|
||||
if fps:
|
||||
videorate = 'videorate ! '
|
||||
videocaps += ',framerate={fps}/1'.format(fps=fps)
|
||||
videorate = f'! videorate max-rate={fps}'
|
||||
|
||||
if decoder.find("{videocaps}") == -1:
|
||||
videosrc += ' ! {decoder} ! queue leaky=downstream max-size-buffers=0 ! videoconvert ! {videorate} {videocaps}'.format(decoder=decoder, videocaps=videocaps, videorate=videorate)
|
||||
if postProcessPipeline == 'VAAPI':
|
||||
pipeline += f' ! {decoder} {videorate} ! queue leaky=downstream max-size-buffers=0'
|
||||
elif postProcessPipeline == 'OpenGL (GPU memory)':
|
||||
pipeline += f' ! {decoder} {videorate} ! queue leaky=downstream max-size-buffers=0 ! glupload'
|
||||
elif postProcessPipeline == 'OpenGL (system memory)':
|
||||
pipeline += f' ! {decoder} {videorate} ! queue leaky=downstream max-size-buffers=0 ! video/x-raw ! glupload'
|
||||
else:
|
||||
if format == 'RGB':
|
||||
format = 'RGBA'
|
||||
bands = 4
|
||||
videocaps += 'A'
|
||||
d = decoder.replace('{videocaps}', '{videorate}{videocaps}'.format(videocaps=videocaps, videorate=videorate))
|
||||
videosrc += ' ! {decoder}'.format(decoder=d)
|
||||
pipeline += f' ! {decoder} ! video/x-raw {videorate} ! queue leaky=downstream max-size-buffers=0'
|
||||
# disable the gstreamer post process because videocrop spams the log
|
||||
# postProcessPipeline = 'Default'
|
||||
postProcessPipeline = None
|
||||
|
||||
gst, gen = await createPipelineIterator(videosrc)
|
||||
|
||||
vipsImage: vipsimage.VipsImage = None
|
||||
pilImage: pilimage.PILImage = None
|
||||
print(pipeline)
|
||||
mo: scrypted_sdk.MediaObject = None
|
||||
|
||||
gst, gen = await createPipelineIterator(pipeline)
|
||||
gstImage: GstImage = None
|
||||
session = GstSession(gst)
|
||||
async for gstsample in gen():
|
||||
caps = gstsample.get_caps()
|
||||
height = caps.get_structure(0).get_value('height')
|
||||
width = caps.get_structure(0).get_value('width')
|
||||
gst_buffer = gstsample.get_buffer()
|
||||
result, info = gst_buffer.map(Gst.MapFlags.READ)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
if not mo:
|
||||
gstImage = GstImage(session, gstsample, postProcessPipeline)
|
||||
mo = await createImageMediaObject(gstImage)
|
||||
gstImage.sample = gstsample
|
||||
try:
|
||||
if vipsimage.pyvips:
|
||||
vips = vipsimage.new_from_memory(info.data, width, height, bands)
|
||||
|
||||
if not mo:
|
||||
vipsImage = vipsimage.VipsImage(vips)
|
||||
mo = await vipsimage.createVipsMediaObject(vipsImage)
|
||||
|
||||
vipsImage.vipsImage = vips
|
||||
try:
|
||||
yield createVideoFrame(mo)
|
||||
finally:
|
||||
await vipsImage.close()
|
||||
else:
|
||||
pil = pilimage.new_from_memory(info.data, width, height, bands)
|
||||
|
||||
if not mo:
|
||||
pilImage = pilimage.PILImage(pil)
|
||||
mo = await pilimage.createPILMediaObject(pilImage)
|
||||
|
||||
pilImage.pilImage = pil
|
||||
try:
|
||||
yield createVideoFrame(mo)
|
||||
finally:
|
||||
await pilImage.close()
|
||||
yield createVideoFrame(mo)
|
||||
finally:
|
||||
gst_buffer.unmap(info)
|
||||
await gstImage.close()
|
||||
|
||||
221
plugins/python-codecs/src/gstreamer_postprocess.py
Normal file
221
plugins/python-codecs/src/gstreamer_postprocess.py
Normal file
@@ -0,0 +1,221 @@
|
||||
import scrypted_sdk
|
||||
from typing import Tuple
|
||||
from gst_generator import createPipelineIterator
|
||||
|
||||
class GstreamerPostProcess():
|
||||
def __init__(self) -> None:
|
||||
self.postprocess = ' ! videoconvert ! videocrop name=videocrop ! videoscale ! capsfilter name=capsfilter'
|
||||
self.resize = None
|
||||
|
||||
async def create(self, gst, pipeline: str):
|
||||
gst, gen = await createPipelineIterator(pipeline + self.postprocess, gst)
|
||||
g = gen()
|
||||
self.gst = gst
|
||||
self.g = g
|
||||
self.videocrop = self.gst.get_by_name('videocrop')
|
||||
self.capsfilter = self.gst.get_by_name('capsfilter')
|
||||
|
||||
def update(self, caps, sampleSize: Tuple[int, int], options: scrypted_sdk.ImageOptions = None, format: str = None):
|
||||
sampleWidth, sampleHeight = sampleSize
|
||||
|
||||
crop = options.get('crop')
|
||||
resize = options.get('resize')
|
||||
|
||||
if crop:
|
||||
left = int(crop['left'])
|
||||
top = int(crop['top'])
|
||||
width = int(crop['width'])
|
||||
height = int(crop['height'])
|
||||
# right and bottom crop values are pixel distance from the corresponding edge,
|
||||
# not a bounding box
|
||||
right = sampleWidth - (left + width)
|
||||
bottom = sampleHeight - (top + height)
|
||||
else:
|
||||
left = 0
|
||||
top = 0
|
||||
right = 0
|
||||
bottom = 0
|
||||
|
||||
videocrop = self.videocrop
|
||||
videocrop.set_property('left', left)
|
||||
videocrop.set_property('top', top)
|
||||
videocrop.set_property('right', right)
|
||||
videocrop.set_property('bottom', bottom)
|
||||
|
||||
sinkCaps = "video/x-raw"
|
||||
if resize:
|
||||
width = resize.get('width')
|
||||
if width:
|
||||
xscale = resize['width'] / sampleWidth
|
||||
height = sampleHeight * xscale
|
||||
|
||||
height = resize.get('height')
|
||||
if height:
|
||||
yscale = resize['height'] / sampleHeight
|
||||
if not width:
|
||||
width = sampleWidth * yscale
|
||||
|
||||
width = int(width)
|
||||
height = int(height)
|
||||
|
||||
# pipeline += " ! videoscale"
|
||||
sinkCaps += f",width={width},height={height}"
|
||||
|
||||
if format:
|
||||
sinkCaps += f",format={format}"
|
||||
|
||||
self.capsfilter.set_property('caps', caps.from_string(sinkCaps))
|
||||
|
||||
class VaapiPostProcess():
|
||||
def __init__(self) -> None:
|
||||
self.postprocess = ' ! vaapipostproc name=vaapipostproc'
|
||||
self.resize = None
|
||||
|
||||
async def create(self, gst, pipeline: str):
|
||||
gst, gen = await createPipelineIterator(pipeline + self.postprocess, gst)
|
||||
g = gen()
|
||||
self.gst = gst
|
||||
self.g = g
|
||||
self.vaapipostproc = self.gst.get_by_name('vaapipostproc')
|
||||
|
||||
def update(self, caps, sampleSize: Tuple[int, int], options: scrypted_sdk.ImageOptions = None, format: str = None):
|
||||
sampleWidth, sampleHeight = sampleSize
|
||||
|
||||
crop = options.get('crop')
|
||||
resize = options.get('resize')
|
||||
|
||||
vaapipostproc = self.vaapipostproc
|
||||
|
||||
if resize:
|
||||
width = resize.get('width')
|
||||
if width:
|
||||
xscale = resize['width'] / sampleWidth
|
||||
height = sampleHeight * xscale
|
||||
|
||||
height = resize.get('height')
|
||||
if height:
|
||||
yscale = resize['height'] / sampleHeight
|
||||
if not width:
|
||||
width = sampleWidth * yscale
|
||||
|
||||
width = int(width)
|
||||
height = int(height)
|
||||
|
||||
outputWidth = width
|
||||
outputHeight = height
|
||||
else:
|
||||
outputWidth = 0
|
||||
outputHeight = 0
|
||||
|
||||
vaapipostproc.set_property('width', outputWidth)
|
||||
vaapipostproc.set_property('height', outputHeight)
|
||||
|
||||
if format:
|
||||
if format == 'RGB':
|
||||
format = 'RGBA'
|
||||
vaapipostproc.set_property('format', 11)
|
||||
|
||||
if False and crop:
|
||||
left = int(crop['left'])
|
||||
top = int(crop['top'])
|
||||
width = int(crop['width'])
|
||||
height = int(crop['height'])
|
||||
# right and bottom crop values are pixel distance from the corresponding edge,
|
||||
# not a bounding box
|
||||
right = sampleWidth - (left + width)
|
||||
bottom = sampleHeight - (top + height)
|
||||
else:
|
||||
left = 0
|
||||
top = 0
|
||||
right = 300
|
||||
bottom = 300
|
||||
|
||||
print(left, top, right, bottom)
|
||||
vaapipostproc.set_property('crop-left', left)
|
||||
vaapipostproc.set_property('crop-top', top)
|
||||
vaapipostproc.set_property('crop-right', right)
|
||||
vaapipostproc.set_property('crop-bottom', bottom)
|
||||
|
||||
class AppleMediaPostProcess():
|
||||
def __init__(self) -> None:
|
||||
self.postprocess = ' ! glcolorconvert ! gltransformation name=gltransformation ! glcolorscale ! capsfilter name=glCapsFilter caps="video/x-raw(memory:GLMemory),format=RGBA" ! gldownload'
|
||||
self.resize = None
|
||||
|
||||
async def create(self, gst, pipeline: str):
|
||||
gst, gen = await createPipelineIterator(pipeline + self.postprocess, gst)
|
||||
g = gen()
|
||||
self.gst = gst
|
||||
self.g = g
|
||||
# positions/scales the input into target texture
|
||||
self.gltransformation = self.gst.get_by_name('gltransformation')
|
||||
# sets the target texture size
|
||||
self.glCapsFilter = self.gst.get_by_name('glCapsFilter')
|
||||
# sets output format to something other than RGBA if necessary since gl can't handle non-RGBA
|
||||
# self.swCapsFilter = self.gst.get_by_name('swCapsFilter')
|
||||
|
||||
|
||||
def update(self, caps, sampleSize: Tuple[int, int], options: scrypted_sdk.ImageOptions = None, format: str = None):
|
||||
# print(options)
|
||||
sampleWidth, sampleHeight = sampleSize
|
||||
|
||||
crop = options.get('crop')
|
||||
resize = options.get('resize')
|
||||
|
||||
glCaps = "video/x-raw(memory:GLMemory),format=RGBA"
|
||||
if resize:
|
||||
width = resize.get('width')
|
||||
if width:
|
||||
xscale = resize['width'] / sampleWidth
|
||||
height = sampleHeight * xscale
|
||||
|
||||
height = resize.get('height')
|
||||
if height:
|
||||
yscale = resize['height'] / sampleHeight
|
||||
if not width:
|
||||
width = sampleWidth * yscale
|
||||
|
||||
width = int(width)
|
||||
height = int(height)
|
||||
|
||||
# pipeline += " ! videoscale"
|
||||
glCaps += f",width={width},height={height}"
|
||||
|
||||
self.glCapsFilter.set_property('caps', caps.from_string(glCaps))
|
||||
|
||||
if crop:
|
||||
left = int(crop['left'])
|
||||
top = int(crop['top'])
|
||||
width = int(crop['width'])
|
||||
height = int(crop['height'])
|
||||
|
||||
scaleX = sampleWidth / width
|
||||
scaleY = sampleHeight / height
|
||||
|
||||
# the default scale origin is the center.
|
||||
newCenterX = left + width / 2
|
||||
newCenterY = top + height / 2
|
||||
curCenterX = sampleWidth / 2
|
||||
curCenterY = sampleHeight / 2
|
||||
diffX = curCenterX - newCenterX
|
||||
diffY = curCenterY - newCenterY
|
||||
translationX = diffX / width
|
||||
translationY = diffY / height
|
||||
else:
|
||||
scaleX = 1
|
||||
scaleY = 1
|
||||
translationX = 0
|
||||
translationY = 0
|
||||
|
||||
gltransformation = self.gltransformation
|
||||
gltransformation.set_property('scale-x', scaleX)
|
||||
gltransformation.set_property('scale-y', scaleY)
|
||||
gltransformation.set_property('translation-x', translationX)
|
||||
gltransformation.set_property('translation-y', translationY)
|
||||
|
||||
# if format and format != 'RGBA':
|
||||
# swcaps = f'video/x-raw,format={format}'
|
||||
# print(swcaps)
|
||||
# self.swCapsFilter.set_property('caps', caps.from_string(swcaps))
|
||||
# else:
|
||||
# self.swCapsFilter.set_property('caps', 'video/x-raw')
|
||||
# print('nc')
|
||||
@@ -3,7 +3,7 @@ import scrypted_sdk
|
||||
from typing import Any
|
||||
import vipsimage
|
||||
import pilimage
|
||||
from generator_common import createVideoFrame
|
||||
from generator_common import createVideoFrame, createImageMediaObject
|
||||
|
||||
av = None
|
||||
try:
|
||||
@@ -54,7 +54,7 @@ async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, option
|
||||
|
||||
if not mo:
|
||||
vipsImage = vipsimage.VipsImage(vips)
|
||||
mo = await vipsimage.createVipsMediaObject(vipsImage)
|
||||
mo = await createImageMediaObject(vipsImage)
|
||||
|
||||
vipsImage.vipsImage = vips
|
||||
try:
|
||||
@@ -75,7 +75,7 @@ async def generateVideoFramesLibav(mediaObject: scrypted_sdk.MediaObject, option
|
||||
|
||||
if not mo:
|
||||
pilImage = pilimage.PILImage(pil)
|
||||
mo = await pilimage.createPILMediaObject(pilImage)
|
||||
mo = await createImageMediaObject(pilImage)
|
||||
|
||||
pilImage.pilImage = pil
|
||||
try:
|
||||
|
||||
@@ -31,7 +31,7 @@ class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFram
|
||||
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
worker = scrypted_sdk.fork()
|
||||
forked: CodecFork = await worker.result
|
||||
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter, self.storage.getItem('h264Decoder'))
|
||||
return await forked.generateVideoFramesGstreamer(mediaObject, options, filter, self.storage.getItem('h264Decoder'), self.storage.getItem('postProcessPipeline'))
|
||||
|
||||
async def getSettings(self) -> List[Setting]:
|
||||
return [
|
||||
@@ -46,9 +46,20 @@ class GstreamerGenerator(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFram
|
||||
'vtdec_hw',
|
||||
'nvh264dec',
|
||||
'vaapih264dec',
|
||||
'vaapih264dec ! vaapipostproc ! {videocaps}',
|
||||
],
|
||||
'combobox': True,
|
||||
},
|
||||
{
|
||||
'key': 'postProcessPipeline',
|
||||
'title': 'Post Process Pipeline',
|
||||
'description': 'The Gstreamer pipeline to use to resize and scale frames.',
|
||||
'value': self.storage.getItem('postProcessPipeline') or 'Default',
|
||||
'choices': [
|
||||
'Default',
|
||||
'OpenGL (GPU memory)',
|
||||
'OpenGL (system memory)',
|
||||
'VAAPI',
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
@@ -138,10 +149,10 @@ def multiprocess_exit():
|
||||
os._exit(os.EX_OK)
|
||||
|
||||
class CodecFork:
|
||||
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None, h264Decoder: str = None) -> scrypted_sdk.VideoFrame:
|
||||
async def generateVideoFramesGstreamer(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions, filter: Any, h264Decoder: str, postProcessPipeline: str) -> scrypted_sdk.VideoFrame:
|
||||
start = time.time()
|
||||
try:
|
||||
async for data in gstreamer.generateVideoFramesGstreamer(mediaObject, options, filter, h264Decoder):
|
||||
async for data in gstreamer.generateVideoFramesGstreamer(mediaObject, options, filter, h264Decoder, postProcessPipeline):
|
||||
yield data
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
||||
@@ -2,6 +2,7 @@ import scrypted_sdk
|
||||
from typing import Any
|
||||
from thread import to_thread
|
||||
import io
|
||||
from generator_common import createImageMediaObject
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
@@ -66,7 +67,7 @@ class PILImage(scrypted_sdk.Image):
|
||||
if options and options.get('format', None):
|
||||
raise Exception('format can only be used with toBuffer')
|
||||
newPILImage = await self.toPILImage(options)
|
||||
return await createPILMediaObject(newPILImage)
|
||||
return await createImageMediaObject(newPILImage)
|
||||
|
||||
def toPILImage(pilImageWrapper: PILImage, options: scrypted_sdk.ImageOptions = None) -> PILImage:
|
||||
pilImage = pilImageWrapper.pilImage
|
||||
@@ -94,16 +95,6 @@ def toPILImage(pilImageWrapper: PILImage, options: scrypted_sdk.ImageOptions = N
|
||||
|
||||
return PILImage(pilImage)
|
||||
|
||||
async def createPILMediaObject(image: PILImage):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'format': None,
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
|
||||
class ImageReader(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter):
|
||||
def __init__(self, nativeId: str):
|
||||
super().__init__(nativeId)
|
||||
@@ -114,7 +105,7 @@ class ImageReader(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter)
|
||||
async def convert(self, data: Any, fromMimeType: str, toMimeType: str, options: scrypted_sdk.MediaObjectOptions = None) -> Any:
|
||||
pil = Image.open(io.BytesIO(data))
|
||||
pil.load()
|
||||
return await createPILMediaObject(PILImage(pil))
|
||||
return await createImageMediaObject(PILImage(pil))
|
||||
|
||||
class ImageWriter(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter):
|
||||
def __init__(self, nativeId: str):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import scrypted_sdk
|
||||
import asyncio
|
||||
from generator_common import createImageMediaObject
|
||||
|
||||
from typing import Any
|
||||
try:
|
||||
import pyvips
|
||||
@@ -68,7 +69,7 @@ class VipsImage(scrypted_sdk.Image):
|
||||
if options and options.get('format', None):
|
||||
raise Exception('format can only be used with toBuffer')
|
||||
newVipsImage = await self.toVipsImage(options)
|
||||
return await createVipsMediaObject(newVipsImage)
|
||||
return await createImageMediaObject(newVipsImage)
|
||||
|
||||
def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions = None) -> VipsImage:
|
||||
vipsImage = vipsImageWrapper.vipsImage
|
||||
@@ -99,16 +100,6 @@ def toVipsImage(vipsImageWrapper: VipsImage, options: scrypted_sdk.ImageOptions
|
||||
|
||||
return VipsImage(vipsImage)
|
||||
|
||||
async def createVipsMediaObject(image: VipsImage):
|
||||
ret = await scrypted_sdk.mediaManager.createMediaObject(image, scrypted_sdk.ScryptedMimeTypes.Image.value, {
|
||||
'format': None,
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'toBuffer': lambda options = None: image.toBuffer(options),
|
||||
'toImage': lambda options = None: image.toImage(options),
|
||||
})
|
||||
return ret
|
||||
|
||||
class ImageReader(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter):
|
||||
def __init__(self, nativeId: str):
|
||||
super().__init__(nativeId)
|
||||
@@ -118,7 +109,7 @@ class ImageReader(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter)
|
||||
|
||||
async def convert(self, data: Any, fromMimeType: str, toMimeType: str, options: scrypted_sdk.MediaObjectOptions = None) -> Any:
|
||||
vips = Image.new_from_buffer(data, '')
|
||||
return await createVipsMediaObject(VipsImage(vips))
|
||||
return await createImageMediaObject(VipsImage(vips))
|
||||
|
||||
class ImageWriter(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter):
|
||||
def __init__(self, nativeId: str):
|
||||
@@ -134,3 +125,6 @@ class ImageWriter(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.BufferConverter)
|
||||
|
||||
def new_from_memory(data, width: int, height: int, bands: int):
|
||||
return Image.new_from_memory(data, width, height, bands, pyvips.BandFormat.UCHAR)
|
||||
|
||||
def new_from_buffer(data, width: int, height: int, bands: int):
|
||||
return Image.new_from_buffer(data, width, height, bands, pyvips.BandFormat.UCHAR)
|
||||
|
||||
Reference in New Issue
Block a user