predict: fix text skews

This commit is contained in:
Koushik Dutta
2024-04-22 20:50:52 -07:00
parent ebe19532fc
commit ff2d1d5f97
7 changed files with 205 additions and 24 deletions

View File

@@ -37,3 +37,9 @@ class CoreMLTextRecognition(TextRecognition):
out_dict = model.predict({inputName: input})
results = list(out_dict.values())[0]
return results
def predictTextModel(self, input):
model, inputName = self.textModel
out_dict = model.predict({inputName: input})
preds = out_dict["linear_2"]
return preds

View File

@@ -8,7 +8,17 @@ from common.softmax import softmax
from common.colors import ensureRGBData
import math
async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image, width: int, height: int):
def skew_image(image: Image, skew_angle_rad: float):
skew_matrix = [1, 0, 0, skew_angle_rad, 1, 0]
# Apply the transformation
skewed_image = image.transform(
image.size, Image.AFFINE, skew_matrix, resample=Image.BICUBIC
)
return skewed_image
async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image):
l, t, w, h = d["boundingBox"]
l = math.floor(l)
t = math.floor(t)
@@ -27,14 +37,30 @@ async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image, width:
}
)
pilImage = await ensureRGBData(cropped, (w, h), format)
resized = pilImage.resize((width, height), resample=Image.LANCZOS).convert("L")
pilImage.close()
return resized
return pilImage
def calculate_y_change(original_height, skew_angle_radians):
# Calculate the change in y-position
y_change = original_height * math.tan(skew_angle_radians)
return y_change
async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Image, skew_angle: float):
textImage = await crop_text(d, image)
skew_height_change = calculate_y_change(d["boundingBox"][3], skew_angle)
skew_height_change = math.floor(skew_height_change)
textImage = skew_image(textImage, skew_angle)
# crop skew_height_change from top
if skew_height_change > 0:
textImage = textImage.crop((0, 0, textImage.width, textImage.height - skew_height_change))
elif skew_height_change < 0:
textImage = textImage.crop((0, -skew_height_change, textImage.width, textImage.height))
async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Image):
new_height = 64
new_width = int(d["boundingBox"][2] * new_height / d["boundingBox"][3])
textImage = await crop_text(d, image, new_width, new_height)
new_width = int(textImage.width * new_height / textImage.height)
textImage = textImage.resize((new_width, new_height), resample=Image.LANCZOS).convert("L")
new_width = 256
# calculate padding dimensions
padding = (0, 0, new_width - textImage.width, 0)
@@ -50,7 +76,6 @@ async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Imag
# test normalize contrast
# image_tensor = (image_tensor - np.min(image_tensor)) / (np.max(image_tensor) - np.min(image_tensor))
image_tensor = (image_tensor - 0.5) / 0.5
image_tensor = np.expand_dims(image_tensor, axis=0)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import openvino.runtime as ov
import numpy as np
from predict.text_recognize import TextRecognition
@@ -34,3 +35,12 @@ class OpenVINOTextRecognition(TextRecognition):
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data
def predictTextModel(self, input):
input = input.astype(np.float32)
im = ov.Tensor(array=input)
infer_request = self.textModel.create_infer_request()
infer_request.set_input_tensor(im)
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data

View File

@@ -202,8 +202,8 @@ class RecognizeDetection(PredictPlugin):
for d in ret["detections"]:
if d["className"] == "face":
futures.append(asyncio.ensure_future(self.setEmbedding(d, image)))
elif d["className"] == "plate":
futures.append(asyncio.ensure_future(self.setLabel(d, image)))
# elif d["className"] == "plate":
# futures.append(asyncio.ensure_future(self.setLabel(d, image)))
# elif d['className'] == 'text':
# futures.append(asyncio.ensure_future(self.setLabel(d, image)))

View File

@@ -2,20 +2,26 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import traceback
from asyncio import Future
from typing import Any, List, Tuple
import numpy as np
import scrypted_sdk
from PIL import Image
from scrypted_sdk import ObjectDetectionResult, ObjectDetectionSession, ObjectsDetected
from common.text import prepare_text_result, process_text_result
from predict import Prediction, PredictPlugin
from predict.craft_utils import normalizeMeanVariance
from predict.rectangle import Rectangle
from .craft_utils import adjustResultCoordinates, getDetBoxes
from predict.text_skew import find_adjacent_groups
predictExecutor = concurrent.futures.ThreadPoolExecutor(1, "TextDetect")
class TextRecognition(PredictPlugin):
def __init__(self, nativeId: str | None = None):
super().__init__(nativeId=nativeId)
@@ -30,7 +36,7 @@ class TextRecognition(PredictPlugin):
self.minThreshold = 0.1
self.detectModel = self.downloadModel("craft")
self.textModel = self.downloadModel("vgg_english_g2")
def downloadModel(self, model: str):
pass
@@ -38,7 +44,12 @@ class TextRecognition(PredictPlugin):
def predictDetectModel(self, input):
pass
async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -> scrypted_sdk.ObjectsDetected:
def predictTextModel(self, input):
pass
async def detect_once(
self, input: Image.Image, settings: Any, src_size, cvss
) -> scrypted_sdk.ObjectsDetected:
image_tensor = normalizeMeanVariance(np.array(input))
# reshape to c w h
image_tensor = image_tensor.transpose([2, 0, 1])
@@ -51,9 +62,9 @@ class TextRecognition(PredictPlugin):
estimate_num_chars = False
ratio_h = ratio_w = 1
text_threshold = .7
link_threshold = .7
low_text = .4
text_threshold = 0.4
link_threshold = 0.7
low_text = 0.4
poly = False
boxes_list, polys_list = [], []
@@ -64,7 +75,14 @@ class TextRecognition(PredictPlugin):
# Post-processing
boxes, polys, mapper = getDetBoxes(
score_text, score_link, text_threshold, link_threshold, low_text, poly, estimate_num_chars)
score_text,
score_link,
text_threshold,
link_threshold,
low_text,
poly,
estimate_num_chars,
)
if not len(boxes):
continue
@@ -86,16 +104,60 @@ class TextRecognition(PredictPlugin):
for boxes in boxes_list:
for box in boxes:
tl, tr, br, bl = box
l = tl[0]
t = tl[1]
r = br[0]
b = br[1]
l = min(tl[0], bl[0])
t = min(tl[1], tr[1])
r = max(tr[0], br[0])
b = max(bl[1], br[1])
pred = Prediction(0, 1, Rectangle(l, t, r, b))
preds.append(pred)
return self.create_detection_result(preds, src_size, cvss)
async def run_detection_image(
self, image: scrypted_sdk.Image, detection_session: ObjectDetectionSession
) -> ObjectsDetected:
ret = await super().run_detection_image(image, detection_session)
detections = ret["detections"]
futures: List[Future] = []
boundingBoxes = [d["boundingBox"] for d in detections]
text_groups = find_adjacent_groups(boundingBoxes)
detections = []
for group in text_groups:
boundingBox = group["union"]
d: ObjectDetectionResult = {
"boundingBox": boundingBox,
"score": 1,
"className": "text",
}
futures.append(asyncio.ensure_future(self.setLabel(d, image, group["skew_angle"])))
detections.append(d)
ret["detections"] = detections
if len(futures):
await asyncio.wait(futures)
return ret
async def setLabel(self, d: ObjectDetectionResult, image: scrypted_sdk.Image, skew_angle: float):
try:
image_tensor = await prepare_text_result(d, image, skew_angle)
preds = await asyncio.get_event_loop().run_in_executor(
predictExecutor,
lambda: self.predictTextModel(image_tensor),
)
d["label"] = process_text_result(preds)
except Exception as e:
traceback.print_exc()
pass
# width, height, channels
def get_input_details(self) -> Tuple[int, int, int]:
return (self.inputwidth, self.inputheight, 3)
@@ -104,4 +166,4 @@ class TextRecognition(PredictPlugin):
return (self.inputwidth, self.inputheight)
def get_input_format(self) -> str:
return "rgb"
return "rgb"

View File

@@ -0,0 +1,78 @@
from typing import List, Tuple
import math
BoundingBox = Tuple[int, int, int, int]
def union_boxes(boxes: List[BoundingBox]) -> BoundingBox:
left = min([box[0] for box in boxes])
top = min([box[1] for box in boxes])
right = max([box[0] + box[2] for box in boxes])
bottom = max([box[1] + box[3] for box in boxes])
return left, top, right - left, bottom - top
def are_boxes_adjacent(box1: BoundingBox, box2: BoundingBox):
l1, t1, w1, h1 = box1
l2, t2, w2, h2 = box2
line_slop = 2 / 3
if t1 > t2 + h2 * line_slop or t2 > t1 + h1 * line_slop:
return False
# Calculate the left and right edges of each box
left_edge_box1 = l1
right_edge_box1 = l1 + w1
left_edge_box2 = l2
right_edge_box2 = l2 + w2
# Determine the larger height between the two boxes
larger_height = max(h1, h2)
threshold = larger_height * 2
# Calculate the vertical distance between the boxes
distance = min(
abs(left_edge_box1 - right_edge_box2), abs(left_edge_box2 - right_edge_box1)
)
# Check if the boxes are adjacent along their left or right sides
if distance <= threshold:
return True
else:
return False
def find_adjacent_groups(boxes: List[BoundingBox]) -> List[dict]:
groups = []
# sort boxes left to right
boxes = sorted(boxes, key=lambda box: box[0])
for box in boxes:
added_to_group = False
for group in groups:
for other_box in group["boxes"]:
if are_boxes_adjacent(box, other_box):
group["boxes"].append(box)
added_to_group = True
break
if added_to_group:
break
if not added_to_group:
groups.append({"boxes": [box], "skew_angle": 0})
# Calculate the skew angle of each group
for group in groups:
boxes = group["boxes"]
sum_angle = 0
for i in range(len(boxes) - 1):
x1, y1, w1, h1 = boxes[i]
x2, y2, w2, h2 = boxes[i + 1]
dx = x2 - x1
dy = y2 - y1
sum_angle += math.atan2(dy, dx)
group["skew_angle"] = 0 if not len(boxes) - 1 else sum_angle / (len(boxes) - 1)
group["union"] = union_boxes(boxes)
return groups

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/server",
"version": "0.98.4",
"version": "0.100.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@scrypted/server",
"version": "0.98.4",
"version": "0.100.0",
"hasInstallScript": true,
"license": "ISC",
"dependencies": {