support mqtt

This commit is contained in:
Morten Olsen
2025-09-09 18:34:08 +02:00
parent 4a539702e9
commit 6104a0c599
3 changed files with 275 additions and 2 deletions

View File

@@ -36,6 +36,7 @@
"fastify": "^5.6.0",
"fastify-sse-v2": "^4.2.1",
"fastify-type-provider-zod": "^6.0.0",
"mqtt": "^5.14.1",
"zod": "^4.1.5"
},
"name": "@morten-olsen/fluxcurrent-server",

View File

@@ -1,4 +1,5 @@
import type { DocumentUpsertEvent } from '@morten-olsen/fluxcurrent-core/services/documents/documents.schemas.js';
import { MqttClient, connectAsync } from 'mqtt';
import type { Notifier } from '../notifier.types.ts';
@@ -6,13 +7,30 @@ import type { MqttConfig } from '#root/schemas/schemas.config.ts';
class MqttNotifier implements Notifier {
#config: MqttConfig;
#client?: Promise<MqttClient>;
constructor(config: MqttConfig) {
this.#config = config;
}
#connect = async () => {
this.#client = connectAsync(this.#config.url, {
username: this.#config.username,
password: this.#config.password,
});
return this.#client;
};
#getClient = async () => {
if (!this.#client) {
this.#client = this.#connect();
}
return this.#client;
};
public upsert = async (event: DocumentUpsertEvent) => {
console.log('upsert', event);
const client = await this.#getClient();
const topic = `${this.#config.baseTopic}/${event.document.type}/${event.document.uri}`;
await client.publish(topic, JSON.stringify(event));
};
}