From 369bc10741b49b3e3b7b8258dec5a0bdd0a90f4f Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 29 Dec 2021 23:11:40 -0800 Subject: [PATCH] mqtt: publish events --- plugins/mqtt/README.md | 18 +-- plugins/mqtt/package-lock.json | 4 +- plugins/mqtt/package.json | 5 +- plugins/mqtt/src/api/mqtt-device-base.ts | 2 +- plugins/mqtt/src/main.ts | 153 ++++++++++++++++++++++- plugins/mqtt/src/publishable-types.ts | 15 +++ 6 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 plugins/mqtt/src/publishable-types.ts diff --git a/plugins/mqtt/README.md b/plugins/mqtt/README.md index 03f52c9ae..598fcfb2d 100644 --- a/plugins/mqtt/README.md +++ b/plugins/mqtt/README.md @@ -1,15 +1,9 @@ -# @scrypted/mqtt +# MQTT Plugin for Scrypted -## npm commands - * npm run scrypted-webpack - * npm run scrypted-deploy - * npm run scrypted-debug +The MQTT Plugin can be used as both an MQTT Broker and or as an MQTT Client. -## scrypted distribution via npm - 1. Ensure package.json is set up properly for publishing on npm. - 2. npm publish +The MQTT Client for Scrypted can be both a MQTT publisher and a subscriber: + * Devices published from Scrypted will report their state and events to the MQTT Broker. + * MQTT topics subscribed by Scrypted can be used to import devices into Scrypted. -## Visual Studio Code configuration - -* If using a remote server, edit [.vscode/settings.json](blob/master/.vscode/settings.json) to specify the IP Address of the Scrypted server. -* Launch Scrypted Debugger from the launch menu. +This plugin includes the Aedes MQTT Broker. \ No newline at end of file diff --git a/plugins/mqtt/package-lock.json b/plugins/mqtt/package-lock.json index c8cad5ad0..46d5e22c4 100644 --- a/plugins/mqtt/package-lock.json +++ b/plugins/mqtt/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/mqtt", - "version": "0.0.31", + "version": "0.0.32", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/mqtt", - "version": "0.0.31", + "version": "0.0.32", "dependencies": { "@types/node": "^16.6.1", "aedes": "^0.46.1", diff --git a/plugins/mqtt/package.json b/plugins/mqtt/package.json index 6df225cb6..0dd9571cd 100644 --- a/plugins/mqtt/package.json +++ b/plugins/mqtt/package.json @@ -17,9 +17,10 @@ "mqtt" ], "scrypted": { - "name": "MQTT Plugin", + "name": "MQTT", "type": "DeviceProvider", "interfaces": [ + "MixinProvider", "DeviceProvider", "Settings" ] @@ -37,5 +38,5 @@ "@scrypted/sdk": "file:../../sdk", "@types/nunjucks": "^3.2.0" }, - "version": "0.0.31" + "version": "0.0.32" } diff --git a/plugins/mqtt/src/api/mqtt-device-base.ts b/plugins/mqtt/src/api/mqtt-device-base.ts index 8654bc41f..91378873f 100644 --- a/plugins/mqtt/src/api/mqtt-device-base.ts +++ b/plugins/mqtt/src/api/mqtt-device-base.ts @@ -70,4 +70,4 @@ export class MqttDeviceBase extends ScryptedDeviceBase implements Settings { return this.client; } -} \ No newline at end of file +} diff --git a/plugins/mqtt/src/main.ts b/plugins/mqtt/src/main.ts index 632427f4f..b203af80e 100644 --- a/plugins/mqtt/src/main.ts +++ b/plugins/mqtt/src/main.ts @@ -1,7 +1,7 @@ // https://developer.scrypted.app/#getting-started // package.json contains the metadata (name, interfaces) about this device // under the "scrypted" key. -import { Settings, Setting, DeviceProvider, ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType, Scriptable, ScriptSource, ScryptedInterfaceDescriptors } from '@scrypted/sdk'; +import { Settings, Setting, DeviceProvider, ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType, Scriptable, ScriptSource, ScryptedInterfaceDescriptors, MixinProvider, ScryptedDevice, EventListenerRegister } from '@scrypted/sdk'; import sdk from '@scrypted/sdk'; import { monacoEvalDefaults } from './monaco'; import { scryptedEval } from './scrypted-eval'; @@ -12,6 +12,9 @@ import ws from 'websocket-stream'; import http from 'http'; import { MqttDeviceBase } from './api/mqtt-device-base'; import { MqttAutoDiscoveryProvider } from './autodiscovery/autodiscovery'; +import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin"; +import { connect, Client } from 'mqtt'; +import { canMixin } from './publishable-types'; const loopbackLight = require("!!raw-loader!./examples/loopback-light.ts").default; @@ -22,7 +25,7 @@ for (const desc of Object.values(ScryptedInterfaceDescriptors)) { } } -const { log, deviceManager } = sdk; +const { log, deviceManager, systemManager } = sdk; class MqttDevice extends MqttDeviceBase implements Scriptable { handler: any; @@ -164,7 +167,138 @@ class MqttDevice extends MqttDeviceBase implements Scriptable { const brokerProperties = ['httpPort', 'tcpPort', 'enableBroker', 'username', 'password']; -class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Settings { + +class MqttPublisherMixin extends SettingsMixinDeviceBase { + client: Client; + handler: any; + pathname: string; + device: ScryptedDevice; + listener: EventListenerRegister; + + constructor(public provider: MqttProvider, mixinDevice: any, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any }) { + super(mixinDevice, mixinDeviceState, { + mixinDeviceInterfaces, + providerNativeId: undefined, + group: 'MQTT', + groupKey: 'mqtt', + }); + + this.device = systemManager.getDeviceById(this.id); + this.connectClient(); + + this.listener = this.device.listen(undefined, (eventSource, eventDetails, eventData) => { + const { property } = eventDetails; + if (property) { + let str = this[property]; + if (typeof str === 'object') + str = JSON.stringify(str); + + this.client.publish(`${this.pathname}/${this.id}/${property}`, str?.toString() || ''); + } + else { + let str = eventData; + if (typeof str === 'object') + str = JSON.stringify(str); + + this.client.publish(`${this.pathname}/${this.id}/${eventDetails.eventInterface}`, str?.toString() || ''); + } + }) + } + + async getMixinSettings(): Promise { + return [ + { + title: 'Publish URL', + key: 'url', + value: this.storage.getItem('url'), + description: "The base publish URL for the device. All published MQTT data will use this as the base path. Leave blank to use the Scrypted MQTT broker.", + placeholder: "mqtt://localhost/device/kitchen-light", + }, + { + title: 'Username', + value: this.storage.getItem('username'), + key: 'username', + description: 'Optional: User name used to authenticate with the MQTT broker.', + }, + { + title: 'Password', + value: this.storage.getItem('password'), + key: 'password', + type: 'password', + description: 'Optional: Password used to authenticate with the MQTT broker.', + }, + ]; + } + + async putMixinSetting(key: string, value: string | number | boolean) { + if (key === 'url') { + let url = value.toString(); + if (!url.endsWith('/')) + url += '/'; + this.storage.setItem(key, url); + } + else { + this.storage.setItem(key, value.toString()); + } + } + + connectClient() { + this.client?.end(); + this.client = undefined; + const urlString = this.storage.getItem('url'); + let url: URL; + let username: string; + let password: string; + if (urlString) { + url = new URL(urlString); + username = this.storage.getItem('username') || undefined; + password = this.storage.getItem('password') || undefined; + } + else { + const tcpPort = this.provider.storage.getItem('tcpPort') || ''; + username = this.provider.storage.getItem('username') || undefined; + password = this.provider.storage.getItem('password') || undefined; + url = new URL(`mqtt://localhost:${tcpPort}/scrypted`); + } + + this.pathname = url.pathname.substring(1); + const urlWithoutPath = new URL(url); + urlWithoutPath.pathname = ''; + + const client = this.client = connect(urlWithoutPath.toString(), { + username, + password, + }); + client.setMaxListeners(Infinity); + + client.on('connect', packet => { + this.console.log('MQTT client connected, publishing current state.'); + + for (const iface of this.device.interfaces) { + for (const prop of ScryptedInterfaceDescriptors[iface]?.properties || []) { + let str = this[prop]; + if (typeof str === 'object') + str = JSON.stringify(str); + + client.publish(`${this.pathname}/${this.id}/${prop}`, str?.toString() || ''); + } + } + }) + client.on('disconnect', () => this.console.log('mqtt client disconnected')); + client.on('error', e => { + this.console.log('mqtt client error', e); + }); + + return this.client; + } + + release() { + this.client?.end(); + this.client = undefined; + } +} + +class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Settings, MixinProvider { devices = new Map(); netServer: net.Server; httpServer: http.Server; @@ -337,6 +471,19 @@ class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Setting } return ret; } + + + async canMixin(type: ScryptedDeviceType, interfaces: string[]): Promise { + return canMixin(type, interfaces) ? [ScryptedInterface.Settings] : undefined; + } + + async getMixin(mixinDevice: any, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any; }): Promise { + return new MqttPublisherMixin(this, mixinDevice, mixinDeviceInterfaces, mixinDeviceState); + } + + async releaseMixin(id: string, mixinDevice: any): Promise { + mixinDevice.release(); + } } export default new MqttProvider(); \ No newline at end of file diff --git a/plugins/mqtt/src/publishable-types.ts b/plugins/mqtt/src/publishable-types.ts new file mode 100644 index 000000000..d18ad9409 --- /dev/null +++ b/plugins/mqtt/src/publishable-types.ts @@ -0,0 +1,15 @@ +import { ScryptedDeviceType, ScryptedInterface } from "@scrypted/sdk"; + +export function canMixin(type: ScryptedDeviceType, interfaces: string[]): boolean { + const set = new Set(interfaces); + set.delete(ScryptedInterface.ObjectDetection); + set.delete(ScryptedInterface.DeviceDiscovery); + set.delete(ScryptedInterface.DeviceCreator); + set.delete(ScryptedInterface.DeviceProvider); + set.delete(ScryptedInterface.MixinProvider); + set.delete(ScryptedInterface.PushHandler); + set.delete(ScryptedInterface.EngineIOHandler); + set.delete(ScryptedInterface.HttpRequestHandler); + set.delete(ScryptedInterface.Settings); + return !!set.size; +}