openvino: wip segmentation

Koushik Dutta
2026-01-17 12:16:55 -08:00
parent a4d28791ed
commit 961cb36a97
7 changed files with 633 additions and 16 deletions

View File

@@ -0,0 +1,82 @@
COCO_LABELS = {
0: "person",
1: "bicycle",
2: "car",
3: "motorcycle",
4: "airplane",
5: "bus",
6: "train",
7: "truck",
8: "boat",
9: "traffic light",
10: "fire hydrant",
11: "stop sign",
12: "parking meter",
13: "bench",
14: "bird",
15: "cat",
16: "dog",
17: "horse",
18: "sheep",
19: "cow",
20: "elephant",
21: "bear",
22: "zebra",
23: "giraffe",
24: "backpack",
25: "umbrella",
26: "handbag",
27: "tie",
28: "suitcase",
29: "frisbee",
30: "skis",
31: "snowboard",
32: "sports ball",
33: "kite",
34: "baseball bat",
35: "baseball glove",
36: "skateboard",
37: "surfboard",
38: "tennis racket",
39: "bottle",
40: "wine glass",
41: "cup",
42: "fork",
43: "knife",
44: "spoon",
45: "bowl",
46: "banana",
47: "apple",
48: "sandwich",
49: "orange",
50: "broccoli",
51: "carrot",
52: "hot dog",
53: "pizza",
54: "donut",
55: "cake",
56: "chair",
57: "couch",
58: "potted plant",
59: "bed",
60: "dining table",
61: "toilet",
62: "tv",
63: "laptop",
64: "mouse",
65: "remote",
66: "keyboard",
67: "cell phone",
68: "microwave",
69: "oven",
70: "toaster",
71: "sink",
72: "refrigerator",
73: "book",
74: "clock",
75: "vase",
76: "scissors",
77: "teddy bear",
78: "hair drier",
79: "toothbrush",
}
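# Usage sketch: detection heads emit integer class ids; look the label up with
# a fallback for ids outside the table.
# label = COCO_LABELS.get(int(class_id), "unknown")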

View File

@@ -0,0 +1,156 @@
"""
YOLOv9 Segmentation Parser - Numpy Implementation
This module provides pure numpy implementations of mask processing functions
that are equivalent to their torch counterparts in utils/segment/general.py.
"""
import numpy as np
import cv2
def crop_mask_numpy(masks, boxes):
"""
Crop predicted masks by zeroing out everything not in the predicted bbox.
Numpy version of crop_mask.
Args:
masks: numpy array [n, h, w] - predicted masks
boxes: numpy array [n, 4] - bbox coords [x1, y1, x2, y2]
Returns:
numpy array [n, h, w] - cropped masks
"""
n, h, w = masks.shape
x1 = boxes[:, 0][:, None, None] # (n, 1, 1)
y1 = boxes[:, 1][:, None, None] # (n, 1, 1)
x2 = boxes[:, 2][:, None, None] # (n, 1, 1)
y2 = boxes[:, 3][:, None, None] # (n, 1, 1)
r = np.arange(w).reshape(1, 1, -1) # (1, 1, w)
c = np.arange(h).reshape(1, -1, 1) # (1, h, 1)
crop_region = (r >= x1) & (r < x2) & (c >= y1) & (c < y2)
return masks * crop_region
def _upsample_bilinear(masks, target_shape):
"""
Upsample masks bilinearly to target shape.
Matches PyTorch's F.interpolate(mode='bilinear', align_corners=False).
Args:
masks: numpy array [n, h, w]
target_shape: tuple (target_h, target_w)
Returns:
numpy array [n, target_h, target_w]
"""
masks_transposed = masks.transpose(1, 2, 0) # (h, w, n)
upsampled = cv2.resize(
masks_transposed.astype(np.float32),
(target_shape[1], target_shape[0]), # cv2 uses (width, height)
interpolation=cv2.INTER_LINEAR
)
return upsampled.transpose(2, 0, 1) # (n, h, w)
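# Note: cv2.resize treats the trailing axis as channels, which OpenCV caps at
# CV_CN_MAX (512). Post-NMS mask counts (<= max_det) stay far below that, but
# resizing more masks in one call would require chunking the n axis.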
def process_mask_numpy(protos, masks_in, bboxes, shape, upsample=False):
"""
Process masks using numpy.
Numpy version of process_mask from utils/segment/general.py.
Args:
protos: numpy array [c, mh, mw] - prototype masks
masks_in: numpy array [n, c] - mask coefficients
bboxes: numpy array [n, 4] - bbox coords [x1, y1, x2, y2]
shape: tuple (ih, iw) - input image size (height, width)
upsample: bool - whether to upsample masks to image size
Returns:
numpy array [n, ih, iw] (or [n, mh, mw] if upsample=False) - binary masks
"""
c, mh, mw = protos.shape # prototype: CHW
ih, iw = shape # input image: height, width
# Flatten protos for matrix multiplication: [c, mh, mw] -> [c, mh*mw]
protos_flat = protos.reshape(c, -1)
# Matrix multiplication: [n, c] @ [c, mh*mw] = [n, mh*mw]
masks_flat = masks_in @ protos_flat
# Apply sigmoid and reshape: [n, mh*mw] -> [n, mh, mw]
masks = (1 / (1 + np.exp(-masks_flat))).reshape(-1, mh, mw)
# Scale bboxes from image coordinates to mask coordinates
downsampled_bboxes = bboxes.copy()
downsampled_bboxes[:, 0] *= mw / iw # x1
downsampled_bboxes[:, 1] *= mh / ih # y1
downsampled_bboxes[:, 2] *= mw / iw # x2
downsampled_bboxes[:, 3] *= mh / ih # y2
# Crop masks to bounding boxes
masks = crop_mask_numpy(masks, downsampled_bboxes)
# Upsample to image size if requested
if upsample:
masks = _upsample_bilinear(masks, shape)
# Binarize masks with threshold 0.5
return (masks > 0.5)
def masks2segments_numpy(masks):
"""
Convert binary masks to segment contours (list of points).
Returns all contours for each mask (multiple polygons possible).
Args:
masks: numpy array [n, h, w] - binary masks (True/False or 0/1)
Returns:
List of lists of numpy arrays. Each inner list contains contours for one mask,
where each contour has shape [num_points, 2] containing contour points [x, y]
"""
segments = []
for mask in masks:
# Convert to uint8 for cv2
mask_uint8 = (mask * 255).astype(np.uint8)
# Find contours
contours, _ = cv2.findContours(
mask_uint8,
mode=cv2.RETR_EXTERNAL, # only outer contours
method=cv2.CHAIN_APPROX_SIMPLE # simplified contours
)
mask_contours = []
for contour in contours:
# Squeeze to remove extra dimension and convert to [x, y] format
contour = contour.squeeze().astype(np.float32)
# cv2 returns [x, y], ensure shape is [n, 2]
if len(contour.shape) == 1:
contour = contour.reshape(1, -1)
mask_contours.append(contour)
# If no contours found, add empty list
segments.append(mask_contours if mask_contours else [np.array([], dtype=np.float32).reshape(0, 2)])
return segments
def masks2polygons_numpy(masks):
    """
    Convert binary masks to polygon points for plotting.
    Args:
        masks: numpy array [n, h, w] - binary masks (True/False or 0/1)
    Returns:
        List of lists of polygons, one inner list per mask, where each polygon
        is a list of [x, y] pairs suitable for drawing.
    """
    segments = masks2segments_numpy(masks)
    # Each mask yields a list of contour arrays; convert every contour to a list of [x, y] pairs.
    return [[contour.tolist() for contour in mask_contours] for mask_contours in segments]
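# Minimal usage sketch (assumed shapes: 32 prototypes at 80x80 feeding a
# 320x320 input, matching the segmentation head wired up elsewhere in this
# commit); random protos/coefficients stand in for real model outputs.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    protos = rng.standard_normal((32, 80, 80)).astype(np.float32) # [c, mh, mw]
    coefs = rng.standard_normal((2, 32)).astype(np.float32) # [n, c] mask coefficients
    boxes = np.array([[10, 10, 150, 200], [50, 60, 300, 310]], dtype=np.float32) # xyxy at 320x320
    masks = process_mask_numpy(protos, coefs, boxes, (320, 320), upsample=True)
    print(masks.shape, masks.dtype) # (2, 320, 320) bool
    segments = masks2segments_numpy(masks)
    print([len(contours) for contours in segments]) # contour count per mask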

View File

@@ -9,6 +9,7 @@ import traceback
from typing import Any, Tuple
import numpy as np
from ov.segment import OpenVINOSegmentation
import scrypted_sdk
from PIL import Image
from scrypted_sdk.other import SettingValue
@@ -221,6 +222,7 @@ class OpenVINOPlugin(
self.faceDevice = None
self.textDevice = None
self.clipDevice = None
self.segmentDevice = None
if not self.forked:
asyncio.ensure_future(self.prepareRecognitionModels(), loop=self.loop)
@@ -335,6 +337,18 @@ class OpenVINOPlugin(
"name": "OpenVINO CLIP Embedding",
}
)
await scrypted_sdk.deviceManager.onDeviceDiscovered(
{
"nativeId": "segment",
"type": scrypted_sdk.ScryptedDeviceType.Builtin.value,
"interfaces": [
scrypted_sdk.ScryptedInterface.ClusterForkInterface.value,
scrypted_sdk.ScryptedInterface.ObjectDetection.value,
],
"name": "OpenVINO Segmentation",
}
)
except:
pass
@@ -348,6 +362,9 @@ class OpenVINOPlugin(
elif nativeId == "clipembedding":
self.clipDevice = self.clipDevice or OpenVINOClipEmbedding(self, nativeId)
return self.clipDevice
elif nativeId == "segment":
self.segmentDevice = self.segmentDevice or OpenVINOSegmentation(self, nativeId)
return self.segmentDevice
custom_model = self.custom_models.get(nativeId, None)
if custom_model:
return custom_model

View File

@@ -0,0 +1,270 @@
from __future__ import annotations
import asyncio
import os
import traceback
import numpy as np
from ov import async_infer
import openvino as ov
from predict.segment import Segmentation
from predict import Prediction
from predict.rectangle import Rectangle
from common import yolo
import time
import warnings
from common import yolov9_seg
prepareExecutor, predictExecutor = async_infer.create_executors("Segment")
def xywh2xyxy(x):
"""Convert [x_center, y_center, width, height] to [x1, y1, x2, y2]"""
y = np.copy(x)
y[:, 0] = x[:, 0] - x[:, 2] / 2 # x1
y[:, 1] = x[:, 1] - x[:, 3] / 2 # y1
y[:, 2] = x[:, 0] + x[:, 2] / 2 # x2
y[:, 3] = x[:, 1] + x[:, 3] / 2 # y2
return y
def box_iou(box1, box2):
"""Calculate IoU between two sets of boxes"""
# box1 shape: (n, 4), box2 shape: (m, 4)
# Compute intersection areas
area1 = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1])
area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])
iou = np.zeros((len(box1), len(box2)), dtype=np.float32)
for i in range(len(box1)):
for j in range(len(box2)):
# Intersection
inter_x1 = np.maximum(box1[i, 0], box2[j, 0])
inter_y1 = np.maximum(box1[i, 1], box2[j, 1])
inter_x2 = np.minimum(box1[i, 2], box2[j, 2])
inter_y2 = np.minimum(box1[i, 3], box2[j, 3])
inter_w = np.maximum(0, inter_x2 - inter_x1)
inter_h = np.maximum(0, inter_y2 - inter_y1)
inter_area = inter_w * inter_h
# Union
union = area1[i] + area2[j] - inter_area
iou[i, j] = inter_area / union if union > 0 else 0
return iou
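# A broadcasted variant (sketch): the loops above are O(n*m) in Python, and
# NumPy broadcasting computes the same IoU matrix without them.
def box_iou_vectorized(box1, box2):
    """Vectorized IoU between two sets of xyxy boxes; mirrors box_iou above."""
    area1 = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1]) # (n,)
    area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1]) # (m,)
    lt = np.maximum(box1[:, None, :2], box2[None, :, :2]) # (n, m, 2) intersection top-left
    rb = np.minimum(box1[:, None, 2:], box2[None, :, 2:]) # (n, m, 2) intersection bottom-right
    wh = np.clip(rb - lt, 0, None) # (n, m, 2) clamp negative overlaps to zero
    inter = wh[..., 0] * wh[..., 1] # (n, m)
    union = area1[:, None] + area2[None, :] - inter
    return (inter / np.maximum(union, 1e-9)).astype(np.float32)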
def nms(boxes, scores, iou_thres):
"""Non-Maximum Suppression implementation in NumPy"""
if len(boxes) == 0:
return np.array([], dtype=np.int32)
# Sort by scores in descending order
indices = np.argsort(-scores)
keep = []
while len(indices) > 0:
i = indices[0]
keep.append(i)
if len(indices) == 1:
break
# Calculate IoU between the current box and all remaining boxes
iou_scores = box_iou(boxes[indices[0:1]], boxes[indices[1:]])[0]
# Keep boxes with IoU below threshold
indices = indices[1:][iou_scores < iou_thres]
return np.array(keep, dtype=np.int32)
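# Toy check (sketch): with iou_thres=0.45, two heavily overlapping boxes
# collapse to the higher-scoring one while the distant box survives.
# boxes = np.array([[0, 0, 10, 10], [1, 1, 11, 11], [50, 50, 60, 60]], dtype=np.float32)
# scores = np.array([0.9, 0.8, 0.7], dtype=np.float32)
# nms(boxes, scores, 0.45) # -> array([0, 2], dtype=int32)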
def non_max_suppression(
prediction,
conf_thres=0.25,
iou_thres=0.45,
classes=None,
agnostic=False,
multi_label=False,
labels=(),
max_det=300,
nm=0, # number of masks
):
"""Non-Maximum Suppression (NMS) on inference results to reject overlapping detections
Returns:
list of detections, on (n,6) tensor per image [xyxy, conf, cls]
"""
if isinstance(prediction, (list, tuple)): # YOLO model in validation mode, output = (inference_out, loss_out)
prediction = prediction[0] # select only inference output
bs = prediction.shape[0] # batch size
nc = prediction.shape[1] - nm - 4 # number of classes
mi = 4 + nc # mask start index
xc = np.max(prediction[:, 4:mi], axis=1) > conf_thres # candidates
# Checks
assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
# Settings
# min_wh = 2 # (pixels) minimum box width and height
max_wh = 7680 # (pixels) maximum box width and height
max_nms = 30000 # maximum number of boxes into NMS()
time_limit = 2.5 + 0.05 * bs # seconds to quit after
redundant = True # require redundant detections
multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img)
merge = False # use merge-NMS
t = time.time()
output = [np.zeros((0, 6 + nm), dtype=np.float32)] * bs
for xi, pred_x in enumerate(prediction): # image index, image inference
# Apply constraints
# x[((x[:, 2:4] < min_wh) | (x[:, 2:4] > max_wh)).any(1), 4] = 0 # width-height
x = pred_x.T[xc[xi]] # confidence
# Cat apriori labels if autolabelling
if labels and len(labels[xi]):
lb = labels[xi]
v = np.zeros((len(lb), nc + nm + 4), dtype=x.dtype) # x has 4 + nc + nm columns here
v[:, :4] = lb[:, 1:5] # box
v[np.arange(len(lb)), lb[:, 0].astype(int) + 4] = 1.0 # cls
x = np.concatenate((x, v), 0)
# If none remain process next image
if x.shape[0] == 0:
continue
# Detections matrix nx6 (xyxy, conf, cls)
box = x[:, :4]
cls = x[:, 4:4 + nc]
mask = x[:, 4 + nc:] if nm > 0 else np.zeros((x.shape[0], nm), dtype=x.dtype)
box = xywh2xyxy(box) # (center_x, center_y, width, height) to (x1, y1, x2, y2)
if multi_label:
i, j = np.where(cls > conf_thres)
x = np.concatenate((box[i], x[i, 4 + j][:, None], j[:, None].astype(np.float32), mask[i]), 1)
else: # best class only
j = np.argmax(cls, axis=1, keepdims=True)
conf = cls[np.arange(len(cls)), j.flatten()][:, None]
x = np.concatenate((box, conf, j.astype(np.float32), mask), 1)[conf.flatten() > conf_thres]
# Filter by class
if classes is not None:
    class_tensor = np.array(classes, dtype=np.float32)
    class_mask = np.any(x[:, 5:6] == class_tensor, axis=1) # renamed to avoid shadowing the mask coefficients
    x = x[class_mask]
# Apply finite constraint
# if not np.isfinite(x).all():
# x = x[np.isfinite(x).all(1)]
# Check shape
n = x.shape[0] # number of boxes
if n == 0: # no boxes
continue
elif n > max_nms: # excess boxes
x = x[x[:, 4].argsort()[::-1][:max_nms]] # sort by confidence
else:
x = x[x[:, 4].argsort()[::-1]] # sort by confidence
# Batched NMS
c = x[:, 5:6] * (0 if agnostic else max_wh) # classes
boxes, scores = x[:, :4] + c, x[:, 4] # boxes (offset by class), scores
i = nms(boxes, scores, iou_thres) # NMS
if i.shape[0] > max_det: # limit detections
i = i[:max_det]
if merge and (1 < n < 3E3): # Merge NMS (boxes merged using weighted mean)
# update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
iou = box_iou(boxes[i], boxes) > iou_thres # iou matrix
weights = iou * scores[None] # box weights
x[i, :4] = np.dot(weights, x[:, :4]).astype(np.float32) / weights.sum(1, keepdims=True) # merged boxes
if redundant:
i = i[iou.sum(1) > 1] # require redundancy
output[xi] = x[i]
if (time.time() - t) > time_limit:
    warnings.warn(f'WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded')
    break # time limit exceeded
return output
class OpenVINOSegmentation(Segmentation):
def __init__(self, plugin, nativeId: str):
super().__init__(plugin=plugin, nativeId=nativeId)
def loadModel(self, name):
name = name + "_int8"
model_path = self.downloadHuggingFaceModelLocalFallback(name)
ovmodel = "best-converted"
xmlFile = os.path.join(model_path, f"{ovmodel}.xml")
model = self.plugin.core.compile_model(xmlFile, self.plugin.mode)
return model
async def detect_once(self, input, settings, src_size, cvss):
def predict():
im = np.expand_dims(input, axis=0)
im = im.transpose((0, 3, 1, 2)) # BHWC to BCHW, (n, 3, h, w)
im = im.astype(np.float32) / 255.0
im = np.ascontiguousarray(im) # contiguous
infer_request = self.model.create_infer_request()
tensor = ov.Tensor(array=im)
infer_request.set_input_tensor(tensor)
output_tensors = infer_request.infer()
pred = output_tensors[0]
proto = output_tensors[1]
pred = non_max_suppression(pred, nm=32)
objs = []
for det in pred:
if not len(det):
continue
# Upsample masks to input image space (320x320)
masks = yolov9_seg.process_mask_numpy(proto.squeeze(0), det[:, 6:], det[:, :4], (320, 320), upsample=True)
# Convert masks to contour points
segments = yolov9_seg.masks2segments_numpy(masks)
# Create Prediction instances
for i in range(len(det)):
# Convert all contours for this detection to list of [x, y] tuples
mask_contours = segments[i]
clip_paths = []
for contour in mask_contours:
if len(contour) > 0 and contour.shape[1] == 2:
single_path = [(float(contour[j, 0]), float(contour[j, 1])) for j in range(len(contour))]
clip_paths.append(single_path)
prediction = Prediction(
id=int(det[i, 5]), # class_id
score=float(det[i, 4]), # confidence
bbox=Rectangle(
xmin=float(det[i, 0]), # x1
ymin=float(det[i, 1]), # y1
xmax=float(det[i, 2]), # x2
ymax=float(det[i, 3]), # y2
),
embedding=None, # no embedding for segmentation
clipPaths=clip_paths # list of polygon outlines [[[x, y], ...], ...] at 320x320
)
objs.append(prediction)
return objs
try:
objs = await asyncio.get_event_loop().run_in_executor(
    predictExecutor, predict
)
except:
traceback.print_exc()
raise
ret = self.create_detection_result(objs, src_size, cvss)
return ret
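# Shape walkthrough (assumed for a 320x320 yolov9c-seg export; verify against
# the actual model): with nc=80 classes and nm=32 mask coefficients,
#   pred:  (1, 4 + 80 + 32, anchors) -> non_max_suppression(nm=32) -> [(n, 38)]
#          columns per row: x1, y1, x2, y2, conf, cls, then 32 coefficients
#   proto: (1, 32, 80, 80) -> squeeze(0) -> (32, 80, 80), combined with
#          det[:, 6:] inside process_mask_numpy to produce per-box masks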

View File

@@ -38,7 +38,7 @@ def custom_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
socket.getaddrinfo = custom_getaddrinfo
class Prediction:
-    def __init__(self, id: int, score: float, bbox: Rectangle, embedding: str = None):
+    def __init__(self, id: int, score: float, bbox: Rectangle, embedding: str = None, clipPaths: List[List[Tuple[float, float]]] = None):
# these may be numpy values. sanitize them.
self.id = int(id)
self.score = float(score)
@@ -50,7 +50,7 @@ class Prediction:
float(bbox.ymax),
)
self.embedding = embedding
self.clipPaths = clipPaths
class PredictPlugin(DetectPlugin, scrypted_sdk.ClusterForkInterface, scrypted_sdk.ScryptedSystemDevice, scrypted_sdk.DeviceCreator, scrypted_sdk.DeviceProvider):
labels: dict
@@ -191,6 +191,8 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.ClusterForkInterface, scrypted_sd
detection["score"] = obj.score
if hasattr(obj, "embedding") and obj.embedding is not None:
detection["embedding"] = obj.embedding
if hasattr(obj, "clipPaths") and obj.clipPaths is not None and len(obj.clipPaths) > 0:
detection["clipPaths"] = obj.clipPaths
detections.append(detection)
if convert_to_src_size:
@@ -204,6 +206,15 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.ClusterForkInterface, scrypted_sd
if any(map(lambda x: not math.isfinite(x), detection["boundingBox"])):
print("unexpected nan detected", obj.bbox)
continue
# Transform clipPaths coordinates if present
if "clipPaths" in detection and detection["clipPaths"] is not None:
clip_paths = detection["clipPaths"]
# Convert each polygon (list of [x, y] tuples) to source size, calling the converter once per point
transformed = []
for polygon in clip_paths:
    converted = [convert_to_src_size((pt[0], pt[1])) for pt in polygon]
    transformed.append([(c[0], c[1]) for c in converted])
detection["clipPaths"] = transformed
detection_result["detections"].append(detection)
# print(detection_result)
@@ -313,6 +324,44 @@ class PredictPlugin(DetectPlugin, scrypted_sdk.ClusterForkInterface, scrypted_sd
if image.ffmpegFormats != True:
format = image.format or "rgb"
if settings and settings.get("pad", False):
if iw / w > ih / h:
scale = w / iw
else:
scale = h / ih
nw = int(iw * scale)
nh = int(ih * scale)
resize = {
"width": nw,
"height": nh,
}
b = await image.toBuffer(
{
"resize": resize,
"format": format,
}
)
if self.get_input_format() == "rgb":
data = await common.colors.ensureRGBData(b, (nw, nh), format)
elif self.get_input_format() == "rgba":
data = await common.colors.ensureRGBAData(b, (nw, nh), format)
elif self.get_input_format() == "yuvj444p":
data = await common.colors.ensureYCbCrAData(b, (nw, nh), format)
else:
raise Exception("unsupported format")
# data is a PIL image and we need to pad it to w, h
new_image = Image.new(data.mode, (w, h))
paste_x = (w - nw) // 2
paste_y = (h - nh) // 2
new_image.paste(data, (paste_x, paste_y))
data.close()
data = new_image
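# Worked example (assumed source size): a 1920x1080 frame into a 320x320 input.
# iw/w = 6.0 > ih/h = 3.375, so scale = 320/1920 = 1/6 -> nw = 320, nh = 180;
# paste_x = 0, paste_y = (320 - 180) // 2 = 70, i.e. the frame is centered with
# vertical letterboxing before inference.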
else:
b = await image.toBuffer(
{
"resize": resize,

View File

@@ -65,7 +65,11 @@ class ClipEmbedding(PredictPlugin, scrypted_sdk.TextEmbedding, scrypted_sdk.Imag
pass
async def getImageEmbedding(self, input):
-        detections = await super().detectObjects(input, None)
+        detections = await super().detectObjects(input, {
+            "settings": {
+                "pad": True,
+            }
+        })
return detections["detections"][0]["embedding"]
async def detectObjects(self, mediaObject, session = None):

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from typing import Tuple
from ov import async_infer
from predict import PredictPlugin
import asyncio
from common import coco
customDetectPrepare, customDetectPredict = async_infer.create_executors("CustomDetect")
class Segmentation(PredictPlugin):
def __init__(self, plugin, nativeId: str):
super().__init__(plugin=plugin, nativeId=nativeId)
self.inputwidth = 320
self.inputheight = 320
self.loop = asyncio.get_event_loop()
self.labels = coco.COCO_LABELS
self.model = self.loadModel('yolov9c_seg')
def loadModel(self, name: str):
    raise NotImplementedError # backends override this with their model loader
# width, height, channels
def get_input_details(self) -> Tuple[int, int, int]:
return (self.inputwidth, self.inputheight, 3)
def get_input_size(self) -> Tuple[float, float]:
return (self.inputwidth, self.inputheight)
def get_input_format(self) -> str:
return "rgb"