lot more stuff

This commit is contained in:
Morten Olsen
2025-08-04 23:44:14 +02:00
parent daf0ea21bb
commit 757b2fcfac
185 changed files with 115899 additions and 1874 deletions

View File

@@ -1,34 +0,0 @@
class ConfigService {
public get istio() {
const gateway = process.env.ISTIO_GATEWAY;
if (!gateway) {
throw new Error('ISTIO_GATEWAY must be set');
}
return {
gateway: process.env.ISTIO_GATEWAY,
};
}
public get certManager() {
const certManager = process.env.CERT_MANAGER;
if (!certManager) {
throw new Error('CERT_MANAGER must be set');
}
return certManager;
}
public get postgres() {
const host = process.env.POSTGRES_HOST;
const user = process.env.POSTGRES_USER;
const password = process.env.POSTGRES_PASSWORD;
const port = process.env.POSTGRES_PORT ? parseInt(process.env.POSTGRES_PORT, 10) : 5432;
if (!host || !user || !password) {
throw new Error('POSTGRES_HOST, POSTGRES_USER, and POSTGRES_PASSWORD must be set');
}
return { host, user, password, port };
}
}
export { ConfigService };

View File

@@ -0,0 +1,91 @@
import { EventEmitter } from 'eventemitter3';
import equal from 'deep-equal';
import type { CustomResource } from './custom-resources.custom-resource.ts';
import type { CustomResourceStatus } from './custom-resources.types.ts';
type CustomResourceStatusOptions = {
resource: CustomResource<ExpectedAny>;
};
type CustomResourceConditionsEvents = {
changed: (type: string, condition: Condition) => void;
};
type Condition = {
lastTransitionTime: Date;
status: 'True' | 'False' | 'Unknown';
syncing?: boolean;
failed?: boolean;
resource?: boolean;
reason?: string;
message?: string;
observedGeneration?: number;
};
class CustomResourceConditions extends EventEmitter<CustomResourceConditionsEvents> {
#options: CustomResourceStatusOptions;
#conditions: Record<string, Condition>;
constructor(options: CustomResourceStatusOptions) {
super();
this.#options = options;
this.#conditions = Object.fromEntries(
(options.resource.status?.conditions || []).map(({ type, lastTransitionTime, ...condition }) => [
type,
{
...condition,
lastTransitionTime: new Date(lastTransitionTime),
},
]),
);
options.resource.on('changed', this.#handleChange);
}
#handleChange = () => {
const { resource } = this.#options;
for (const { type, ...condition } of resource.status?.conditions || []) {
const next = {
...condition,
lastTransitionTime: new Date(condition.lastTransitionTime),
};
const current = this.#conditions[type];
const isEqual = equal(current, next);
const isNewer = !current || next.lastTransitionTime > current.lastTransitionTime;
if (isEqual || !isNewer) {
return;
}
this.#conditions[type] = next;
this.emit('changed', type, next);
}
};
public get = (type: string): Condition | undefined => {
return this.#conditions[type];
};
public set = async (type: string, condition: Omit<Condition, 'lastTransitionTime'>) => {
const current = this.#conditions[type];
this.#conditions[type] = {
...condition,
lastTransitionTime: current && current.status === condition.status ? current.lastTransitionTime : new Date(),
observedGeneration: this.#options.resource.metadata?.generation,
};
await this.save();
};
public save = async () => {
const { resource } = this.#options;
const status: CustomResourceStatus = {
conditions: Object.entries(this.#conditions).map(([type, condition]) => ({
...condition,
type,
lastTransitionTime: condition.lastTransitionTime.toISOString(),
})),
};
await resource.patchStatus(status);
};
}
export { CustomResourceConditions };

View File

@@ -0,0 +1,211 @@
import type { z, ZodObject } from 'zod';
import { ApiException, PatchStrategy, setHeaderOptions, type KubernetesObject } from '@kubernetes/client-node';
import { EventEmitter } from 'eventemitter3';
import type { Resource } from '../resources/resources.resource.ts';
import type { Services } from '../../utils/service.ts';
import { K8sService } from '../k8s/k8s.ts';
import { CoalescingQueued } from '../../utils/queues.ts';
import type { CustomResourceDefinition, CustomResourceStatus } from './custom-resources.types.ts';
import { CustomResourceConditions } from './custom-resources.conditions.ts';
type CustomResourceObject<TSpec extends ZodObject> = KubernetesObject & {
spec: z.infer<TSpec>;
status?: CustomResourceStatus;
};
type CustomResourceOptions<TSpec extends ZodObject> = {
resource: Resource<CustomResourceObject<TSpec>>;
services: Services;
definition: CustomResourceDefinition<TSpec>;
};
type CustomResourceEvents<TSpec extends ZodObject> = {
changed: () => void;
changedStatus: (options: { previous: CustomResourceStatus; next: CustomResourceStatus }) => void;
changedMetadate: (options: { previous: KubernetesObject['metadata']; next: KubernetesObject['metadata'] }) => void;
changedSpec: (options: { previous: z.infer<TSpec>; next: z.infer<TSpec> }) => void;
};
type SubresourceResult = {
ready: boolean;
syncing?: boolean;
failed?: boolean;
reason?: string;
message?: string;
};
abstract class CustomResource<TSpec extends ZodObject> extends EventEmitter<CustomResourceEvents<TSpec>> {
#options: CustomResourceOptions<TSpec>;
#conditions: CustomResourceConditions;
#queue: CoalescingQueued<void>;
constructor(options: CustomResourceOptions<TSpec>) {
super();
this.#options = options;
this.#conditions = new CustomResourceConditions({
resource: this,
});
options.resource.on('changed', this.#handleChanged);
this.#queue = new CoalescingQueued({
action: async () => {
console.log('Reconcileing', this.apiVersion, this.kind, this.namespace, this.name);
await this.reconcile?.();
},
});
}
public get conditions() {
return this.#conditions;
}
public get names() {
return this.#options.definition.names;
}
public get services() {
const { services } = this.#options;
return services;
}
public get resource() {
const { resource } = this.#options;
return resource;
}
public get apiVersion() {
const apiVersion = this.resource.apiVersion;
if (!apiVersion) {
throw new Error('Custom resources needs an apiVersion');
}
return apiVersion;
}
public get kind() {
const kind = this.resource.kind;
if (!kind) {
throw new Error('Custom resources needs a kind');
}
return kind;
}
public get metadata() {
const metadata = this.resource.metadata;
if (!metadata) {
throw new Error('Custom resources needs metadata');
}
return metadata;
}
public get name() {
const name = this.metadata.name;
if (!name) {
throw new Error('Custom resources needs a name');
}
return name;
}
public get namespace() {
const namespace = this.metadata.namespace;
if (!namespace) {
throw new Error('Custom resources needs a namespace');
}
return namespace;
}
public get exists() {
return this.resource.exists;
}
public get ref() {
return this.resource.ref;
}
public get spec(): z.infer<TSpec> {
return this.resource.spec as ExpectedAny;
}
public get status() {
return this.resource.manifest?.status;
}
public get isSeen() {
return this.metadata.generation === this.status?.observedGeneration;
}
public get isValidSpec() {
const { success } = this.#options.definition.spec.safeParse(this.spec);
return success;
}
public setup?: () => Promise<void>;
public reconcile?: () => Promise<void>;
public markSeen = async () => {
if (this.isSeen) {
return;
}
await this.patchStatus({
observedGeneration: this.metadata.generation,
});
};
public queueReconcile = async () => {
return this.#queue.run();
};
#handleChanged = () => {
this.emit('changed');
};
public reconcileSubresource = async (name: string, action: () => Promise<SubresourceResult>) => {
try {
const result = await action();
await this.conditions.set(name, {
status: result.ready ? 'True' : 'False',
syncing: result.syncing,
failed: result.failed ?? false,
resource: true,
reason: result.reason,
message: result.message,
});
} catch (err) {
console.error(err);
await this.conditions.set(name, {
status: 'False',
failed: true,
reason: 'Failed',
resource: true,
message: err instanceof Error ? err.message : String(err),
});
}
};
public patchStatus = async (status: Partial<CustomResourceStatus>) => {
const k8s = this.services.get(K8sService);
const [group, version] = this.apiVersion?.split('/') || [];
try {
await k8s.customObjectsApi.patchNamespacedCustomObjectStatus(
{
group,
version,
plural: this.names.plural,
name: this.name,
namespace: this.namespace,
body: {
status,
},
},
setHeaderOptions('Content-Type', PatchStrategy.MergePatch),
);
} catch (err) {
if (err instanceof ApiException && err.code === 404) {
return;
}
throw err;
}
};
}
export { CustomResource, type CustomResourceOptions, type CustomResourceObject, type SubresourceResult };

View File

@@ -0,0 +1,128 @@
import { ApiException, type KubernetesObject } from '@kubernetes/client-node';
import type { ZodObject } from 'zod';
import type { Services } from '../../utils/service.ts';
import type { Resource } from '../resources/resources.resource.ts';
import { WatcherService } from '../watchers/watchers.ts';
import { K8sService } from '../k8s/k8s.ts';
import { Queue } from '../queue/queue.ts';
import type { CustomResourceDefinition } from './custom-resources.types.ts';
import type { CustomResource } from './custom-resources.custom-resource.ts';
import { createManifest } from './custom-resources.utils.ts';
type DefinitionItem = {
definition: CustomResourceDefinition<ExpectedAny>;
queue: Queue;
};
class CustomResourceService {
#services: Services;
#definitions: DefinitionItem[];
#resources: Map<string, CustomResource<ExpectedAny>>;
constructor(services: Services) {
this.#definitions = [];
this.#resources = new Map();
this.#services = services;
}
#handleChanged = async (resource: Resource<KubernetesObject>) => {
const uid = resource.metadata?.uid;
if (!uid) {
return;
}
let current = this.#resources.get(uid);
if (!current) {
const entry = this.#definitions.find(
({ definition: r }) =>
r.version === resource.version &&
r.group === resource.group &&
r.version === resource.version &&
r.kind === resource.kind,
);
if (!entry) {
return;
}
const { definition } = entry;
current = definition.create({
resource: resource as Resource<ExpectedAny>,
services: this.#services,
definition,
});
this.#resources.set(uid, current);
await current.setup?.();
if (!current.isSeen) {
await current.markSeen();
}
await current.queueReconcile();
} else if (!current.isSeen) {
await current.markSeen();
await current.queueReconcile();
}
};
public register = (...resources: CustomResourceDefinition<ExpectedAny>[]) => {
this.#definitions.push(
...resources.map((definition) => ({
definition,
queue: new Queue(),
})),
);
};
public install = async (replace = false) => {
const k8sService = this.#services.get(K8sService);
for (const { definition: crd } of this.#definitions) {
this.#services.log.info('Installing CRD', { kind: crd.kind });
try {
const manifest = createManifest(crd);
try {
await k8sService.extensionsApi.createCustomResourceDefinition({
body: manifest,
});
} catch (error) {
if (error instanceof ApiException && error.code === 409) {
if (replace) {
await k8sService.extensionsApi.patchCustomResourceDefinition({
name: manifest.metadata.name,
body: [{ op: 'replace', path: '/spec', value: manifest.spec }],
});
}
continue;
}
throw error;
}
} catch (error) {
if (error instanceof ApiException) {
throw new Error(`Failed to install ${crd.kind}: ${error.body}`);
}
throw error;
}
}
};
public watch = async () => {
const watcherService = this.#services.get(WatcherService);
for (const { definition, queue } of this.#definitions) {
const watcher = watcherService.create({
path: `/apis/${definition.group}/${definition.version}/${definition.names.plural}`,
list: (k8s) =>
k8s.customObjectsApi.listCustomObjectForAllNamespaces({
version: definition.version,
group: definition.group,
plural: definition.names.plural,
}),
verbs: ['add', 'update', 'delete'],
});
watcher.on('changed', (resource) => {
queue.add(() => this.#handleChanged(resource));
});
await watcher.start();
}
};
}
const createCustomResourceDefinition = <TSpec extends ZodObject>(options: CustomResourceDefinition<TSpec>) => options;
export { CustomResourceService, createCustomResourceDefinition };

View File

@@ -0,0 +1,38 @@
import { z, type ZodObject } from 'zod';
import type { CustomResource, CustomResourceOptions } from './custom-resources.custom-resource.ts';
type CustomResourceDefinition<TSpec extends ZodObject> = {
group: string;
version: string;
kind: string;
names: {
plural: string;
singular: string;
};
spec: TSpec;
create: (options: CustomResourceOptions<TSpec>) => CustomResource<TSpec>;
};
const customResourceStatusSchema = z.object({
observedGeneration: z.number().optional(),
conditions: z
.array(
z.object({
observedGeneration: z.number().optional(),
type: z.string(),
status: z.enum(['True', 'False', 'Unknown']),
lastTransitionTime: z.string().datetime(),
resource: z.boolean().optional(),
failed: z.boolean().optional(),
syncing: z.boolean().optional(),
reason: z.string().optional().optional(),
message: z.string().optional().optional(),
}),
)
.optional(),
});
type CustomResourceStatus = z.infer<typeof customResourceStatusSchema>;
export { customResourceStatusSchema, type CustomResourceDefinition, type CustomResourceStatus };

View File

@@ -0,0 +1,51 @@
import { z } from 'zod';
import { customResourceStatusSchema, type CustomResourceDefinition } from './custom-resources.types.ts';
const createManifest = (defintion: CustomResourceDefinition<ExpectedAny>) => {
return {
apiVersion: 'apiextensions.k8s.io/v1',
kind: 'CustomResourceDefinition',
metadata: {
name: `${defintion.names.plural}.${defintion.group}`,
},
spec: {
group: defintion.group,
names: {
kind: defintion.kind,
plural: defintion.names.plural,
singular: defintion.names.singular,
},
scope: 'Namespaced',
versions: [
{
name: defintion.version,
served: true,
storage: true,
schema: {
openAPIV3Schema: {
type: 'object',
properties: {
spec: {
...z.toJSONSchema(defintion.spec.strict(), { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExpectedAny,
status: {
...z.toJSONSchema(customResourceStatusSchema.strict(), { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExpectedAny,
},
},
},
subresources: {
status: {},
},
},
],
},
};
};
export { createManifest };

View File

@@ -0,0 +1,48 @@
import type { V1Deployment } from '@kubernetes/client-node';
import type { Services } from '../../utils/service.ts';
import { ResourceReference } from '../resources/resources.ref.ts';
import type { Watcher } from '../watchers/watchers.watcher.ts';
import { WatcherService } from '../watchers/watchers.ts';
import type { Resource } from '../resources/resources.ts';
const ISTIO_APP_SELECTOR = 'istio=ingress';
class IstioService {
#gatewayResource: ResourceReference<V1Deployment>;
#gatewayWatcher: Watcher<V1Deployment>;
constructor(services: Services) {
this.#gatewayResource = new ResourceReference<V1Deployment>();
const watcherService = services.get(WatcherService);
this.#gatewayWatcher = watcherService.create({
path: '/apis/apps/v1/deployments',
list: async (k8s) => {
return await k8s.apps.listDeploymentForAllNamespaces({
labelSelector: ISTIO_APP_SELECTOR,
});
},
transform: (manifest) => ({
apiVersion: 'apps/v1',
kind: 'Deployment',
...manifest,
}),
verbs: ['add', 'update', 'delete'],
});
this.#gatewayWatcher.on('changed', this.#handleChange);
}
#handleChange = (resource: Resource<V1Deployment>) => {
this.#gatewayResource.current = resource;
};
public get gateway() {
return this.#gatewayResource;
}
public start = async () => {
await this.#gatewayWatcher.start();
};
}
export { IstioService };

View File

@@ -1,158 +0,0 @@
import {
KubeConfig,
CoreV1Api,
ApiextensionsV1Api,
CustomObjectsApi,
EventsV1Api,
KubernetesObjectApi,
ApiException,
PatchStrategy,
} from '@kubernetes/client-node';
import type { Services } from '../utils/service.ts';
import { Manifest } from './k8s/k8s.manifest.ts';
class K8sService {
#services: Services;
#kc: KubeConfig;
#k8sApi: CoreV1Api;
#k8sExtensionsApi: ApiextensionsV1Api;
#k8sCustomObjectsApi: CustomObjectsApi;
#k8sEventsApi: EventsV1Api;
#k8sObjectsApi: KubernetesObjectApi;
constructor(services: Services) {
this.#services = services;
this.#kc = new KubeConfig();
this.#kc.loadFromDefault();
this.#k8sApi = this.#kc.makeApiClient(CoreV1Api);
this.#k8sExtensionsApi = this.#kc.makeApiClient(ApiextensionsV1Api);
this.#k8sCustomObjectsApi = this.#kc.makeApiClient(CustomObjectsApi);
this.#k8sEventsApi = this.#kc.makeApiClient(EventsV1Api);
this.#k8sObjectsApi = this.#kc.makeApiClient(KubernetesObjectApi);
}
public get config() {
return this.#kc;
}
public get api() {
return this.#k8sApi;
}
public get extensionsApi() {
return this.#k8sExtensionsApi;
}
public get customObjectsApi() {
return this.#k8sCustomObjectsApi;
}
public get eventsApi() {
return this.#k8sEventsApi;
}
public get objectsApi() {
return this.#k8sObjectsApi;
}
public exists = async (options: { apiVersion: string; kind: string; name: string; namespace?: string }) => {
try {
await this.objectsApi.read({
apiVersion: options.apiVersion,
kind: options.kind,
metadata: {
name: options.name,
namespace: options.namespace,
},
});
return true;
} catch (err) {
if (!(err instanceof ApiException && err.code === 404)) {
throw err;
}
return false;
}
};
public get = async <T>(options: { apiVersion: string; kind: string; name: string; namespace?: string }) => {
try {
const manifest = await this.objectsApi.read({
apiVersion: options.apiVersion,
kind: options.kind,
metadata: {
name: options.name,
namespace: options.namespace,
},
});
return new Manifest<T>({
manifest,
services: this.#services,
});
} catch (err) {
if (!(err instanceof ApiException && err.code === 404)) {
throw err;
}
return undefined;
}
};
public upsert = async (obj: ExpectedAny) => {
let current: unknown;
try {
current = await this.objectsApi.read({
apiVersion: obj.apiVersion,
kind: obj.kind,
metadata: {
name: obj.metadata.name,
namespace: obj.metadata.namespace,
},
});
} catch (error) {
if (!(error instanceof ApiException && error.code === 404)) {
throw error;
}
}
if (current) {
return new Manifest({
manifest: await this.objectsApi.patch(
obj,
undefined,
undefined,
undefined,
undefined,
PatchStrategy.MergePatch,
),
services: this.#services,
});
} else {
return new Manifest({
manifest: await this.objectsApi.create(obj),
services: this.#services,
});
}
};
public getSecret = async <T extends Record<string, string>>(name: string, namespace?: string) => {
const current = await this.get<ExpectedAny>({
apiVersion: 'v1',
kind: 'Secret',
name,
namespace,
});
if (!current) {
return undefined;
}
const { data } = current.manifest || {};
const decodedData = Object.fromEntries(
Object.entries(data).map(([key, value]) => [key, Buffer.from(String(value), 'base64').toString('utf-8')]),
);
return decodedData as T;
};
}
export { K8sService };

View File

@@ -1,183 +0,0 @@
import { ApiException, PatchStrategy, V1MicroTime } from '@kubernetes/client-node';
import type { Services } from '../../utils/service.ts';
import { K8sService } from '../k8s.ts';
import { GROUP } from '../../utils/consts.ts';
import { CustomResourceRegistry } from '../../custom-resource/custom-resource.registry.ts';
type ManifestOptions = {
manifest: ExpectedAny;
services: Services;
};
type ManifestMetadata = Record<string, string> & {
name: string;
namespace?: string;
labels?: Record<string, string>;
annotations?: Record<string, string>;
uid: string;
resourceVersion: string;
creationTimestamp: string;
generation: number;
};
type EventOptions = {
reason: string;
message: string;
action: string;
type: 'Normal' | 'Warning' | 'Error';
};
class Manifest<TSpec> {
#options: ManifestOptions;
constructor(options: ManifestOptions) {
this.#options = {
...options,
manifest: options.manifest,
};
}
public get objectRef() {
return {
apiVersion: this.apiVersion,
kind: this.kind,
name: this.metadata.name,
uid: this.metadata.uid,
namespace: this.metadata.namespace,
};
}
public get services(): Services {
return this.#options.services;
}
public get manifest() {
return this.#options.manifest;
}
protected set manifest(obj: ExpectedAny) {
this.#options.manifest = obj;
}
public get dependencyId() {
return `${this.metadata.uid}-${this.metadata.generation}`;
}
public get kind(): string {
return this.#options.manifest.kind;
}
public get apiVersion(): string {
return this.#options.manifest.apiVersion;
}
public get spec(): TSpec {
return this.#options.manifest.spec;
}
public get metadata(): ManifestMetadata {
return this.#options.manifest.metadata;
}
public isOwnerOf = (manifest: ExpectedAny) => {
const ownerRef = manifest?.metadata?.ownerReferences || [];
return ownerRef.some(
(ref: ExpectedAny) =>
ref.apiVersion === this.apiVersion &&
ref.kind === this.kind &&
ref.name === this.metadata.name &&
ref.uid === this.metadata.uid,
);
};
public addEvent = async (event: EventOptions) => {
const { manifest, services } = this.#options;
const k8sService = services.get(K8sService);
await k8sService.eventsApi.createNamespacedEvent({
namespace: manifest.metadata.namespace,
body: {
kind: 'Event',
metadata: {
name: `${manifest.metadata.name}-${Date.now()}-${Buffer.from(crypto.getRandomValues(new Uint8Array(8))).toString('hex')}`,
namespace: manifest.metadata.namespace,
},
eventTime: new V1MicroTime(),
note: event.message,
action: event.action,
reason: event.reason,
type: event.type,
reportingController: GROUP,
reportingInstance: manifest.metadata.name,
regarding: {
apiVersion: manifest.apiVersion,
resourceVersion: manifest.metadata.resourceVersion,
kind: manifest.kind,
name: manifest.metadata.name,
namespace: manifest.metadata.namespace,
uid: manifest.metadata.uid,
},
},
});
};
public patch = async (manifest: ExpectedAny) => {
const { services } = this.#options;
const k8sService = services.get(K8sService);
this.manifest = await k8sService.objectsApi.patch(
{
apiVersion: this.apiVersion,
kind: this.kind,
metadata: {
name: this.metadata.name,
namespace: this.metadata.namespace,
ownerReferences: this.metadata.ownerReferences,
...manifest.metadata,
labels: {
...this.metadata.labels,
...(manifest.metadata?.label || {}),
},
annotations: {
...this.metadata.annotations,
...(manifest.metadata?.annotations || {}),
},
},
spec: manifest.spec || this.spec,
},
undefined,
undefined,
undefined,
undefined,
PatchStrategy.MergePatch,
);
};
public update = async () => {
const { manifest, services } = this.#options;
const k8sService = services.get(K8sService);
const registry = services.get(CustomResourceRegistry);
const crd = registry.getByKind(manifest.kind);
if (!crd) {
throw new Error(`Custom resource ${manifest.kind} not found`);
}
try {
const resource = await k8sService.objectsApi.read({
apiVersion: this.apiVersion,
kind: this.kind,
metadata: {
name: this.metadata.name,
namespace: this.metadata.namespace,
},
});
this.#options.manifest = resource;
} catch (error) {
if (error instanceof ApiException && error.code === 404) {
return undefined;
}
throw error;
}
};
}
export { Manifest };

61
src/services/k8s/k8s.ts Normal file
View File

@@ -0,0 +1,61 @@
import {
KubeConfig,
CoreV1Api,
ApiextensionsV1Api,
CustomObjectsApi,
EventsV1Api,
KubernetesObjectApi,
ApiException,
AppsV1Api,
} from '@kubernetes/client-node';
class K8sService {
#kc: KubeConfig;
#k8sApi: CoreV1Api;
#k8sExtensionsApi: ApiextensionsV1Api;
#k8sCustomObjectsApi: CustomObjectsApi;
#k8sEventsApi: EventsV1Api;
#k8sObjectsApi: KubernetesObjectApi;
#k8sAppsApi: AppsV1Api;
constructor() {
this.#kc = new KubeConfig();
this.#kc.loadFromDefault();
this.#k8sApi = this.#kc.makeApiClient(CoreV1Api);
this.#k8sExtensionsApi = this.#kc.makeApiClient(ApiextensionsV1Api);
this.#k8sCustomObjectsApi = this.#kc.makeApiClient(CustomObjectsApi);
this.#k8sEventsApi = this.#kc.makeApiClient(EventsV1Api);
this.#k8sObjectsApi = this.#kc.makeApiClient(KubernetesObjectApi);
this.#k8sAppsApi = this.#kc.makeApiClient(AppsV1Api);
}
public get config() {
return this.#kc;
}
public get api() {
return this.#k8sApi;
}
public get extensionsApi() {
return this.#k8sExtensionsApi;
}
public get customObjectsApi() {
return this.#k8sCustomObjectsApi;
}
public get eventsApi() {
return this.#k8sEventsApi;
}
public get objectsApi() {
return this.#k8sObjectsApi;
}
public get apps() {
return this.#k8sAppsApi;
}
}
export { K8sService };

View File

@@ -0,0 +1,60 @@
import knex, { type Knex } from 'knex';
import { Services } from '../../utils/service.ts';
import type { PostgresDatabase, PostgresRole } from './postgres.types.ts';
type PostgresInstanceOptions = {
services: Services;
host: string;
port?: number;
user: string;
password: string;
};
class PostgresInstance {
#db: Knex;
constructor(options: PostgresInstanceOptions) {
this.#db = knex({
client: 'pg',
connection: {
host: process.env.FORCE_PG_HOST ?? options.host,
user: process.env.FORCE_PG_USER ?? options.user,
password: process.env.FORCE_PG_PASSWORD ?? options.password,
port: process.env.FORCE_PG_PORT ? parseInt(process.env.FORCE_PG_PORT) : options.port,
},
});
}
public ping = async () => {
try {
await this.#db.raw('SELECT 1');
return true;
} catch {
return false;
}
};
public upsertRole = async (role: PostgresRole) => {
const existingRole = await this.#db.raw('SELECT 1 FROM pg_roles WHERE rolname = ?', [role.name]);
if (existingRole.rows.length === 0) {
await this.#db.raw(`CREATE ROLE ${role.name} WITH LOGIN PASSWORD '${role.password}'`);
} else {
await this.#db.raw(`ALTER ROLE ${role.name} WITH PASSWORD '${role.password}'`);
}
};
public upsertDatabase = async (database: PostgresDatabase) => {
const existingDatabase = await this.#db.raw('SELECT * FROM pg_database WHERE datname = ?', [database.name]);
if (existingDatabase.rows.length === 0) {
await this.#db.raw(`CREATE DATABASE ${database.name} OWNER ${database.owner}`);
} else {
await this.#db.raw(`ALTER DATABASE ${database.name} OWNER TO ${database.owner}`);
}
};
}
export { PostgresInstance, type PostgresInstanceOptions };

View File

@@ -1,52 +1,19 @@
import knex, { type Knex } from 'knex';
import { Services } from '../../utils/service.ts';
import { ConfigService } from '../config/config.ts';
import type { PostgresDatabase, PostgresRole } from './postgres.types.ts';
import { PostgresInstance, type PostgresInstanceOptions } from './postgres.instance.ts';
class PostgresService {
#db: Knex;
#services: Services;
constructor(services: Services) {
this.#services = services;
const configService = services.get(ConfigService);
const config = configService.postgres;
this.#db = knex({
client: 'pg',
connection: {
host: config.host,
user: config.user,
password: config.password,
port: config.port,
},
}
public get = (options: Omit<PostgresInstanceOptions, 'services'>) => {
return new PostgresInstance({
...options,
services: this.#services,
});
}
public get config() {
const configService = this.#services.get(ConfigService);
return configService.postgres;
}
public upsertRole = async (role: PostgresRole) => {
const existingRole = await this.#db.raw('SELECT 1 FROM pg_roles WHERE rolname = ?', [role.name]);
if (existingRole.rows.length === 0) {
await this.#db.raw(`CREATE ROLE ${role.name} WITH LOGIN PASSWORD '${role.password}'`);
} else {
await this.#db.raw(`ALTER ROLE ${role.name} WITH PASSWORD '${role.password}'`);
}
};
public upsertDatabase = async (database: PostgresDatabase) => {
const existingDatabase = await this.#db.raw('SELECT * FROM pg_database WHERE datname = ?', [database.name]);
if (existingDatabase.rows.length === 0) {
await this.#db.raw(`CREATE DATABASE ${database.name} OWNER ${database.owner}`);
} else {
await this.#db.raw(`ALTER DATABASE ${database.name} OWNER TO ${database.owner}`);
}
};
}

View File

@@ -0,0 +1,40 @@
import PQueue from 'p-queue';
import pRetry from 'p-retry';
type QueueCreateOptions = ConstructorParameters<typeof PQueue>[0];
type QueueAddOptions = Parameters<typeof pRetry>[1] & {
retries?: number;
};
type QueueOptions = QueueCreateOptions & {
retries?: number;
};
class Queue {
#options: QueueOptions;
#queue: PQueue;
constructor(options: QueueOptions = {}) {
this.#options = options;
this.#queue = new PQueue(options);
}
public get concurrency() {
return this.#queue.concurrency;
}
public set concurrency(value: number) {
this.#queue.concurrency = value;
}
public add = async <T>(task: () => Promise<T>, options: QueueAddOptions = {}) => {
const withRetry = () =>
pRetry(task, {
retries: options.retries || this.#options.retries || 1,
});
return this.#queue.add(withRetry);
};
}
export { Queue };

View File

@@ -0,0 +1,81 @@
import type { KubernetesObject } from '@kubernetes/client-node';
import { EventEmitter } from 'eventemitter3';
import type { Resource } from './resources.ts';
import type { ResourceEvents } from './resources.resource.ts';
type ResourceReferenceEvents<T extends KubernetesObject> = ResourceEvents<T> & {
replaced: (options: { previous: Resource<T> | undefined; next: Resource<T> | undefined }) => void;
};
class ResourceReference<T extends KubernetesObject = KubernetesObject> extends EventEmitter<
ResourceReferenceEvents<T>
> {
#current?: Resource<T>;
#updatedEvent: ResourceEvents<T>['updated'];
#changedEvent: ResourceEvents<T>['changed'];
#changedMetadateEvent: ResourceEvents<T>['changedMetadate'];
#changedSpecEvent: ResourceEvents<T>['changedSpec'];
#changedStatusEvent: ResourceEvents<T>['changedStatus'];
#deletedEvent: ResourceEvents<T>['deleted'];
constructor(current?: Resource<T>) {
super();
this.#updatedEvent = this.emit.bind(this, 'updated');
this.#changedEvent = this.emit.bind(this, 'changed');
this.#changedMetadateEvent = this.emit.bind(this, 'changedMetadate');
this.#changedSpecEvent = this.emit.bind(this, 'changedSpec');
this.#changedStatusEvent = this.emit.bind(this, 'changedStatus');
this.#deletedEvent = this.emit.bind(this, 'deleted');
this.current = current;
}
public get current() {
return this.#current;
}
public set current(next: Resource<T> | undefined) {
const previous = this.#current;
if (next === previous) {
return;
}
if (this.#current) {
this.#current.off('updated', this.#updatedEvent);
this.#current.off('changed', this.#changedEvent);
this.#current.off('changedMetadate', this.#changedMetadateEvent);
this.#current.off('changedSpec', this.#changedSpecEvent);
this.#current.off('changedStatus', this.#changedStatusEvent);
this.#current.off('deleted', this.#deletedEvent);
}
if (next) {
next.on('updated', this.#updatedEvent);
next.on('changed', this.#changedEvent);
next.on('changedMetadate', this.#changedMetadateEvent);
next.on('changedSpec', this.#changedSpecEvent);
next.on('changedStatus', this.#changedStatusEvent);
next.on('deleted', this.#deletedEvent);
}
this.#current = next;
this.emit('replaced', {
previous,
next,
});
this.emit('changedStatus', {
previous: previous && 'status' in previous ? (previous.status as ExpectedAny) : undefined,
next: next && 'status' in next ? (next.status as ExpectedAny) : undefined,
});
this.emit('changedMetadate', {
previous: previous && 'metadata' in previous ? (previous.metadata as ExpectedAny) : undefined,
next: next && 'metadata' in next ? (next.metadata as ExpectedAny) : undefined,
});
this.emit('changedSpec', {
previous: previous && 'spec' in previous ? (previous.spec as ExpectedAny) : undefined,
next: next && 'spec' in next ? (next.spec as ExpectedAny) : undefined,
});
this.emit('changed');
this.emit('updated');
}
}
export { ResourceReference };

View File

@@ -0,0 +1,289 @@
import { ApiException, PatchStrategy, V1MicroTime, type KubernetesObject } from '@kubernetes/client-node';
import { EventEmitter } from 'eventemitter3';
import equal from 'deep-equal';
import { Services } from '../../utils/service.ts';
import { K8sService } from '../k8s/k8s.ts';
import { Queue } from '../queue/queue.ts';
import { GROUP } from '../../utils/consts.ts';
import { ResourceService } from './resources.ts';
type ResourceOptions<T extends KubernetesObject> = {
services: Services;
manifest?: T;
data: {
apiVersion: string;
kind: string;
name: string;
namespace?: string;
};
};
type UnknownResource = KubernetesObject & {
spec: ExpectedAny;
data: ExpectedAny;
};
type EventOptions = {
reason: string;
message: string;
action: string;
type: 'Normal' | 'Warning' | 'Error';
};
type ResourceEvents<T extends KubernetesObject> = {
updated: () => void;
deleted: () => void;
changed: () => void;
changedStatus: (options: {
previous: T extends { status: infer K } ? K | undefined : never;
next: T extends { status: infer K } ? K | undefined : never;
}) => void;
changedMetadate: (options: { previous: T['metadata'] | undefined; next: T['metadata'] | undefined }) => void;
changedSpec: (options: {
previous: T extends { spec: infer K } ? K | undefined : never;
next: T extends { spec: infer K } ? K | undefined : never;
}) => void;
};
class Resource<T extends KubernetesObject = UnknownResource> extends EventEmitter<ResourceEvents<T>> {
#options: ResourceOptions<T>;
#queue: Queue;
constructor(options: ResourceOptions<T>) {
super();
this.#options = options;
this.#queue = new Queue({ concurrency: 1 });
}
public get specifier() {
return this.#options.data;
}
public get manifest() {
return this.#options?.manifest;
}
public set manifest(obj: T | undefined) {
if (equal(obj, this.manifest)) {
return;
}
this.#options.manifest = obj;
const nextManifest = obj || {};
const currentManifest = this.manifest || {};
const nextStatus = 'status' in nextManifest ? nextManifest.status : undefined;
const currentStatus = 'status' in currentManifest ? currentManifest.status : undefined;
if (!equal(nextStatus, currentStatus)) {
this.emit('changedStatus', {
previous: currentStatus as ExpectedAny,
next: nextStatus as ExpectedAny,
});
}
const nextSpec = 'spec' in nextManifest ? nextManifest.spec : undefined;
const currentSpec = 'spec' in currentManifest ? currentManifest.spec : undefined;
if (!equal(nextSpec, currentSpec)) {
this.emit('changedSpec', {
next: nextSpec as ExpectedAny,
previous: currentSpec as ExpectedAny,
});
}
const nextMetadata = 'metadata' in nextManifest ? nextManifest.metadata : undefined;
const currentMetadata = 'metadata' in currentManifest ? currentManifest.metadata : undefined;
if (!equal(nextMetadata, currentMetadata)) {
this.emit('changedMetadate', {
next: nextMetadata as ExpectedAny,
previous: currentMetadata as ExpectedAny,
});
}
this.emit('updated');
this.emit('changed');
}
public get ref() {
if (!this.metadata?.uid) {
throw new Error('No uid for resource');
}
return {
apiVersion: this.apiVersion,
kind: this.kind,
name: this.name,
uid: this.metadata.uid,
};
}
public get exists() {
return !!this.manifest;
}
public get apiVersion() {
return this.#options.data.apiVersion;
}
public get group() {
const [group] = this.apiVersion?.split('/') || [];
return group;
}
public get version() {
const [, version] = this.apiVersion?.split('/') || [];
return version;
}
public get kind() {
return this.#options.data.kind;
}
public get metadata() {
return this.manifest?.metadata;
}
public get name() {
return this.#options.data.name;
}
public get namespace() {
return this.#options.data.namespace;
}
public get spec(): T extends { spec?: infer K } ? K | undefined : never {
if (this.manifest && 'spec' in this.manifest) {
return this.manifest.spec as ExpectedAny;
}
return undefined as ExpectedAny;
}
public get data(): T extends { data?: infer K } ? K | undefined : never {
if (this.manifest && 'data' in this.manifest) {
return this.manifest.data as ExpectedAny;
}
return undefined as ExpectedAny;
}
public get owners() {
const { services } = this.#options;
const references = this.metadata?.ownerReferences || [];
const resourceService = services.get(ResourceService);
return references.map((ref) =>
resourceService.get({
apiVersion: ref.apiVersion,
kind: ref.kind,
name: ref.name,
namespace: this.namespace,
}),
);
}
public patch = (patch: T) =>
this.#queue.add(async () => {
const { services } = this.#options;
const k8s = services.get(K8sService);
const body = {
...patch,
apiVersion: this.apiVersion,
kind: this.kind,
metadata: {
name: this.name,
namespace: this.namespace,
...patch.metadata,
},
};
try {
this.manifest = await k8s.objectsApi.patch(
body,
undefined,
undefined,
undefined,
undefined,
PatchStrategy.MergePatch,
);
} catch (err) {
if (err instanceof ApiException && err.code === 404) {
this.manifest = await k8s.objectsApi.create(body);
return;
}
throw err;
}
});
public delete = () =>
this.#queue.add(async () => {
try {
const { services } = this.#options;
const k8s = services.get(K8sService);
await k8s.objectsApi.delete({
apiVersion: this.apiVersion,
kind: this.kind,
metadata: {
name: this.name,
namespace: this.namespace,
},
});
this.manifest = undefined;
} catch (err) {
if (err instanceof ApiException && err.code === 404) {
return;
}
throw err;
}
});
public load = () =>
this.#queue.add(async () => {
const { services } = this.#options;
const k8s = services.get(K8sService);
try {
const manifest = await k8s.objectsApi.read({
apiVersion: this.apiVersion,
kind: this.kind,
metadata: {
name: this.name,
namespace: this.namespace,
},
});
this.manifest = manifest as T;
} catch (err) {
if (err instanceof ApiException && err.code === 404) {
this.manifest = undefined;
} else {
throw err;
}
}
});
public addEvent = async (event: EventOptions) => {
const { services } = this.#options;
const k8sService = services.get(K8sService);
await k8sService.eventsApi.createNamespacedEvent({
namespace: this.namespace || 'default',
body: {
kind: 'Event',
metadata: {
name: `${this.name}-${Date.now()}-${Buffer.from(crypto.getRandomValues(new Uint8Array(8))).toString('hex')}`,
namespace: this.namespace,
},
eventTime: new V1MicroTime(),
note: event.message,
action: event.action,
reason: event.reason,
type: event.type,
reportingController: GROUP,
reportingInstance: this.name,
regarding: {
apiVersion: this.apiVersion,
resourceVersion: this.metadata?.resourceVersion,
kind: this.kind,
name: this.name,
namespace: this.namespace,
uid: this.metadata?.uid,
},
},
});
};
}
export { Resource, type UnknownResource, type ResourceEvents };

View File

@@ -0,0 +1,44 @@
import type { KubernetesObject } from '@kubernetes/client-node';
import type { Services } from '../../utils/service.ts';
import { Resource } from './resources.resource.ts';
type ResourceGetOptions = {
apiVersion: string;
kind: string;
name: string;
namespace?: string;
};
class ResourceService {
#cache: Resource<ExpectedAny>[] = [];
#services: Services;
constructor(services: Services) {
this.#services = services;
}
public get = <T extends KubernetesObject>(options: ResourceGetOptions) => {
const { apiVersion, kind, name, namespace } = options;
let resource = this.#cache.find(
(resource) =>
resource.specifier.kind === kind &&
resource.specifier.apiVersion === apiVersion &&
resource.specifier.name === name &&
resource.specifier.namespace === namespace,
);
if (resource) {
return resource as Resource<T>;
}
resource = new Resource({
data: options,
services: this.#services,
});
this.#cache.push(resource);
return resource as Resource<T>;
};
}
export { ResourceReference } from './resources.ref.ts';
export { ResourceService, Resource };

View File

@@ -0,0 +1,92 @@
import type { V1Secret } from '@kubernetes/client-node';
import type { z, ZodObject } from 'zod';
import deepEqual from 'deep-equal';
import { ResourceService, type Resource } from '../resources/resources.ts';
import type { Services } from '../../utils/service.ts';
type EnsuredSecretOptions<T extends ZodObject> = {
services: Services;
name: string;
namespace: string;
schema: T;
owner?: ExpectedAny[];
generator: (previous?: unknown) => z.infer<T>;
validate?: (value: T) => boolean;
};
class EnsuredSecret<T extends ZodObject> {
#options: EnsuredSecretOptions<T>;
#resource: Resource<V1Secret>;
constructor(options: EnsuredSecretOptions<T>) {
this.#options = options;
const { services, name, namespace } = options;
const resourceService = services.get(ResourceService);
this.#resource = resourceService.get({
apiVersion: 'v1',
kind: 'Secret',
name,
namespace,
});
this.#resource.on('changed', this.#handleChanged);
this.#handleChanged();
}
public get resouce() {
return this.#resource;
}
public get value(): z.infer<T> | undefined {
if (!this.#resource.data) {
return undefined;
}
return Object.fromEntries(
Object.entries(this.#resource.data).map(([name, value]) => [name, Buffer.from(value, 'base64').toString('utf8')]),
) as ExpectedAny;
}
public patch = async (value: ExpectedAny) => {
const patched = {
...this.value,
...value,
};
if (deepEqual(patched, this.value)) {
return;
}
await this.resouce.patch({
data: patched,
});
};
public get isValid() {
const { schema, validate } = this.#options;
const { success } = schema.safeParse(this.value);
if (!success) {
return false;
}
if (validate) {
return validate(this.value as unknown as T);
}
return true;
}
#handleChanged = () => {
const { generator, owner } = this.#options;
if (this.isValid && deepEqual(this.#resource.metadata?.ownerReferences, owner)) {
return;
}
const data = generator();
const encodedValues = Object.fromEntries(
Object.entries(data).map(([name, value]) => [name, Buffer.from(String(value)).toString('base64')]),
);
this.#resource.patch({
metadata: {
ownerReferences: owner,
},
data: encodedValues,
});
};
}
export { EnsuredSecret, type EnsuredSecretOptions };

View File

@@ -1 +0,0 @@
class SecretsService {}

View File

@@ -0,0 +1,22 @@
import type { ZodObject } from 'zod';
import type { Services } from '../../utils/service.ts';
import { EnsuredSecret, type EnsuredSecretOptions } from './secrets.secret.ts';
class SecretService {
#services: Services;
constructor(services: Services) {
this.#services = services;
}
public ensure = <T extends ZodObject>(options: Omit<EnsuredSecretOptions<T>, 'services'>) => {
return new EnsuredSecret({
...options,
services: this.#services,
});
};
}
export { SecretService };

View File

@@ -0,0 +1,37 @@
import type { Services } from '../../utils/service.ts';
import { Watcher, type WatcherOptions } from './watchers.watcher.ts';
class WatcherService {
#services: Services;
constructor(services: Services) {
this.#services = services;
}
public create = (options: Omit<WatcherOptions, 'services'>) => {
const instance = new Watcher({
...options,
services: this.#services,
});
return instance;
};
public watchCustomGroup = async (group: string, version: string, plurals: string[]) => {
for (const plural of plurals) {
await this.create({
path: `/apis/${group}/${version}/${plural}`,
list: async (k8s) => {
return await k8s.customObjectsApi.listCustomObjectForAllNamespaces({
group,
version,
plural,
});
},
verbs: ['add', 'update', 'delete'],
}).start();
}
};
}
export { WatcherService, Watcher };

View File

@@ -0,0 +1,80 @@
import { makeInformer, type Informer, type KubernetesListObject, type KubernetesObject } from '@kubernetes/client-node';
import { EventEmitter } from 'eventemitter3';
import { K8sService } from '../k8s/k8s.ts';
import type { Services } from '../../utils/service.ts';
import { ResourceService, type Resource } from '../resources/resources.ts';
type ResourceChangedAction = 'add' | 'update' | 'delete';
type WatcherEvents<T extends KubernetesObject> = {
changed: (resource: Resource<T>) => void;
};
type WatcherOptions<T extends KubernetesObject = KubernetesObject> = {
path: string;
list: (k8s: K8sService) => Promise<KubernetesListObject<T>>;
selector?: string;
services: Services;
verbs: ResourceChangedAction[];
transform?: (input: T) => T;
};
class Watcher<T extends KubernetesObject> extends EventEmitter<WatcherEvents<T>> {
#options: WatcherOptions<T>;
#informer: Informer<T>;
constructor(options: WatcherOptions<T>) {
super();
this.#options = options;
this.#informer = this.#setup();
}
#setup = () => {
const { services, path, list, selector } = this.#options;
const k8s = services.get(K8sService);
const informer = makeInformer(k8s.config, path, list.bind(this, k8s), selector);
informer.on('add', this.#handleResource.bind(this, 'add'));
informer.on('update', this.#handleResource.bind(this, 'update'));
informer.on('delete', this.#handleResource.bind(this, 'delete'));
informer.on('error', (err) => {
console.log('Watcher failed, will retry in 5 seconds', path, err);
setTimeout(this.start, 5000);
});
return informer;
};
#handleResource = (action: ResourceChangedAction, originalManifest: T) => {
const { services, transform } = this.#options;
const manifest = transform ? transform(originalManifest) : originalManifest;
const resourceService = services.get(ResourceService);
const { apiVersion, kind, metadata = {} } = manifest;
const { name, namespace } = metadata;
if (!name || !apiVersion || !kind) {
return;
}
const resource = resourceService.get<T>({
apiVersion,
kind,
name,
namespace,
});
if (action === 'delete') {
resource.manifest = undefined;
} else {
resource.manifest = manifest;
}
this.emit('changed', resource);
};
public stop = async () => {
await this.#informer.stop();
};
public start = async () => {
await this.#informer.start();
};
}
export { Watcher, type WatcherOptions, type ResourceChangedAction };