mqtt: publish events

This commit is contained in:
Koushik Dutta
2021-12-29 23:11:40 -08:00
parent 4a3e72af85
commit 369bc10741
6 changed files with 177 additions and 20 deletions

View File

@@ -1,15 +1,9 @@
# @scrypted/mqtt
# MQTT Plugin for Scrypted
## npm commands
* npm run scrypted-webpack
* npm run scrypted-deploy <ipaddress>
* npm run scrypted-debug <ipaddress>
The MQTT Plugin can be used as both an MQTT Broker and or as an MQTT Client.
## scrypted distribution via npm
1. Ensure package.json is set up properly for publishing on npm.
2. npm publish
The MQTT Client for Scrypted can be both a MQTT publisher and a subscriber:
* Devices published from Scrypted will report their state and events to the MQTT Broker.
* MQTT topics subscribed by Scrypted can be used to import devices into Scrypted.
## Visual Studio Code configuration
* If using a remote server, edit [.vscode/settings.json](blob/master/.vscode/settings.json) to specify the IP Address of the Scrypted server.
* Launch Scrypted Debugger from the launch menu.
This plugin includes the Aedes MQTT Broker.

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/mqtt",
"version": "0.0.31",
"version": "0.0.32",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/mqtt",
"version": "0.0.31",
"version": "0.0.32",
"dependencies": {
"@types/node": "^16.6.1",
"aedes": "^0.46.1",

View File

@@ -17,9 +17,10 @@
"mqtt"
],
"scrypted": {
"name": "MQTT Plugin",
"name": "MQTT",
"type": "DeviceProvider",
"interfaces": [
"MixinProvider",
"DeviceProvider",
"Settings"
]
@@ -37,5 +38,5 @@
"@scrypted/sdk": "file:../../sdk",
"@types/nunjucks": "^3.2.0"
},
"version": "0.0.31"
"version": "0.0.32"
}

View File

@@ -70,4 +70,4 @@ export class MqttDeviceBase extends ScryptedDeviceBase implements Settings {
return this.client;
}
}
}

View File

@@ -1,7 +1,7 @@
// https://developer.scrypted.app/#getting-started
// package.json contains the metadata (name, interfaces) about this device
// under the "scrypted" key.
import { Settings, Setting, DeviceProvider, ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType, Scriptable, ScriptSource, ScryptedInterfaceDescriptors } from '@scrypted/sdk';
import { Settings, Setting, DeviceProvider, ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType, Scriptable, ScriptSource, ScryptedInterfaceDescriptors, MixinProvider, ScryptedDevice, EventListenerRegister } from '@scrypted/sdk';
import sdk from '@scrypted/sdk';
import { monacoEvalDefaults } from './monaco';
import { scryptedEval } from './scrypted-eval';
@@ -12,6 +12,9 @@ import ws from 'websocket-stream';
import http from 'http';
import { MqttDeviceBase } from './api/mqtt-device-base';
import { MqttAutoDiscoveryProvider } from './autodiscovery/autodiscovery';
import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin";
import { connect, Client } from 'mqtt';
import { canMixin } from './publishable-types';
const loopbackLight = require("!!raw-loader!./examples/loopback-light.ts").default;
@@ -22,7 +25,7 @@ for (const desc of Object.values(ScryptedInterfaceDescriptors)) {
}
}
const { log, deviceManager } = sdk;
const { log, deviceManager, systemManager } = sdk;
class MqttDevice extends MqttDeviceBase implements Scriptable {
handler: any;
@@ -164,7 +167,138 @@ class MqttDevice extends MqttDeviceBase implements Scriptable {
const brokerProperties = ['httpPort', 'tcpPort', 'enableBroker', 'username', 'password'];
class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Settings {
class MqttPublisherMixin extends SettingsMixinDeviceBase<any> {
client: Client;
handler: any;
pathname: string;
device: ScryptedDevice;
listener: EventListenerRegister;
constructor(public provider: MqttProvider, mixinDevice: any, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any }) {
super(mixinDevice, mixinDeviceState, {
mixinDeviceInterfaces,
providerNativeId: undefined,
group: 'MQTT',
groupKey: 'mqtt',
});
this.device = systemManager.getDeviceById(this.id);
this.connectClient();
this.listener = this.device.listen(undefined, (eventSource, eventDetails, eventData) => {
const { property } = eventDetails;
if (property) {
let str = this[property];
if (typeof str === 'object')
str = JSON.stringify(str);
this.client.publish(`${this.pathname}/${this.id}/${property}`, str?.toString() || '');
}
else {
let str = eventData;
if (typeof str === 'object')
str = JSON.stringify(str);
this.client.publish(`${this.pathname}/${this.id}/${eventDetails.eventInterface}`, str?.toString() || '');
}
})
}
async getMixinSettings(): Promise<Setting[]> {
return [
{
title: 'Publish URL',
key: 'url',
value: this.storage.getItem('url'),
description: "The base publish URL for the device. All published MQTT data will use this as the base path. Leave blank to use the Scrypted MQTT broker.",
placeholder: "mqtt://localhost/device/kitchen-light",
},
{
title: 'Username',
value: this.storage.getItem('username'),
key: 'username',
description: 'Optional: User name used to authenticate with the MQTT broker.',
},
{
title: 'Password',
value: this.storage.getItem('password'),
key: 'password',
type: 'password',
description: 'Optional: Password used to authenticate with the MQTT broker.',
},
];
}
async putMixinSetting(key: string, value: string | number | boolean) {
if (key === 'url') {
let url = value.toString();
if (!url.endsWith('/'))
url += '/';
this.storage.setItem(key, url);
}
else {
this.storage.setItem(key, value.toString());
}
}
connectClient() {
this.client?.end();
this.client = undefined;
const urlString = this.storage.getItem('url');
let url: URL;
let username: string;
let password: string;
if (urlString) {
url = new URL(urlString);
username = this.storage.getItem('username') || undefined;
password = this.storage.getItem('password') || undefined;
}
else {
const tcpPort = this.provider.storage.getItem('tcpPort') || '';
username = this.provider.storage.getItem('username') || undefined;
password = this.provider.storage.getItem('password') || undefined;
url = new URL(`mqtt://localhost:${tcpPort}/scrypted`);
}
this.pathname = url.pathname.substring(1);
const urlWithoutPath = new URL(url);
urlWithoutPath.pathname = '';
const client = this.client = connect(urlWithoutPath.toString(), {
username,
password,
});
client.setMaxListeners(Infinity);
client.on('connect', packet => {
this.console.log('MQTT client connected, publishing current state.');
for (const iface of this.device.interfaces) {
for (const prop of ScryptedInterfaceDescriptors[iface]?.properties || []) {
let str = this[prop];
if (typeof str === 'object')
str = JSON.stringify(str);
client.publish(`${this.pathname}/${this.id}/${prop}`, str?.toString() || '');
}
}
})
client.on('disconnect', () => this.console.log('mqtt client disconnected'));
client.on('error', e => {
this.console.log('mqtt client error', e);
});
return this.client;
}
release() {
this.client?.end();
this.client = undefined;
}
}
class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Settings, MixinProvider {
devices = new Map<string, any>();
netServer: net.Server;
httpServer: http.Server;
@@ -337,6 +471,19 @@ class MqttProvider extends ScryptedDeviceBase implements DeviceProvider, Setting
}
return ret;
}
async canMixin(type: ScryptedDeviceType, interfaces: string[]): Promise<string[]> {
return canMixin(type, interfaces) ? [ScryptedInterface.Settings] : undefined;
}
async getMixin(mixinDevice: any, mixinDeviceInterfaces: ScryptedInterface[], mixinDeviceState: { [key: string]: any; }): Promise<any> {
return new MqttPublisherMixin(this, mixinDevice, mixinDeviceInterfaces, mixinDeviceState);
}
async releaseMixin(id: string, mixinDevice: any): Promise<void> {
mixinDevice.release();
}
}
export default new MqttProvider();

View File

@@ -0,0 +1,15 @@
import { ScryptedDeviceType, ScryptedInterface } from "@scrypted/sdk";
export function canMixin(type: ScryptedDeviceType, interfaces: string[]): boolean {
const set = new Set(interfaces);
set.delete(ScryptedInterface.ObjectDetection);
set.delete(ScryptedInterface.DeviceDiscovery);
set.delete(ScryptedInterface.DeviceCreator);
set.delete(ScryptedInterface.DeviceProvider);
set.delete(ScryptedInterface.MixinProvider);
set.delete(ScryptedInterface.PushHandler);
set.delete(ScryptedInterface.EngineIOHandler);
set.delete(ScryptedInterface.HttpRequestHandler);
set.delete(ScryptedInterface.Settings);
return !!set.size;
}