refact: use service container

This commit is contained in:
Morten Olsen
2025-10-16 12:13:22 +02:00
parent 8e594d59fd
commit 1f4b9cab04
12 changed files with 657 additions and 143 deletions

40
src/backbone.ts Normal file
View File

@@ -0,0 +1,40 @@
import { AccessHandler } from './access/access.handler.ts';
import { K8sService } from './k8s/k8s.ts';
import { MqttServer } from './server/server.ts';
import { TopicsHandler } from './topics/topics.handler.ts';
import { Services } from './utils/services.ts';
class Backbone {
#services: Services;
constructor(services = new Services()) {
this.#services = services;
}
public get services() {
return this.#services;
}
public get server() {
return this.#services.get(MqttServer);
}
public get accessHandler() {
return this.#services.get(AccessHandler);
}
public get topicsHandler() {
return this.#services.get(TopicsHandler);
}
public get k8s() {
return this.#services.get(K8sService);
}
public setupK8sOperator = async () => {
await this.k8s.setup();
this.accessHandler.register('k8s', this.k8s.clients);
};
}
export { Backbone };

View File

@@ -1,33 +1,31 @@
import { KubernetesObjectApi, PatchStrategy, type KubeConfig, type KubernetesObject } from '@kubernetes/client-node';
import { KubernetesObjectApi, type KubernetesObject } from '@kubernetes/client-node';
import type { K8sResources } from './k8s.resources.ts';
import { K8sResources } from './k8s.resources.ts';
import type { K8sBackboneClient } from './k8s.schemas.ts';
import type { AccessProvider } from '#root/access/access.provider.ts';
import type { Statement } from '#root/access/access.schemas.ts';
type K8sClientsOptions = {
config: KubeConfig;
resources: K8sResources;
};
import type { Services } from '#root/utils/services.ts';
import { K8sConfig } from './k8s.config.ts';
type K8sClient = {
statements: Statement[];
};
class K8sClients implements AccessProvider {
#options: K8sClientsOptions;
#services: Services;
#clients: Map<string, K8sClient>;
constructor(options: K8sClientsOptions) {
constructor(services: Services) {
this.#services = services;
this.#clients = new Map();
this.#options = options;
const { clients } = options.resources;
const { clients } = services.get(K8sResources);
clients.on('updated', this.#handleClientAdded);
}
#handleClientAdded = async (manifest: KubernetesObject & { spec: K8sBackboneClient }) => {
const { resources, config } = this.#options;
const resources = this.#services.get(K8sResources);
const { config } = this.#services.get(K8sConfig);
const secretName = `${manifest.metadata?.name}-secret`;
const secret = resources.secrets.manifests.find(
(m) => m.metadata?.namespace === manifest.metadata?.namespace && m.metadata?.name === secretName,

15
src/k8s/k8s.config.ts Normal file
View File

@@ -0,0 +1,15 @@
import { KubeConfig } from '@kubernetes/client-node';
class K8sConfig {
#config?: KubeConfig;
public get config() {
if (!this.#config) {
this.#config = new KubeConfig();
this.#config.loadFromDefault();
}
return this.#config;
}
}
export { K8sConfig };

View File

@@ -1,71 +1,80 @@
import { type KubeConfig, ApiException, ApiextensionsV1Api } from '@kubernetes/client-node';
import type { Services } from '#root/utils/services.ts';
import { ApiException, ApiextensionsV1Api } from '@kubernetes/client-node';
import { z, type ZodType } from 'zod';
import { K8sConfig } from './k8s.config.ts';
type CreateCrdOptions = {
config: KubeConfig;
kind: string;
apiVersion: string;
plural?: string;
scope: 'Cluster' | 'Namespaced';
spec: ZodType;
};
const createCrd = async (options: CreateCrdOptions) => {
const { config, ...definition } = options;
const plural = definition.plural ?? definition.kind.toLowerCase() + 's';
const [version, group] = definition.apiVersion.split('/').toReversed();
const manifest = {
apiVersion: 'apiextensions.k8s.io/v1',
kind: 'CustomResourceDefinition',
metadata: {
name: `${plural}.${group}`,
},
spec: {
group: group,
names: {
kind: definition.kind,
plural: plural,
singular: definition.kind.toLowerCase(),
class K8sCrds {
#services: Services;
constructor(services: Services) {
this.#services = services;
}
install = async (definition: CreateCrdOptions) => {
const { config } = this.#services.get(K8sConfig);
const plural = definition.plural ?? definition.kind.toLowerCase() + 's';
const [version, group] = definition.apiVersion.split('/').toReversed();
const manifest = {
apiVersion: 'apiextensions.k8s.io/v1',
kind: 'CustomResourceDefinition',
metadata: {
name: `${plural}.${group}`,
},
scope: definition.scope,
versions: [
{
name: version,
served: true,
storage: true,
schema: {
openAPIV3Schema: {
type: 'object',
properties: {
spec: {
...z.toJSONSchema(definition.spec, { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExplicitAny,
spec: {
group: group,
names: {
kind: definition.kind,
plural: plural,
singular: definition.kind.toLowerCase(),
},
scope: definition.scope,
versions: [
{
name: version,
served: true,
storage: true,
schema: {
openAPIV3Schema: {
type: 'object',
properties: {
spec: {
...z.toJSONSchema(definition.spec, { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExplicitAny,
},
},
},
subresources: {
status: {},
},
},
subresources: {
status: {},
},
},
],
},
};
const extensionsApi = config.makeApiClient(ApiextensionsV1Api);
try {
await extensionsApi.createCustomResourceDefinition({
body: manifest,
});
} catch (error) {
if (error instanceof ApiException && error.code === 409) {
await extensionsApi.patchCustomResourceDefinition({
name: manifest.metadata.name,
body: [{ op: 'replace', path: '/spec', value: manifest.spec }],
],
},
};
const extensionsApi = config.makeApiClient(ApiextensionsV1Api);
try {
await extensionsApi.createCustomResourceDefinition({
body: manifest,
});
} else {
throw error;
} catch (error) {
if (error instanceof ApiException && error.code === 409) {
await extensionsApi.patchCustomResourceDefinition({
name: manifest.metadata.name,
body: [{ op: 'replace', path: '/spec', value: manifest.spec }],
});
} else {
throw error;
}
}
}
};
};
}
export { createCrd };
export { K8sCrds };

View File

@@ -1,48 +1,60 @@
import { KubeConfig, V1Secret, type KubernetesObject } from '@kubernetes/client-node';
import { V1Secret, type KubernetesObject } from '@kubernetes/client-node';
import { K8sWatcher } from './k8s.watcher.ts';
import type { K8sBackboneClient, K8sBackboneTopic } from './k8s.schemas.ts';
import type { Services } from '#root/utils/services.ts';
import { K8sConfig } from './k8s.config.ts';
class K8sResources {
#secrets: K8sWatcher<V1Secret>;
#clients: K8sWatcher<KubernetesObject & { spec: K8sBackboneClient }>;
#topics: K8sWatcher<KubernetesObject & { spec: K8sBackboneTopic }>;
#services: Services;
#secrets?: K8sWatcher<V1Secret>;
#clients?: K8sWatcher<KubernetesObject & { spec: K8sBackboneClient }>;
#topics?: K8sWatcher<KubernetesObject & { spec: K8sBackboneTopic }>;
constructor(config: KubeConfig) {
config.loadFromDefault();
this.#secrets = new K8sWatcher({
config,
apiVersion: 'v1',
kind: 'Secret',
});
this.#clients = new K8sWatcher({
config,
apiVersion: 'backbone.mortenolsen.pro/v1',
kind: 'Client',
});
this.#topics = new K8sWatcher({
config,
apiVersion: 'backbone.mortenolsen.pro/v1',
kind: 'Topic',
});
constructor(services: Services) {
this.#services = services;
}
public get secrets() {
if (!this.#secrets) {
const { config } = this.#services.get(K8sConfig);
this.#secrets = new K8sWatcher({
config,
apiVersion: 'v1',
kind: 'Secret',
});
}
return this.#secrets;
}
public get clients() {
if (!this.#clients) {
const { config } = this.#services.get(K8sConfig);
this.#clients = new K8sWatcher({
config,
apiVersion: 'backbone.mortenolsen.pro/v1',
kind: 'Client',
});
}
return this.#clients;
}
public get topics() {
return this.#clients;
if (!this.#topics) {
const { config } = this.#services.get(K8sConfig);
this.#topics = new K8sWatcher({
config,
apiVersion: 'backbone.mortenolsen.pro/v1',
kind: 'Topic',
});
}
return this.#topics;
}
public start = async () => {
await this.#secrets.start();
await this.#clients.start();
await this.#topics.start();
await this.secrets.start();
await this.clients.start();
await this.topics.start();
};
}

View File

@@ -1,51 +1,43 @@
import { KubeConfig } from '@kubernetes/client-node';
import { K8sResources } from './k8s.resources.ts';
import { createCrd } from './k8s.crd.ts';
import { K8sCrds } from './k8s.crd.ts';
import { k8sBackboneClientSchema, k8sBackboneTopicSchema } from './k8s.schemas.ts';
import { K8sClients } from './k8s.clients.ts';
import { API_VERSION } from '#root/utils/consts.ts';
import type { Services } from '#root/utils/services.ts';
class K8sService {
#config: KubeConfig;
#resources: K8sResources;
#clients: K8sClients;
#services: Services;
constructor() {
this.#config = new KubeConfig();
this.#config.loadFromDefault();
this.#resources = new K8sResources(this.#config);
this.#clients = new K8sClients({
config: this.#config,
resources: this.resources,
});
constructor(services: Services) {
this.#services = services;
}
public get resources() {
return this.#resources;
return this.#services.get(K8sResources);
}
public get clients() {
return this.#clients;
return this.#services.get(K8sClients);
}
public setup = async () => {
await createCrd({
config: this.#config,
const crds = this.#services.get(K8sCrds);
await crds.install({
apiVersion: API_VERSION,
kind: 'Client',
scope: 'Namespaced',
spec: k8sBackboneClientSchema,
});
await createCrd({
config: this.#config,
await crds.install({
apiVersion: API_VERSION,
kind: 'Topic',
scope: 'Namespaced',
spec: k8sBackboneTopicSchema,
});
await this.#resources.start();
await this.resources.start();
};
}

View File

@@ -16,8 +16,9 @@ import { createWebSocketStream } from 'ws';
import { Session } from '../access/access.session.ts';
import { api } from '../api/api.ts';
import type { AccessHandler } from '#root/access/access.handler.ts';
import type { TopicsHandler } from '#root/topics/topics.handler.ts';
import { AccessHandler } from '#root/access/access.handler.ts';
import { TopicsHandler } from '#root/topics/topics.handler.ts';
import type { Services } from '#root/utils/services.ts';
type Aedes = ReturnType<typeof aedes.createBroker>;
@@ -30,23 +31,18 @@ declare module 'aedes' {
const packetMetaSymbol = Symbol('packetMeta');
type MqttServerOptions = {
accessHandler: AccessHandler;
topicsHandler: TopicsHandler;
};
class AuthError extends Error {
public readonly returnCode = 4;
}
class MqttServer {
#options: MqttServerOptions;
#services: Services;
#server: Aedes;
#http?: Promise<FastifyInstance>;
#tcp?: tcp.Server;
constructor(options: MqttServerOptions) {
this.#options = options;
constructor(services: Services) {
this.#services = services;
this.#server = aedes.createBroker({
authenticate: this.#authenticate,
authorizePublish: this.#authorizePublish,
@@ -61,7 +57,7 @@ class MqttServer {
if (!username || !password) {
throw new Error('unauthorized');
}
const { accessHandler } = this.#options;
const accessHandler = this.#services.get(AccessHandler);
const auth = await accessHandler.validate(username, password.toString('utf8'));
client.session = new Session(auth);
callback(null, true);
@@ -71,7 +67,7 @@ class MqttServer {
};
#authorizePublish: AuthorizePublishHandler = (client, packet, callback) => {
const { topicsHandler } = this.#options;
const topicsHandler = this.#services.get(TopicsHandler);
(packet as ExplicitAny)[packetMetaSymbol] = {
foo: 'bar',
};

View File

@@ -1,21 +1,11 @@
import { AccessHandler } from './access/access.handler.ts';
import { K8sService } from './k8s/k8s.ts';
import { MqttServer } from './server/server.ts';
import { TopicsHandler } from './topics/topics.handler.ts';
import { Backbone } from './backbone.ts';
const accessHandler = new AccessHandler();
const topicsHandler = new TopicsHandler();
const backbone = new Backbone();
await backbone.setupK8sOperator();
const k8s = new K8sService();
await k8s.setup();
accessHandler.register('k8s', k8s.clients);
const server = new MqttServer({
accessHandler,
topicsHandler,
});
const tcp = server.getTcpServer();
const tcp = backbone.server.getTcpServer();
tcp.listen(1883);
const http = await server.getHttpServer();
const http = await backbone.server.getHttpServer();
http.listen({ port: 8883 });
console.log('started');

51
src/utils/services.ts Normal file
View File

@@ -0,0 +1,51 @@
const destroy = Symbol('destroy');
const instanceKey = Symbol('instances');
type ServiceDependency<T> = new (services: Services) => T & {
[destroy]?: () => Promise<void> | void;
};
class Services {
[instanceKey]: Map<ServiceDependency<unknown>, unknown>;
constructor() {
this[instanceKey] = new Map();
}
public get = <T>(service: ServiceDependency<T>) => {
if (!this[instanceKey].has(service)) {
this[instanceKey].set(service, new service(this));
}
const instance = this[instanceKey].get(service);
if (!instance) {
throw new Error('Could not generate instance');
}
return instance as T;
};
public set = <T>(service: ServiceDependency<T>, instance: Partial<T>) => {
this[instanceKey].set(service, instance);
};
public clone = () => {
const services = new Services();
services[instanceKey] = Object.fromEntries(this[instanceKey].entries());
};
public destroy = async () => {
await Promise.all(
this[instanceKey].values().map(async (instance) => {
if (
typeof instance === 'object' &&
instance &&
destroy in instance &&
typeof instance[destroy] === 'function'
) {
await instance[destroy]();
}
}),
);
};
}
export { Services, destroy };