dlib: wip

This commit is contained in:
Koushik Dutta
2023-02-14 19:53:27 -08:00
parent 7cc9a68fde
commit 1fe6cbf7ca
4 changed files with 199 additions and 21 deletions

BIN
plugins/dlib/fs/black.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

View File

@@ -33,6 +33,7 @@
"runtime": "python",
"type": "API",
"interfaces": [
"DeviceProvider",
"Settings",
"BufferConverter",
"ObjectDetection"

View File

@@ -11,10 +11,21 @@ from typing import Any, List, Tuple, Mapping
from scrypted_sdk.types import ObjectDetectionModel, ObjectDetectionResult, ObjectsDetected, Setting
from predict import PredictSession
import threading
import asyncio
from .unknown import UnknownPeople
import base64
import json
import random
import string
def random_string():
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(10))
MIME_TYPE = 'x-scrypted-dlib/x-raw-image'
class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings):
class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings, scrypted_sdk.DeviceProvider):
def __init__(self, nativeId: str | None = None):
super().__init__(MIME_TYPE, nativeId=nativeId)
@@ -22,8 +33,50 @@ class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Setti
0: 'face'
}
self.known_faces = []
self.mutex = threading.Lock()
self.known_faces = {}
self.encoded_faces = {}
asyncio.ensure_future(self.load_known_faces())
asyncio.ensure_future(scrypted_sdk.deviceManager.onDeviceDiscovered({
'nativeId': 'unknown',
'name': 'Unknown People',
'type': scrypted_sdk.ScryptedDeviceType.Builtin.value,
'interfaces': [
scrypted_sdk.ScryptedInterface.Camera.value,
scrypted_sdk.ScryptedInterface.Settings.value,
]
}))
async def save_known_faces(self):
pass
async def load_known_faces(self):
self.known_faces = {}
self.encoded_faces = {}
try:
self.known_faces = json.loads(self.storage.getItem('known'))
except:
pass
for known in self.known_faces:
encoded = []
self.encoded_faces[known] = encoded
encodings = self.known_faces[known]
for str in encodings:
try:
parsed = base64.decodebytes(str)
encoding = np.frombuffer(parsed, dtype=np.float64)
encoded.append(encoding)
except:
pass
async def getDevice(self, nativeId: str) -> Any:
if nativeId == 'unknown':
return UnknownPeople('unknown')
# width, height, channels
def get_input_details(self) -> Tuple[int, int, int]:
@@ -41,30 +94,52 @@ class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Setti
with self.mutex:
face_locations = face_recognition.face_locations(nparray)
scaled = []
for idx, face in enumerate(face_locations):
t, r, b, l = face
t *= 4
r *= 4
b *= 4
l *= 4
face_locations[idx] = (t, r, b, l)
for idx, face in enumerate(face_locations):
t, r, b, l = face
t *= 4
r *= 4
b *= 4
l *= 4
face_locations[idx] = (t, r, b, l)
nparray = np.array(input)
face_encodings = face_recognition.face_encodings(nparray, face_locations, model = 'small')
nparray = np.array(input)
with self.mutex:
face_encodings = face_recognition.face_encodings(nparray, face_locations)
all_ids = []
all_faces = []
for encoded in self.encoded_faces:
all_ids += ([encoded] * len(self.encoded_faces[encoded]))
all_faces += self.encoded_faces[encoded]
m = {}
for idx, fe in enumerate(face_encodings):
results = face_recognition.compare_faces(self.known_faces, fe)
found = False
for i, r in enumerate(results):
if r:
found = True
m[idx] = str(i)
break
results = list(face_recognition.face_distance(all_faces, fe))
if not found:
self.known_faces.append(fe)
best = 1
if len(results):
best = min(results)
minpos = results.index(best)
if best > .6:
id = random_string()
print('top face %s' % best)
print('new face %s' % id)
encoded = [fe]
self.encoded_faces[id] = encoded
all_faces += encoded
volume = os.environ['SCRYPTED_PLUGIN_VOLUME']
people = os.path.join(volume, 'unknown')
os.makedirs(people, exist_ok=True)
t, r, b, l = face_locations[idx]
cropped = input.crop((l, t, r, b))
fp = os.path.join(people, id + '.jpg')
cropped.save(fp)
else:
id = all_ids[minpos]
print('has face %s' % id)
# return

View File

@@ -0,0 +1,102 @@
import scrypted_sdk
from scrypted_sdk import RequestPictureOptions, MediaObject, Setting
import os
import json
class UnknownPeople(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.Settings, scrypted_sdk.Camera):
def __init__(self, nativeId: str | None = None):
super().__init__(nativeId)
async def takePicture(self, options: RequestPictureOptions = None) -> MediaObject:
volume = os.environ['SCRYPTED_PLUGIN_VOLUME']
people = os.path.join(volume, 'unknown')
os.makedirs(people, exist_ok=True)
for unknown in os.listdir(people):
fp = os.path.join(people, unknown)
ret = scrypted_sdk.mediaManager.createMediaObjectFromUrl('file:/' + fp)
return await ret
black = os.path.join(volume, 'zip', 'unzipped', 'fs', 'black.jpg')
ret = scrypted_sdk.mediaManager.createMediaObjectFromUrl('file:/' + black)
return await ret
async def getSettings(self) -> list[Setting]:
volume = os.environ['SCRYPTED_PLUGIN_VOLUME']
people = os.path.join(volume, 'unknown')
os.makedirs(people, exist_ok=True)
known = {}
try:
known = json.loads(self.storage.getItem('known'))
except:
pass
choices = list(known.keys())
ret: list[Setting] = [
{
'key': 'known',
'title': 'Familiar People',
'description': 'The people known this this plugin.',
'choices': choices,
}
]
for unknown in os.listdir(people):
ret.append(
{
'key': unknown,
'title': 'Name',
'description': 'Associate this thumbnail with an existing person or identify a new person.',
'choices': choices,
'combobox': True,
}
)
ret.append(
{
'key': 'delete',
'title': 'Delete',
'description': 'Delete this face.',
'type': 'button',
}
)
return ret
ret.append(
{
'key': 'unknown',
'title': 'Unknown People',
'value': 'Waiting for unknown person...',
'description': 'There are no more people that need to be identified.',
'readonly': True,
}
)
return ret
async def putSetting(self, key: str, value: str) -> None:
if key == 'known':
return
known = {}
try:
known = json.loads(self.storage.getItem('known'))
except:
pass
choices = list(known.keys())
if value or key == 'delete':
volume = os.environ['SCRYPTED_PLUGIN_VOLUME']
people = os.path.join(volume, 'unknown')
os.makedirs(people, exist_ok=True)
for unknown in os.listdir(people):
fp = os.path.join(people, unknown)
os.remove(fp)
if value not in choices:
choices.append(value)
break
await self.onDeviceEvent(scrypted_sdk.ScryptedInterface.Settings.value, None)
await self.onDeviceEvent(scrypted_sdk.ScryptedInterface.Camera.value, None)