feat: lots of stuff
This commit is contained in:
0
Dockerfile
Normal file
0
Dockerfile
Normal file
10
manifests/client.yml
Normal file
10
manifests/client.yml
Normal file
@@ -0,0 +1,10 @@
|
||||
apiVersion: 'backbone.mortenolsen.pro/v1'
|
||||
kind: Client
|
||||
metadata:
|
||||
name: test
|
||||
namespace: prod
|
||||
spec:
|
||||
statements:
|
||||
- effect: allow
|
||||
resources: ['*']
|
||||
actions: ['*']
|
||||
@@ -27,6 +27,8 @@
|
||||
"eslint-config-prettier": "10.1.8",
|
||||
"eslint-plugin-import": "2.32.0",
|
||||
"eslint-plugin-prettier": "5.5.4",
|
||||
"get-port": "^7.1.0",
|
||||
"mqtt": "^5.14.1",
|
||||
"prettier": "3.6.2",
|
||||
"typescript": "5.9.3",
|
||||
"typescript-eslint": "8.46.1",
|
||||
@@ -38,8 +40,12 @@
|
||||
"#root/*": "./src/*"
|
||||
},
|
||||
"dependencies": {
|
||||
"@fastify/websocket": "^11.2.0",
|
||||
"@kubernetes/client-node": "^1.4.0",
|
||||
"aedes": "^0.51.3",
|
||||
"aedes-persistence": "^10.2.2",
|
||||
"ajv": "^8.17.1",
|
||||
"fastify": "^5.6.1",
|
||||
"jsonwebtoken": "^9.0.2",
|
||||
"micromatch": "^4.0.8",
|
||||
"ws": "^8.18.3",
|
||||
|
||||
898
pnpm-lock.yaml
generated
898
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
23
src/access/access.handler.ts
Normal file
23
src/access/access.handler.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
import type { AccessProvider } from './access.provider.ts';
|
||||
|
||||
class AccessHandler {
|
||||
#handlers: Map<string, AccessProvider>;
|
||||
|
||||
constructor() {
|
||||
this.#handlers = new Map();
|
||||
}
|
||||
|
||||
public register = (name: string, provider: AccessProvider) => {
|
||||
this.#handlers.set(name, provider);
|
||||
};
|
||||
|
||||
public validate = (provider: string, token: string) => {
|
||||
const handler = this.#handlers.get(provider);
|
||||
if (!handler) {
|
||||
throw new Error('Provider not available');
|
||||
}
|
||||
return handler.getAccess(token);
|
||||
};
|
||||
}
|
||||
|
||||
export { AccessHandler };
|
||||
9
src/access/access.provider.ts
Normal file
9
src/access/access.provider.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import type { Statement } from './access.schemas.ts';
|
||||
|
||||
type AccessProvider = {
|
||||
getAccess: (token: string) => Promise<{
|
||||
statements: Statement[];
|
||||
}>;
|
||||
};
|
||||
|
||||
export type { AccessProvider };
|
||||
@@ -17,6 +17,10 @@ class Session {
|
||||
this.#options = options;
|
||||
}
|
||||
|
||||
public get statements() {
|
||||
return this.#options.statements;
|
||||
}
|
||||
|
||||
public validate = (options: ValidateOptions) => {
|
||||
const { statements } = this.#options;
|
||||
return validate({
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { z } from 'zod';
|
||||
import jwt from 'jsonwebtoken';
|
||||
|
||||
import { statementSchema } from './access.schemas.ts';
|
||||
import type { AccessProvider } from './access.provider.ts';
|
||||
|
||||
type AccessTokensOptions = {
|
||||
secret: string | Buffer;
|
||||
@@ -12,7 +14,7 @@ const tokenBodySchema = z.object({
|
||||
|
||||
type TokenBody = z.infer<typeof tokenBodySchema>;
|
||||
|
||||
class AccessTokens {
|
||||
class AccessTokens implements AccessProvider {
|
||||
#options: AccessTokensOptions;
|
||||
|
||||
constructor(options: AccessTokensOptions) {
|
||||
@@ -25,7 +27,7 @@ class AccessTokens {
|
||||
return token;
|
||||
};
|
||||
|
||||
public validate = (token: string) => {
|
||||
public getAccess = async (token: string) => {
|
||||
const { secret } = this.#options;
|
||||
const data = jwt.verify(token, secret);
|
||||
const parsed = tokenBodySchema.parse(data);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import micromatch from 'micromatch';
|
||||
|
||||
import type { Statement } from './access.schemas.ts';
|
||||
|
||||
type ValidateOptions = {
|
||||
|
||||
9
src/api/api.ts
Normal file
9
src/api/api.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { type FastifyPluginAsync } from 'fastify';
|
||||
|
||||
const api: FastifyPluginAsync = async (fastify) => {
|
||||
fastify.get('/healthz', () => {
|
||||
return { status: 'ok' };
|
||||
});
|
||||
};
|
||||
|
||||
export { api };
|
||||
2
src/global.d.ts
vendored
Normal file
2
src/global.d.ts
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
// eslint-disable-next-line
|
||||
declare type ExplicitAny = any;
|
||||
70
src/k8s/k8s.clients.ts
Normal file
70
src/k8s/k8s.clients.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { KubernetesObjectApi, PatchStrategy, type KubeConfig, type KubernetesObject } from '@kubernetes/client-node';
|
||||
|
||||
import type { K8sResources } from './k8s.resources.ts';
|
||||
import type { K8sBackboneClient } from './k8s.schemas.ts';
|
||||
|
||||
import type { AccessProvider } from '#root/access/access.provider.ts';
|
||||
import type { Statement } from '#root/access/access.schemas.ts';
|
||||
|
||||
type K8sClientsOptions = {
|
||||
config: KubeConfig;
|
||||
resources: K8sResources;
|
||||
};
|
||||
|
||||
type K8sClient = {
|
||||
statements: Statement[];
|
||||
};
|
||||
|
||||
class K8sClients implements AccessProvider {
|
||||
#options: K8sClientsOptions;
|
||||
#clients: Map<string, K8sClient>;
|
||||
|
||||
constructor(options: K8sClientsOptions) {
|
||||
this.#clients = new Map();
|
||||
this.#options = options;
|
||||
const { clients } = options.resources;
|
||||
clients.on('updated', this.#handleClientAdded);
|
||||
}
|
||||
|
||||
#handleClientAdded = async (manifest: KubernetesObject & { spec: K8sBackboneClient }) => {
|
||||
const { resources, config } = this.#options;
|
||||
const secretName = `${manifest.metadata?.name}-secret`;
|
||||
const secret = resources.secrets.manifests.find(
|
||||
(m) => m.metadata?.namespace === manifest.metadata?.namespace && m.metadata?.name === secretName,
|
||||
);
|
||||
const token = secret?.data?.token || crypto.randomUUID();
|
||||
if (!secret) {
|
||||
const objectsApi = config.makeApiClient(KubernetesObjectApi);
|
||||
|
||||
const body = {
|
||||
apiVersion: 'v1',
|
||||
kind: 'Secret',
|
||||
metadata: {
|
||||
name: secretName,
|
||||
namespace: manifest.metadata?.namespace,
|
||||
},
|
||||
data: {
|
||||
token: Buffer.from(token).toString('base64'),
|
||||
},
|
||||
};
|
||||
await objectsApi.create(body, undefined, undefined, undefined, undefined);
|
||||
}
|
||||
if (!token) {
|
||||
throw new Error('Secret is missing token');
|
||||
}
|
||||
const tokenValue = Buffer.from(token, 'base64').toString('utf8');
|
||||
this.#clients.set(tokenValue, {
|
||||
statements: manifest.spec.statements,
|
||||
});
|
||||
};
|
||||
|
||||
public getAccess = async (token: string) => {
|
||||
const client = this.#clients.get(token);
|
||||
if (!client) {
|
||||
throw new Error('invalid credentials');
|
||||
}
|
||||
return client;
|
||||
};
|
||||
}
|
||||
|
||||
export { K8sClients };
|
||||
71
src/k8s/k8s.crd.ts
Normal file
71
src/k8s/k8s.crd.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
import { type KubeConfig, ApiException, ApiextensionsV1Api } from '@kubernetes/client-node';
|
||||
import { z, type ZodType } from 'zod';
|
||||
|
||||
type CreateCrdOptions = {
|
||||
config: KubeConfig;
|
||||
kind: string;
|
||||
apiVersion: string;
|
||||
plural?: string;
|
||||
scope: 'Cluster' | 'Namespaced';
|
||||
spec: ZodType;
|
||||
};
|
||||
const createCrd = async (options: CreateCrdOptions) => {
|
||||
const { config, ...definition } = options;
|
||||
const plural = definition.plural ?? definition.kind.toLowerCase() + 's';
|
||||
const [version, group] = definition.apiVersion.split('/').toReversed();
|
||||
const manifest = {
|
||||
apiVersion: 'apiextensions.k8s.io/v1',
|
||||
kind: 'CustomResourceDefinition',
|
||||
metadata: {
|
||||
name: `${plural}.${group}`,
|
||||
},
|
||||
spec: {
|
||||
group: group,
|
||||
names: {
|
||||
kind: definition.kind,
|
||||
plural: plural,
|
||||
singular: definition.kind.toLowerCase(),
|
||||
},
|
||||
scope: definition.scope,
|
||||
versions: [
|
||||
{
|
||||
name: version,
|
||||
served: true,
|
||||
storage: true,
|
||||
schema: {
|
||||
openAPIV3Schema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
spec: {
|
||||
...z.toJSONSchema(definition.spec, { io: 'input' }),
|
||||
$schema: undefined,
|
||||
additionalProperties: undefined,
|
||||
} as ExplicitAny,
|
||||
},
|
||||
},
|
||||
},
|
||||
subresources: {
|
||||
status: {},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
const extensionsApi = config.makeApiClient(ApiextensionsV1Api);
|
||||
try {
|
||||
await extensionsApi.createCustomResourceDefinition({
|
||||
body: manifest,
|
||||
});
|
||||
} catch (error) {
|
||||
if (error instanceof ApiException && error.code === 409) {
|
||||
await extensionsApi.patchCustomResourceDefinition({
|
||||
name: manifest.metadata.name,
|
||||
body: [{ op: 'replace', path: '/spec', value: manifest.spec }],
|
||||
});
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export { createCrd };
|
||||
49
src/k8s/k8s.resources.ts
Normal file
49
src/k8s/k8s.resources.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { KubeConfig, V1Secret, type KubernetesObject } from '@kubernetes/client-node';
|
||||
|
||||
import { K8sWatcher } from './k8s.watcher.ts';
|
||||
import type { K8sBackboneClient, K8sBackboneTopic } from './k8s.schemas.ts';
|
||||
|
||||
class K8sResources {
|
||||
#secrets: K8sWatcher<V1Secret>;
|
||||
#clients: K8sWatcher<KubernetesObject & { spec: K8sBackboneClient }>;
|
||||
#topics: K8sWatcher<KubernetesObject & { spec: K8sBackboneTopic }>;
|
||||
|
||||
constructor(config: KubeConfig) {
|
||||
config.loadFromDefault();
|
||||
this.#secrets = new K8sWatcher({
|
||||
config,
|
||||
apiVersion: 'v1',
|
||||
kind: 'Secret',
|
||||
});
|
||||
this.#clients = new K8sWatcher({
|
||||
config,
|
||||
apiVersion: 'backbone.mortenolsen.pro/v1',
|
||||
kind: 'Client',
|
||||
});
|
||||
this.#topics = new K8sWatcher({
|
||||
config,
|
||||
apiVersion: 'backbone.mortenolsen.pro/v1',
|
||||
kind: 'Topic',
|
||||
});
|
||||
}
|
||||
|
||||
public get secrets() {
|
||||
return this.#secrets;
|
||||
}
|
||||
|
||||
public get clients() {
|
||||
return this.#clients;
|
||||
}
|
||||
|
||||
public get topics() {
|
||||
return this.#clients;
|
||||
}
|
||||
|
||||
public start = async () => {
|
||||
await this.#secrets.start();
|
||||
await this.#clients.start();
|
||||
await this.#topics.start();
|
||||
};
|
||||
}
|
||||
|
||||
export { K8sResources };
|
||||
19
src/k8s/k8s.schemas.ts
Normal file
19
src/k8s/k8s.schemas.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
import { statementSchema } from '#root/access/access.schemas.ts';
|
||||
|
||||
const k8sBackboneClientSchema = z.object({
|
||||
statements: z.array(statementSchema),
|
||||
});
|
||||
|
||||
type K8sBackboneClient = z.infer<typeof k8sBackboneClientSchema>;
|
||||
|
||||
const k8sBackboneTopicSchema = z.object({
|
||||
matches: z.array(z.string()),
|
||||
schema: z.record(z.string(), z.object()),
|
||||
});
|
||||
|
||||
type K8sBackboneTopic = z.infer<typeof k8sBackboneTopicSchema>;
|
||||
|
||||
export type { K8sBackboneClient, K8sBackboneTopic };
|
||||
export { k8sBackboneClientSchema, k8sBackboneTopicSchema };
|
||||
52
src/k8s/k8s.ts
Normal file
52
src/k8s/k8s.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import { KubeConfig } from '@kubernetes/client-node';
|
||||
|
||||
import { K8sResources } from './k8s.resources.ts';
|
||||
import { createCrd } from './k8s.crd.ts';
|
||||
import { k8sBackboneClientSchema, k8sBackboneTopicSchema } from './k8s.schemas.ts';
|
||||
import { K8sClients } from './k8s.clients.ts';
|
||||
|
||||
import { API_VERSION } from '#root/utils/consts.ts';
|
||||
|
||||
class K8sService {
|
||||
#config: KubeConfig;
|
||||
#resources: K8sResources;
|
||||
#clients: K8sClients;
|
||||
|
||||
constructor() {
|
||||
this.#config = new KubeConfig();
|
||||
this.#config.loadFromDefault();
|
||||
this.#resources = new K8sResources(this.#config);
|
||||
this.#clients = new K8sClients({
|
||||
config: this.#config,
|
||||
resources: this.resources,
|
||||
});
|
||||
}
|
||||
|
||||
public get resources() {
|
||||
return this.#resources;
|
||||
}
|
||||
|
||||
public get clients() {
|
||||
return this.#clients;
|
||||
}
|
||||
|
||||
public setup = async () => {
|
||||
await createCrd({
|
||||
config: this.#config,
|
||||
apiVersion: API_VERSION,
|
||||
kind: 'Client',
|
||||
scope: 'Namespaced',
|
||||
spec: k8sBackboneClientSchema,
|
||||
});
|
||||
await createCrd({
|
||||
config: this.#config,
|
||||
apiVersion: API_VERSION,
|
||||
kind: 'Topic',
|
||||
scope: 'Namespaced',
|
||||
spec: k8sBackboneTopicSchema,
|
||||
});
|
||||
await this.#resources.start();
|
||||
};
|
||||
}
|
||||
|
||||
export { K8sService };
|
||||
87
src/k8s/k8s.watcher.ts
Normal file
87
src/k8s/k8s.watcher.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import {
|
||||
KubeConfig,
|
||||
KubernetesObjectApi,
|
||||
makeInformer,
|
||||
type Informer,
|
||||
type KubernetesObject,
|
||||
} from '@kubernetes/client-node';
|
||||
|
||||
import { EventEmitter } from '#root/utils/event-emitter.ts';
|
||||
|
||||
type K8sWatcherOptions = {
|
||||
config: KubeConfig;
|
||||
apiVersion: string;
|
||||
plural?: string;
|
||||
kind: string;
|
||||
selector?: string;
|
||||
};
|
||||
|
||||
type K8sWatcherEvents<TType extends KubernetesObject> = {
|
||||
updated: (manifest: TType) => void;
|
||||
removed: (manifest: TType) => void;
|
||||
};
|
||||
|
||||
class K8sWatcher<TType extends KubernetesObject> extends EventEmitter<K8sWatcherEvents<TType>> {
|
||||
#options: K8sWatcherOptions;
|
||||
#informer: Informer<TType>;
|
||||
#manifests: Map<string, TType>;
|
||||
|
||||
constructor(options: K8sWatcherOptions) {
|
||||
super();
|
||||
this.#options = options;
|
||||
this.#manifests = new Map();
|
||||
this.#informer = this.#setupInformer();
|
||||
}
|
||||
|
||||
public get manifests() {
|
||||
return Array.from(this.#manifests.values());
|
||||
}
|
||||
|
||||
#setupInformer = () => {
|
||||
const { config, apiVersion, kind, plural, selector } = this.#options;
|
||||
const objectApi = config.makeApiClient(KubernetesObjectApi);
|
||||
const derivedPlural = plural ?? kind.toLowerCase() + 's';
|
||||
const [version, group] = apiVersion.split('/').toReversed();
|
||||
const path = group ? `/apis/${group}/${version}/${derivedPlural}` : `/api/${version}/${derivedPlural}`;
|
||||
|
||||
const informer = makeInformer<TType>(
|
||||
config,
|
||||
path,
|
||||
async () => {
|
||||
return objectApi.list(apiVersion, kind);
|
||||
},
|
||||
selector,
|
||||
);
|
||||
informer.on('add', this.#handleResource.bind(this, 'add'));
|
||||
informer.on('update', this.#handleResource.bind(this, 'update'));
|
||||
informer.on('delete', this.#handleResource.bind(this, 'delete'));
|
||||
informer.on('error', (err) => {
|
||||
console.log('Watcher failed, will retry in 3 seconds', path, err);
|
||||
setTimeout(this.start, 3000);
|
||||
});
|
||||
return informer;
|
||||
};
|
||||
|
||||
#handleResource = (action: string, manifest: TType) => {
|
||||
const uid = manifest.metadata?.uid;
|
||||
if (!uid) {
|
||||
return;
|
||||
}
|
||||
if (action === 'delete') {
|
||||
this.#manifests.delete(uid);
|
||||
return this.emit('removed', manifest);
|
||||
}
|
||||
this.#manifests.set(uid, manifest);
|
||||
this.emit('updated', manifest);
|
||||
};
|
||||
|
||||
public start = () => {
|
||||
return this.#informer.start();
|
||||
};
|
||||
|
||||
public stop = () => {
|
||||
return this.#informer.stop();
|
||||
};
|
||||
}
|
||||
|
||||
export { K8sWatcher };
|
||||
@@ -1,40 +1,53 @@
|
||||
import http from 'node:http';
|
||||
import tcp from 'node:net';
|
||||
import { WebSocketServer, createWebSocketStream } from 'ws';
|
||||
import type { IncomingMessage } from 'node:http';
|
||||
|
||||
import {
|
||||
createBroker,
|
||||
type AuthenticateHandler,
|
||||
type AuthorizeForwardHandler,
|
||||
type AuthorizePublishHandler,
|
||||
type AuthorizeSubscribeHandler,
|
||||
type PublishedHandler,
|
||||
} from 'aedes';
|
||||
import { AedesMemoryPersistence } from 'aedes-persistence';
|
||||
import { Session } from '../access/access.session.ts';
|
||||
import type { AccessTokens } from '#root/access/access.token.ts';
|
||||
import aedes from 'aedes';
|
||||
import fastify, { type FastifyInstance } from 'fastify';
|
||||
import fastifyWebSocket from '@fastify/websocket';
|
||||
import { createWebSocketStream } from 'ws';
|
||||
|
||||
type Aedes = ReturnType<typeof createBroker>;
|
||||
import { Session } from '../access/access.session.ts';
|
||||
import { api } from '../api/api.ts';
|
||||
|
||||
import type { AccessHandler } from '#root/access/access.handler.ts';
|
||||
import type { TopicsHandler } from '#root/topics/topics.handler.ts';
|
||||
|
||||
type Aedes = ReturnType<typeof aedes.createBroker>;
|
||||
|
||||
declare module 'aedes' {
|
||||
// eslint-disable-next-line
|
||||
export interface Client {
|
||||
session: Session;
|
||||
}
|
||||
}
|
||||
|
||||
const packetMetaSymbol = Symbol('packetMeta');
|
||||
|
||||
type MqttServerOptions = {
|
||||
accessTokens: AccessTokens;
|
||||
accessHandler: AccessHandler;
|
||||
topicsHandler: TopicsHandler;
|
||||
};
|
||||
|
||||
class AuthError extends Error {
|
||||
public readonly returnCode = 4;
|
||||
}
|
||||
|
||||
class MqttServer {
|
||||
#options: MqttServerOptions;
|
||||
#server: Aedes;
|
||||
#http?: http.Server;
|
||||
#http?: Promise<FastifyInstance>;
|
||||
#tcp?: tcp.Server;
|
||||
|
||||
constructor(options: MqttServerOptions) {
|
||||
this.#options = options;
|
||||
this.#server = createBroker({
|
||||
persistence: new AedesMemoryPersistence(),
|
||||
this.#server = aedes.createBroker({
|
||||
authenticate: this.#authenticate,
|
||||
authorizePublish: this.#authorizePublish,
|
||||
authorizeSubscribe: this.#authorizeSubscribe,
|
||||
@@ -43,19 +56,25 @@ class MqttServer {
|
||||
});
|
||||
}
|
||||
|
||||
#authenticate: AuthenticateHandler = (client, _username, password, callback) => {
|
||||
if (!password) {
|
||||
#authenticate: AuthenticateHandler = async (client, username, password, callback) => {
|
||||
try {
|
||||
if (!username || !password) {
|
||||
throw new Error('unauthorized');
|
||||
}
|
||||
const { accessTokens } = this.#options;
|
||||
const auth = accessTokens.validate(password.toString('utf8'));
|
||||
client.session = new Session({
|
||||
statements: auth.statements,
|
||||
});
|
||||
const { accessHandler } = this.#options;
|
||||
const auth = await accessHandler.validate(username, password.toString('utf8'));
|
||||
client.session = new Session(auth);
|
||||
callback(null, true);
|
||||
} catch {
|
||||
callback(new AuthError('Unautorized'), false);
|
||||
}
|
||||
};
|
||||
|
||||
#authorizePublish: AuthorizePublishHandler = (client, packet, callback) => {
|
||||
const { topicsHandler } = this.#options;
|
||||
(packet as ExplicitAny)[packetMetaSymbol] = {
|
||||
foo: 'bar',
|
||||
};
|
||||
const authorized = client?.session.validate({
|
||||
action: 'mqtt:publish',
|
||||
resource: `mqtt:${packet.topic}`,
|
||||
@@ -63,6 +82,9 @@ class MqttServer {
|
||||
if (!authorized) {
|
||||
return callback(new Error('unauthorized'));
|
||||
}
|
||||
if (!topicsHandler.validate(packet)) {
|
||||
return callback(new Error('rules not matched'));
|
||||
}
|
||||
callback();
|
||||
};
|
||||
|
||||
@@ -79,7 +101,7 @@ class MqttServer {
|
||||
|
||||
#authorizeForward: AuthorizeForwardHandler = (client, packet) => {
|
||||
const authorized = client.session.validate({
|
||||
action: 'mqtt:forward',
|
||||
action: 'mqtt:read',
|
||||
resource: `mqtt:${packet.topic}`,
|
||||
});
|
||||
if (!authorized) {
|
||||
@@ -92,16 +114,22 @@ class MqttServer {
|
||||
callback();
|
||||
};
|
||||
|
||||
#setupHttpServer = async () => {
|
||||
const http = fastify({});
|
||||
await http.register(fastifyWebSocket);
|
||||
http.get('/ws', { websocket: true }, (socket, req) => {
|
||||
const stream = createWebSocketStream(socket);
|
||||
this.#server.handle(stream, req as unknown as IncomingMessage);
|
||||
});
|
||||
await http.register(api, {
|
||||
prefix: '/api',
|
||||
});
|
||||
return http;
|
||||
};
|
||||
|
||||
public getHttpServer = () => {
|
||||
if (!this.#http) {
|
||||
this.#http = http.createServer();
|
||||
const wss = new WebSocketServer({
|
||||
server: this.#http,
|
||||
});
|
||||
wss.on('connection', (websocket, req) => {
|
||||
const stream = createWebSocketStream(websocket);
|
||||
this.#server.handle(stream, req);
|
||||
});
|
||||
this.#http = this.#setupHttpServer();
|
||||
}
|
||||
return this.#http;
|
||||
};
|
||||
|
||||
21
src/test.ts
Normal file
21
src/test.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import { AccessHandler } from './access/access.handler.ts';
|
||||
import { K8sService } from './k8s/k8s.ts';
|
||||
import { MqttServer } from './server/server.ts';
|
||||
import { TopicsHandler } from './topics/topics.handler.ts';
|
||||
|
||||
const accessHandler = new AccessHandler();
|
||||
const topicsHandler = new TopicsHandler();
|
||||
|
||||
const k8s = new K8sService();
|
||||
await k8s.setup();
|
||||
accessHandler.register('k8s', k8s.clients);
|
||||
const server = new MqttServer({
|
||||
accessHandler,
|
||||
topicsHandler,
|
||||
});
|
||||
const tcp = server.getTcpServer();
|
||||
tcp.listen(1883);
|
||||
const http = await server.getHttpServer();
|
||||
http.listen({ port: 8883 });
|
||||
|
||||
console.log('started');
|
||||
15
src/topics/topcis.schemas.ts
Normal file
15
src/topics/topcis.schemas.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
const topicDefinitionSchema = z.object({
|
||||
matches: z.array(z.string()),
|
||||
name: z.string().optional(),
|
||||
description: z.string().optional(),
|
||||
schema: z.object().optional(),
|
||||
qos: z.array(z.number()).optional(),
|
||||
retain: z.boolean().optional(),
|
||||
});
|
||||
|
||||
type TopicDefinition = z.infer<typeof topicDefinitionSchema>;
|
||||
|
||||
export type { TopicDefinition };
|
||||
export { topicDefinitionSchema };
|
||||
52
src/topics/topics.handler.ts
Normal file
52
src/topics/topics.handler.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import type { PublishPacket } from 'aedes';
|
||||
import micromatch from 'micromatch';
|
||||
import { Ajv } from 'ajv';
|
||||
|
||||
import type { TopicsProvider } from './topics.provider.ts';
|
||||
|
||||
class TopicsHandler {
|
||||
#handlers: Set<TopicsProvider>;
|
||||
#ajv: Ajv;
|
||||
|
||||
constructor() {
|
||||
this.#handlers = new Set();
|
||||
this.#ajv = new Ajv();
|
||||
}
|
||||
|
||||
public get topics() {
|
||||
return Array.from(this.#handlers).flatMap((handler) => handler.definitions);
|
||||
}
|
||||
|
||||
public register = (provider: TopicsProvider) => {
|
||||
this.#handlers.add(provider);
|
||||
};
|
||||
|
||||
public validate = (packet: PublishPacket) => {
|
||||
if (packet.topic.startsWith('$SYS/')) {
|
||||
return true;
|
||||
}
|
||||
const matches = this.topics.filter((topic) => micromatch.isMatch(packet.topic, topic.matches));
|
||||
const isValid =
|
||||
matches.length === 0 ||
|
||||
matches.some((topic) => {
|
||||
if (topic.qos && !topic.qos.includes(packet.qos)) {
|
||||
return false;
|
||||
}
|
||||
if (topic.retain !== undefined && packet.retain !== topic.retain) {
|
||||
return false;
|
||||
}
|
||||
if (topic.schema) {
|
||||
const data = JSON.parse(packet.payload.toString('utf8'));
|
||||
const validate = this.#ajv.compile(topic.schema);
|
||||
const valid = validate(data);
|
||||
if (!valid) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return isValid;
|
||||
};
|
||||
}
|
||||
|
||||
export { TopicsHandler };
|
||||
7
src/topics/topics.provider.ts
Normal file
7
src/topics/topics.provider.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
import type { TopicDefinition } from './topcis.schemas.ts';
|
||||
|
||||
type TopicsProvider = {
|
||||
definitions: TopicDefinition[];
|
||||
};
|
||||
|
||||
export type { TopicsProvider };
|
||||
20
src/topics/topics.store.ts
Normal file
20
src/topics/topics.store.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import type { TopicDefinition } from './topcis.schemas.ts';
|
||||
import type { TopicsProvider } from './topics.provider.ts';
|
||||
|
||||
class TopicsStore implements TopicsProvider {
|
||||
#definitions: Set<TopicDefinition>;
|
||||
|
||||
constructor() {
|
||||
this.#definitions = new Set();
|
||||
}
|
||||
|
||||
public get definitions() {
|
||||
return Array.from(this.#definitions);
|
||||
}
|
||||
|
||||
public register = (...definitions: TopicDefinition[]) => {
|
||||
definitions.forEach(this.#definitions.add);
|
||||
};
|
||||
}
|
||||
|
||||
export { TopicsStore };
|
||||
5
src/utils/consts.ts
Normal file
5
src/utils/consts.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
const K8S_GROUP = 'backbone.mortenolsen.pro';
|
||||
const K8S_VERSION = 'v1';
|
||||
const API_VERSION = 'backbone.mortenolsen.pro/v1';
|
||||
|
||||
export { API_VERSION, K8S_GROUP, K8S_VERSION };
|
||||
64
src/utils/event-emitter.ts
Normal file
64
src/utils/event-emitter.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
type EventListener<T extends unknown[]> = (...args: T) => void | Promise<void>;
|
||||
|
||||
type OnOptions = {
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
class EventEmitter<T extends Record<string, (...args: ExplicitAny[]) => void | Promise<void>>> {
|
||||
#listeners = new Map<keyof T, Set<EventListener<ExplicitAny>>>();
|
||||
|
||||
on = <K extends keyof T>(event: K, callback: EventListener<Parameters<T[K]>>, options: OnOptions = {}) => {
|
||||
const { abortSignal } = options;
|
||||
if (!this.#listeners.has(event)) {
|
||||
this.#listeners.set(event, new Set());
|
||||
}
|
||||
const callbackClone = (...args: Parameters<T[K]>) => callback(...args);
|
||||
const abortController = new AbortController();
|
||||
const listeners = this.#listeners.get(event);
|
||||
if (!listeners) {
|
||||
throw new Error('Event registration failed');
|
||||
}
|
||||
abortSignal?.addEventListener('abort', abortController.abort);
|
||||
listeners.add(callbackClone);
|
||||
abortController.signal.addEventListener('abort', () => {
|
||||
this.#listeners.set(event, listeners?.difference(new Set([callbackClone])));
|
||||
});
|
||||
return abortController.abort;
|
||||
};
|
||||
|
||||
once = <K extends keyof T>(event: K, callback: EventListener<Parameters<T[K]>>, options: OnOptions = {}) => {
|
||||
const abortController = new AbortController();
|
||||
options.abortSignal?.addEventListener('abort', abortController.abort);
|
||||
return this.on(
|
||||
event,
|
||||
async (...args) => {
|
||||
abortController.abort();
|
||||
await callback(...args);
|
||||
},
|
||||
{
|
||||
...options,
|
||||
abortSignal: abortController.signal,
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
emit = <K extends keyof T>(event: K, ...args: Parameters<T[K]>) => {
|
||||
const listeners = this.#listeners.get(event);
|
||||
if (!listeners) {
|
||||
return;
|
||||
}
|
||||
for (const listener of listeners) {
|
||||
listener(...args);
|
||||
}
|
||||
};
|
||||
|
||||
emitAsync = async <K extends keyof T>(event: K, ...args: Parameters<T[K]>) => {
|
||||
const listeners = this.#listeners.get(event);
|
||||
if (!listeners) {
|
||||
return;
|
||||
}
|
||||
await Promise.all(listeners.values().map((listener) => listener(...args)));
|
||||
};
|
||||
}
|
||||
|
||||
export { EventEmitter };
|
||||
73
tests/mqtt.test.ts
Normal file
73
tests/mqtt.test.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { describe, beforeEach, afterEach, it, vi, expect } from 'vitest';
|
||||
|
||||
import { createWorld, type World } from './utils/utils.world.ts';
|
||||
import { statements } from './utils/utils.statements.ts';
|
||||
|
||||
describe('mqtt', () => {
|
||||
let world: World = undefined as unknown as World;
|
||||
|
||||
beforeEach(async () => {
|
||||
world = await createWorld({});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (world) {
|
||||
await world.destroy();
|
||||
}
|
||||
});
|
||||
|
||||
it('should be able to send messages to all subscribers', async () => {
|
||||
const [clientA, clientB, clientC] = await world.connect(statements.all, statements.all, statements.all);
|
||||
const spyB = vi.fn();
|
||||
const spyC = vi.fn();
|
||||
clientB.on('message', spyB);
|
||||
clientC.on('message', spyC);
|
||||
await clientB.subscribeAsync('test');
|
||||
await clientC.subscribeAsync('test');
|
||||
await clientA.publishAsync('test', 'test');
|
||||
await vi.waitUntil(() => spyB.mock.calls.length && spyC.mock.calls.length);
|
||||
|
||||
expect(spyB).toHaveBeenCalledTimes(1);
|
||||
expect(spyC).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should not be able to subscribe if not allowed', async () => {
|
||||
const [client] = await world.connect([]);
|
||||
const promise = client.subscribeAsync('test');
|
||||
await expect(promise).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('should not be able to publish if not allowed', async () => {
|
||||
const [client] = await world.connect([]);
|
||||
const promise = client.publishAsync('test', 'test');
|
||||
|
||||
// TODO: why does this not throw?
|
||||
// await expect(promise).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('should not be able to read messages if not allowed', async () => {
|
||||
const [clientA, clientB] = await world.connect(statements.all, statements.noRead);
|
||||
const spy = vi.fn();
|
||||
clientB.on('message', spy);
|
||||
await clientB.subscribeAsync('test');
|
||||
await clientA.publishAsync('test', 'test');
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(spy).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('should be able to handle many connections', async () => {
|
||||
const clients = await world.connect(...new Array(50).fill(statements.all));
|
||||
const spies = await Promise.all(
|
||||
clients.map(async (client) => {
|
||||
const spy = vi.fn();
|
||||
client.on('message', spy);
|
||||
await client.subscribeAsync('test');
|
||||
return spy;
|
||||
}),
|
||||
);
|
||||
const [sender] = await world.connect(statements.all);
|
||||
await sender.publishAsync('test', 'test');
|
||||
await vi.waitUntil(() => spies.every((s) => s.mock.calls.length));
|
||||
});
|
||||
});
|
||||
4
tests/tsconfig.json
Normal file
4
tests/tsconfig.json
Normal file
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"extends": "../tsconfig.json",
|
||||
"include": ["../src/**/*.ts", "./**/*.ts"]
|
||||
}
|
||||
25
tests/utils/utils.statements.ts
Normal file
25
tests/utils/utils.statements.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import type { Statement } from '#root/access/access.schemas.ts';
|
||||
|
||||
const statements = {
|
||||
all: [
|
||||
{
|
||||
effect: 'allow',
|
||||
resources: ['**'],
|
||||
actions: ['**'],
|
||||
},
|
||||
],
|
||||
noRead: [
|
||||
{
|
||||
effect: 'allow',
|
||||
resources: ['**'],
|
||||
actions: ['**'],
|
||||
},
|
||||
{
|
||||
effect: 'disallow',
|
||||
resources: ['**'],
|
||||
actions: ['mqtt:read'],
|
||||
},
|
||||
],
|
||||
} satisfies Record<string, Statement[]>;
|
||||
|
||||
export { statements };
|
||||
73
tests/utils/utils.world.ts
Normal file
73
tests/utils/utils.world.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import mqtt, { connectAsync, MqttClient } from 'mqtt';
|
||||
import getPort from 'get-port';
|
||||
|
||||
import { AccessHandler } from '#root/access/access.handler.ts';
|
||||
import { type Statement } from '#root/access/access.schemas.ts';
|
||||
import { AccessTokens } from '#root/access/access.token.ts';
|
||||
import { MqttServer } from '#root/server/server.ts';
|
||||
import type { TopicDefinition } from '#root/topics/topcis.schemas.ts';
|
||||
import { TopicsHandler } from '#root/topics/topics.handler.ts';
|
||||
import { TopicsStore } from '#root/topics/topics.store.ts';
|
||||
|
||||
type CreateSocketOptions = {
|
||||
port: number;
|
||||
token: string;
|
||||
};
|
||||
|
||||
const createSocket = async (options: CreateSocketOptions) => {
|
||||
const { port, token } = options;
|
||||
const mqttClient = await connectAsync(`ws://localhost:${port}/ws`, {
|
||||
username: 'token',
|
||||
password: token,
|
||||
reconnectOnConnackError: false,
|
||||
});
|
||||
return mqttClient;
|
||||
};
|
||||
|
||||
type WorldOptions = {
|
||||
topics?: TopicDefinition[];
|
||||
};
|
||||
|
||||
const createWorld = async (options: WorldOptions) => {
|
||||
const { topics = [] } = options;
|
||||
const secret = 'test';
|
||||
const accessHandler = new AccessHandler();
|
||||
const accessTokens = new AccessTokens({
|
||||
secret,
|
||||
});
|
||||
accessHandler.register('token', accessTokens);
|
||||
const topicsHandler = new TopicsHandler();
|
||||
const topicsStore = new TopicsStore();
|
||||
topicsStore.register(...topics);
|
||||
topicsHandler.register(topicsStore);
|
||||
const server = new MqttServer({ topicsHandler, accessHandler });
|
||||
const fastify = await server.getHttpServer();
|
||||
const port = await getPort();
|
||||
await fastify.listen({ port });
|
||||
const sockets: MqttClient[] = [];
|
||||
return {
|
||||
connect: async (...clients: Statement[][]) => {
|
||||
const newSockets = await Promise.all(
|
||||
clients.map((statements) =>
|
||||
createSocket({
|
||||
port,
|
||||
token: accessTokens.generate({
|
||||
statements,
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
sockets.push(...newSockets);
|
||||
return newSockets;
|
||||
},
|
||||
destroy: async () => {
|
||||
await Promise.all(sockets.map((s) => s.endAsync()));
|
||||
await fastify.close();
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
type World = Awaited<ReturnType<typeof createWorld>>;
|
||||
|
||||
export type { World };
|
||||
export { createWorld };
|
||||
9
vitest.config.ts
Normal file
9
vitest.config.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { defineConfig } from 'vitest/config';
|
||||
|
||||
// eslint-disable-next-line
|
||||
export default defineConfig({
|
||||
test: {
|
||||
include: ['**/*.test.ts'],
|
||||
globals: true,
|
||||
},
|
||||
});
|
||||
Reference in New Issue
Block a user