diff --git a/plugins/dlib/fs/black.jpg b/plugins/dlib/fs/black.jpg new file mode 100644 index 000000000..dd39e540e Binary files /dev/null and b/plugins/dlib/fs/black.jpg differ diff --git a/plugins/dlib/package.json b/plugins/dlib/package.json index fddce493c..4e92bb6a8 100644 --- a/plugins/dlib/package.json +++ b/plugins/dlib/package.json @@ -33,6 +33,7 @@ "runtime": "python", "type": "API", "interfaces": [ + "DeviceProvider", "Settings", "BufferConverter", "ObjectDetection" diff --git a/plugins/dlib/src/dlibplugin/__init__.py b/plugins/dlib/src/dlibplugin/__init__.py index 8a01efe73..b948d5fdc 100644 --- a/plugins/dlib/src/dlibplugin/__init__.py +++ b/plugins/dlib/src/dlibplugin/__init__.py @@ -11,10 +11,21 @@ from typing import Any, List, Tuple, Mapping from scrypted_sdk.types import ObjectDetectionModel, ObjectDetectionResult, ObjectsDetected, Setting from predict import PredictSession import threading +import asyncio +from .unknown import UnknownPeople +import base64 +import json +import random +import string + +def random_string(): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for i in range(10)) + MIME_TYPE = 'x-scrypted-dlib/x-raw-image' -class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings): +class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Settings, scrypted_sdk.DeviceProvider): def __init__(self, nativeId: str | None = None): super().__init__(MIME_TYPE, nativeId=nativeId) @@ -22,8 +33,50 @@ class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Setti 0: 'face' } - self.known_faces = [] self.mutex = threading.Lock() + self.known_faces = {} + self.encoded_faces = {} + asyncio.ensure_future(self.load_known_faces()) + + asyncio.ensure_future(scrypted_sdk.deviceManager.onDeviceDiscovered({ + 'nativeId': 'unknown', + 'name': 'Unknown People', + 'type': scrypted_sdk.ScryptedDeviceType.Builtin.value, + 'interfaces': [ + scrypted_sdk.ScryptedInterface.Camera.value, + scrypted_sdk.ScryptedInterface.Settings.value, + ] + })) + + async def save_known_faces(self): + + pass + + async def load_known_faces(self): + self.known_faces = {} + self.encoded_faces = {} + + try: + self.known_faces = json.loads(self.storage.getItem('known')) + except: + pass + + + for known in self.known_faces: + encoded = [] + self.encoded_faces[known] = encoded + encodings = self.known_faces[known] + for str in encodings: + try: + parsed = base64.decodebytes(str) + encoding = np.frombuffer(parsed, dtype=np.float64) + encoded.append(encoding) + except: + pass + + async def getDevice(self, nativeId: str) -> Any: + if nativeId == 'unknown': + return UnknownPeople('unknown') # width, height, channels def get_input_details(self) -> Tuple[int, int, int]: @@ -41,30 +94,52 @@ class DlibPlugin(PredictPlugin, scrypted_sdk.BufferConverter, scrypted_sdk.Setti with self.mutex: face_locations = face_recognition.face_locations(nparray) - scaled = [] - for idx, face in enumerate(face_locations): - t, r, b, l = face - t *= 4 - r *= 4 - b *= 4 - l *= 4 - face_locations[idx] = (t, r, b, l) + for idx, face in enumerate(face_locations): + t, r, b, l = face + t *= 4 + r *= 4 + b *= 4 + l *= 4 + face_locations[idx] = (t, r, b, l) - nparray = np.array(input) - face_encodings = face_recognition.face_encodings(nparray, face_locations, model = 'small') + nparray = np.array(input) + + with self.mutex: + face_encodings = face_recognition.face_encodings(nparray, face_locations) + + all_ids = [] + all_faces = [] + for encoded in self.encoded_faces: + all_ids += ([encoded] * len(self.encoded_faces[encoded])) + all_faces += self.encoded_faces[encoded] m = {} for idx, fe in enumerate(face_encodings): - results = face_recognition.compare_faces(self.known_faces, fe) - found = False - for i, r in enumerate(results): - if r: - found = True - m[idx] = str(i) - break + results = list(face_recognition.face_distance(all_faces, fe)) - if not found: - self.known_faces.append(fe) + best = 1 + if len(results): + best = min(results) + minpos = results.index(best) + + if best > .6: + id = random_string() + print('top face %s' % best) + print('new face %s' % id) + encoded = [fe] + self.encoded_faces[id] = encoded + all_faces += encoded + + volume = os.environ['SCRYPTED_PLUGIN_VOLUME'] + people = os.path.join(volume, 'unknown') + os.makedirs(people, exist_ok=True) + t, r, b, l = face_locations[idx] + cropped = input.crop((l, t, r, b)) + fp = os.path.join(people, id + '.jpg') + cropped.save(fp) + else: + id = all_ids[minpos] + print('has face %s' % id) # return diff --git a/plugins/dlib/src/dlibplugin/unknown.py b/plugins/dlib/src/dlibplugin/unknown.py new file mode 100644 index 000000000..e49d15ba8 --- /dev/null +++ b/plugins/dlib/src/dlibplugin/unknown.py @@ -0,0 +1,102 @@ +import scrypted_sdk +from scrypted_sdk import RequestPictureOptions, MediaObject, Setting +import os +import json + +class UnknownPeople(scrypted_sdk.ScryptedDeviceBase, scrypted_sdk.Settings, scrypted_sdk.Camera): + def __init__(self, nativeId: str | None = None): + super().__init__(nativeId) + + async def takePicture(self, options: RequestPictureOptions = None) -> MediaObject: + volume = os.environ['SCRYPTED_PLUGIN_VOLUME'] + people = os.path.join(volume, 'unknown') + os.makedirs(people, exist_ok=True) + for unknown in os.listdir(people): + fp = os.path.join(people, unknown) + ret = scrypted_sdk.mediaManager.createMediaObjectFromUrl('file:/' + fp) + return await ret + + black = os.path.join(volume, 'zip', 'unzipped', 'fs', 'black.jpg') + ret = scrypted_sdk.mediaManager.createMediaObjectFromUrl('file:/' + black) + return await ret + + async def getSettings(self) -> list[Setting]: + volume = os.environ['SCRYPTED_PLUGIN_VOLUME'] + people = os.path.join(volume, 'unknown') + os.makedirs(people, exist_ok=True) + + known = {} + + try: + known = json.loads(self.storage.getItem('known')) + except: + pass + + choices = list(known.keys()) + + ret: list[Setting] = [ + { + 'key': 'known', + 'title': 'Familiar People', + 'description': 'The people known this this plugin.', + 'choices': choices, + } + ] + + for unknown in os.listdir(people): + ret.append( + { + 'key': unknown, + 'title': 'Name', + 'description': 'Associate this thumbnail with an existing person or identify a new person.', + 'choices': choices, + 'combobox': True, + } + ) + ret.append( + { + 'key': 'delete', + 'title': 'Delete', + 'description': 'Delete this face.', + 'type': 'button', + } + ) + return ret + + ret.append( + { + 'key': 'unknown', + 'title': 'Unknown People', + 'value': 'Waiting for unknown person...', + 'description': 'There are no more people that need to be identified.', + 'readonly': True, + } + ) + + return ret + + async def putSetting(self, key: str, value: str) -> None: + if key == 'known': + return + + known = {} + try: + known = json.loads(self.storage.getItem('known')) + except: + pass + choices = list(known.keys()) + + if value or key == 'delete': + volume = os.environ['SCRYPTED_PLUGIN_VOLUME'] + people = os.path.join(volume, 'unknown') + os.makedirs(people, exist_ok=True) + for unknown in os.listdir(people): + fp = os.path.join(people, unknown) + os.remove(fp) + if value not in choices: + choices.append(value) + + break + + await self.onDeviceEvent(scrypted_sdk.ScryptedInterface.Settings.value, None) + await self.onDeviceEvent(scrypted_sdk.ScryptedInterface.Camera.value, None) \ No newline at end of file