openvino: working lpr and face recog;

This commit is contained in:
Koushik Dutta
2024-04-12 22:12:07 -07:00
parent 27a1c5269a
commit c1c9fec62f
6 changed files with 115 additions and 8 deletions

View File

@@ -64,7 +64,9 @@ class CoreMLRecognition(RecognizeDetection):
def predictTextModel(self, input):
model, inputName = self.textModel
return model.predict({inputName: input})
out_dict = model.predict({inputName: input})
preds = out_dict["linear_2"]
return preds
# def predictVision(self, input: Image.Image) -> asyncio.Future[list[Prediction]]:
# buffer = input.tobytes()

View File

@@ -33,6 +33,7 @@
"runtime": "python",
"type": "API",
"interfaces": [
"DeviceProvider",
"Settings",
"ObjectDetection",
"ObjectDetectionPreview"

View File

@@ -1,18 +1,21 @@
from __future__ import annotations
import asyncio
import json
import re
from typing import Any, Tuple
import numpy as np
import openvino.runtime as ov
import scrypted_sdk
from PIL import Image
from scrypted_sdk.other import SettingValue
from scrypted_sdk.types import Setting
from predict import PredictPlugin, Prediction, Rectangle
import numpy as np
import common.yolo as yolo
from predict import Prediction, PredictPlugin, Rectangle
from .recognition import OpenVINORecognition
availableModels = [
"Default",
@@ -66,7 +69,7 @@ def dump_device_properties(core):
class OpenVINOPlugin(
PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings
PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings, scrypted_sdk.DeviceProvider
):
def __init__(self, nativeId: str | None = None):
super().__init__(nativeId=nativeId)
@@ -81,6 +84,7 @@ class OpenVINOPlugin(
if mode == "Default":
mode = "AUTO"
mode = mode or "AUTO"
self.mode = mode
precision = self.storage.getItem("precision") or "Default"
if precision == "Default":
@@ -93,6 +97,8 @@ class OpenVINOPlugin(
else:
precision = "FP32"
self.precision = precision
model = self.storage.getItem("model") or "Default"
if model == "Default" or model not in availableModels:
if model != "Default":
@@ -110,11 +116,11 @@ class OpenVINOPlugin(
model_version = "v5"
xmlFile = self.downloadFile(
f"https://raw.githubusercontent.com/koush/openvino-models/main/{model}/{precision}/{ovmodel}.xml",
f"{model_version}/{precision}/{ovmodel}.xml",
f"{model_version}/{model}/{precision}/{ovmodel}.xml",
)
binFile = self.downloadFile(
f"https://raw.githubusercontent.com/koush/openvino-models/main/{model}/{precision}/{ovmodel}.bin",
f"{model_version}/{precision}/{ovmodel}.bin",
f"{model_version}/{model}/{precision}/{ovmodel}.bin",
)
if self.scrypted_model:
labelsFile = self.downloadFile(
@@ -159,6 +165,8 @@ class OpenVINOPlugin(
labels_contents = open(labelsFile, "r").read()
self.labels = parse_label_contents(labels_contents)
asyncio.ensure_future(self.prepareRecognitionModels(), loop=self.loop)
async def getSettings(self) -> list[Setting]:
mode = self.storage.getItem("mode") or "Default"
model = self.storage.getItem("model") or "Default"
@@ -304,3 +312,25 @@ class OpenVINOPlugin(
ret = self.create_detection_result(objs, src_size, cvss)
return ret
async def prepareRecognitionModels(self):
try:
await scrypted_sdk.deviceManager.onDevicesChanged(
{
"devices": [
{
"nativeId": "recognition",
"type": scrypted_sdk.ScryptedDeviceType.Builtin.value,
"interfaces": [
scrypted_sdk.ScryptedInterface.ObjectDetection.value,
],
"name": "OpenVINO Recognition",
}
]
}
)
except:
pass
async def getDevice(self, nativeId: str) -> Any:
return OpenVINORecognition(self, nativeId)

View File

@@ -0,0 +1,75 @@
from __future__ import annotations
import concurrent.futures
import openvino.runtime as ov
import numpy as np
from predict.recognize import RecognizeDetection
def euclidean_distance(arr1, arr2):
return np.linalg.norm(arr1 - arr2)
def cosine_similarity(vector_a, vector_b):
dot_product = np.dot(vector_a, vector_b)
norm_a = np.linalg.norm(vector_a)
norm_b = np.linalg.norm(vector_b)
similarity = dot_product / (norm_a * norm_b)
return similarity
predictExecutor = concurrent.futures.ThreadPoolExecutor(8, "Vision-Predict")
class OpenVINORecognition(RecognizeDetection):
def __init__(self, plugin, nativeId: str | None = None):
self.plugin = plugin
super().__init__(nativeId=nativeId)
def downloadModel(self, model: str):
ovmodel = "best"
precision = self.plugin.precision
model_version = "v5"
xmlFile = self.downloadFile(
f"https://raw.githubusercontent.com/koush/openvino-models/main/{model}/{precision}/{ovmodel}.xml",
f"{model_version}/{model}/{precision}/{ovmodel}.xml",
)
binFile = self.downloadFile(
f"https://raw.githubusercontent.com/koush/openvino-models/main/{model}/{precision}/{ovmodel}.bin",
f"{model_version}/{model}/{precision}/{ovmodel}.bin",
)
print(xmlFile, binFile)
return self.plugin.core.compile_model(xmlFile, self.plugin.mode)
def predictDetectModel(self, input):
infer_request = self.detectModel.create_infer_request()
im = np.stack([input])
im = im.transpose((0, 3, 1, 2)) # BHWC to BCHW, (n, 3, h, w)
im = im.astype(np.float32) / 255.0
im = np.ascontiguousarray(im) # contiguous
im = ov.Tensor(array=im)
input_tensor = im
infer_request.set_input_tensor(input_tensor)
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data[0]
def predictFaceModel(self, input):
im = ov.Tensor(array=input)
infer_request = self.faceModel.create_infer_request()
infer_request.set_input_tensor(im)
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data[0]
def predictTextModel(self, input):
input = input.astype(np.float32)
im = ov.Tensor(array=input)
infer_request = self.textModel.create_infer_request()
infer_request.set_input_tensor(im)
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data

View File

@@ -137,11 +137,10 @@ class RecognizeDetection(PredictPlugin):
async def setLabel(self, d: ObjectDetectionResult, image: scrypted_sdk.Image):
try:
image_tensor = await prepare_text_result(d, image)
out_dict = await asyncio.get_event_loop().run_in_executor(
preds = await asyncio.get_event_loop().run_in_executor(
predictExecutor,
lambda: self.predictTextModel(image_tensor),
)
preds = out_dict["linear_2"]
d['label'] = process_text_result(preds)
except Exception as e: