mirror of
https://github.com/koush/scrypted.git
synced 2026-02-13 18:32:56 +00:00
python-codecs: wip
This commit is contained in:
124
plugins/python-codecs/src/gstreamer.py
Normal file
124
plugins/python-codecs/src/gstreamer.py
Normal file
@@ -0,0 +1,124 @@
|
||||
import concurrent.futures
|
||||
import threading
|
||||
import asyncio
|
||||
from queue import Queue
|
||||
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstBase', '1.0')
|
||||
|
||||
from gi.repository import GLib, GObject, Gst
|
||||
GObject.threads_init()
|
||||
Gst.init(None)
|
||||
except:
|
||||
pass
|
||||
|
||||
class Callback:
|
||||
def __init__(self, callback) -> None:
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.callback = callback
|
||||
|
||||
def createPipelineIterator(pipeline: str):
|
||||
pipeline = '{pipeline} ! appsink name=appsink emit-signals=true sync=false'.format(pipeline=pipeline)
|
||||
gst = Gst.parse_launch(pipeline)
|
||||
bus = gst.get_bus()
|
||||
|
||||
def on_bus_message(bus, message):
|
||||
t = str(message.type)
|
||||
print(t)
|
||||
if t == str(Gst.MessageType.EOS):
|
||||
finish()
|
||||
elif t == str(Gst.MessageType.WARNING):
|
||||
err, debug = message.parse_warning()
|
||||
print('Warning: %s: %s\n' % (err, debug))
|
||||
elif t == str(Gst.MessageType.ERROR):
|
||||
err, debug = message.parse_error()
|
||||
print('Error: %s: %s\n' % (err, debug))
|
||||
finish()
|
||||
|
||||
def stopGst():
|
||||
bus.remove_signal_watch()
|
||||
bus.disconnect(watchId)
|
||||
gst.set_state(Gst.State.NULL)
|
||||
|
||||
def finish():
|
||||
nonlocal hasFinished
|
||||
hasFinished = True
|
||||
callback = Callback(None)
|
||||
callbackQueue.put(callback)
|
||||
if not asyncFuture.done():
|
||||
asyncFuture.set_result(None)
|
||||
if not finished.done():
|
||||
finished.set_result(None)
|
||||
|
||||
watchId = bus.connect('message', on_bus_message)
|
||||
bus.add_signal_watch()
|
||||
|
||||
finished = concurrent.futures.Future()
|
||||
finished.add_done_callback(lambda _: stopGst())
|
||||
hasFinished = False
|
||||
|
||||
appsink = gst.get_by_name('appsink')
|
||||
callbackQueue = Queue()
|
||||
asyncFuture = asyncio.Future()
|
||||
|
||||
async def gen():
|
||||
try:
|
||||
while True:
|
||||
nonlocal asyncFuture
|
||||
asyncFuture = asyncio.Future()
|
||||
yieldFuture = asyncio.Future()
|
||||
async def asyncCallback(sample):
|
||||
asyncFuture.set_result(sample)
|
||||
await yieldFuture
|
||||
callbackQueue.put(Callback(asyncCallback))
|
||||
sample = await asyncFuture
|
||||
if not sample:
|
||||
break
|
||||
yield sample
|
||||
yieldFuture.set_result(None)
|
||||
finally:
|
||||
finish()
|
||||
|
||||
|
||||
def on_new_sample(sink, preroll):
|
||||
nonlocal hasFinished
|
||||
|
||||
sample = sink.emit('pull-preroll' if preroll else 'pull-sample')
|
||||
|
||||
callback: Callback = callbackQueue.get()
|
||||
if not callback.callback or hasFinished:
|
||||
hasFinished = True
|
||||
if callback.callback:
|
||||
asyncio.run_coroutine_threadsafe(callback.callback(None), loop = callback.loop)
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
future = asyncio.run_coroutine_threadsafe(callback.callback(sample), loop = callback.loop)
|
||||
future.result()
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
appsink.connect('new-preroll', on_new_sample, True)
|
||||
appsink.connect('new-sample', on_new_sample, False)
|
||||
|
||||
gst.set_state(Gst.State.PLAYING)
|
||||
return gst, gen
|
||||
|
||||
def mainThread():
|
||||
async def asyncMain():
|
||||
gst, gen = createPipelineIterator('rtspsrc location=rtsp://localhost:59668/18cc179a814fd5b3 ! rtph264depay ! h264parse ! vtdec_hw ! videoconvert ! video/x-raw')
|
||||
i = 0
|
||||
async for sample in gen():
|
||||
print('sample')
|
||||
i = i + 1
|
||||
if i == 10:
|
||||
break
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.ensure_future(asyncMain(), loop = loop)
|
||||
loop.run_forever()
|
||||
|
||||
if __name__ == "__main__":
|
||||
threading.Thread(target = mainThread).start()
|
||||
mainLoop = GLib.MainLoop()
|
||||
mainLoop.run()
|
||||
48
plugins/python-codecs/src/main.py
Normal file
48
plugins/python-codecs/src/main.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import scrypted_sdk
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
def optional_chain(root, *keys):
|
||||
result = root
|
||||
for k in keys:
|
||||
if isinstance(result, dict):
|
||||
result = result.get(k, None)
|
||||
else:
|
||||
result = getattr(result, k, None)
|
||||
if result is None:
|
||||
break
|
||||
return result
|
||||
|
||||
class PythonCodecs(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.VideoFrameGenerator):
|
||||
def __init__(self, nativeId = None):
|
||||
super().__init__(nativeId)
|
||||
|
||||
async def generateVideoFrames(self, mediaObject: scrypted_sdk.MediaObject, options: scrypted_sdk.VideoFrameGeneratorOptions = None, filter: Any = None) -> scrypted_sdk.VideoFrame:
|
||||
ffmpegInput: scrypted_sdk.FFmpegInput = await scrypted_sdk.mediaManager.convertMediaObjectToJSON(mediaObject, scrypted_sdk.ScryptedMimeTypes.FFmpegInput.value)
|
||||
container = ffmpegInput.get('container', None)
|
||||
videosrc = ffmpegInput.get('url')
|
||||
videoCodec = optional_chain(ffmpegInput, 'mediaStreamOptions', 'video', 'codec')
|
||||
|
||||
if videosrc.startswith('tcp://'):
|
||||
parsed_url = urlparse(videosrc)
|
||||
videosrc = 'tcpclientsrc port=%s host=%s' % (
|
||||
parsed_url.port, parsed_url.hostname)
|
||||
if container == 'mpegts':
|
||||
videosrc += ' ! tsdemux'
|
||||
elif container == 'sdp':
|
||||
videosrc += ' ! sdpdemux'
|
||||
else:
|
||||
raise Exception('unknown container %s' % container)
|
||||
elif videosrc.startswith('rtsp'):
|
||||
videosrc = 'rtspsrc buffer-mode=0 location=%s protocols=tcp latency=0 is-live=false' % videosrc
|
||||
if videoCodec == 'h264':
|
||||
videosrc += ' ! rtph264depay ! h264parse'
|
||||
|
||||
try:
|
||||
while True:
|
||||
yield 1
|
||||
finally:
|
||||
print('done!')
|
||||
|
||||
def create_scrypted_plugin():
|
||||
return PythonCodecs()
|
||||
5
plugins/python-codecs/src/requirements.txt
Normal file
5
plugins/python-codecs/src/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
# plugin
|
||||
PyGObject>=3.30.4; sys_platform != 'win32'
|
||||
# libav doesnt work on arm7
|
||||
av>=10.0.0; sys_platform != 'linux' or platform_machine == 'x86_64' or platform_machine == 'aarch64'
|
||||
pyvips
|
||||
Reference in New Issue
Block a user