This commit is contained in:
Morten Olsen
2025-10-23 13:47:07 +02:00
commit b851dc3006
91 changed files with 7578 additions and 0 deletions

View File

@@ -0,0 +1,10 @@
import { KubeConfig } from '@kubernetes/client-node';
class K8sConfig extends KubeConfig {
constructor() {
super();
this.loadFromDefault();
}
}
export { K8sConfig };

View File

@@ -0,0 +1,10 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1CustomResourceDefinition } from '@kubernetes/client-node';
class CRD extends Resource<V1CustomResourceDefinition> {
public static readonly apiVersion = 'apiextensions.k8s.io/v1';
public static readonly kind = 'CustomResourceDefinition';
}
export { CRD };

View File

@@ -0,0 +1,9 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1Deployment } from '@kubernetes/client-node';
class Deployment extends Resource<V1Deployment> {
public static readonly apiVersion = 'apps/v1';
public static readonly kind = 'Deployment';
}
export { Deployment };

View File

@@ -0,0 +1,9 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1Namespace } from '@kubernetes/client-node';
class Namespace extends Resource<V1Namespace> {
public static readonly apiVersion = 'v1';
public static readonly kind = 'Namespace';
}
export { Namespace };

View File

@@ -0,0 +1,9 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1PersistentVolume } from '@kubernetes/client-node';
class PersistentVolume extends Resource<V1PersistentVolume> {
public static readonly apiVersion = 'v1';
public static readonly kind = 'PersistentVolume';
}
export { PersistentVolume };

View File

@@ -0,0 +1,29 @@
import { Resource, type ResourceOptions } from '../resources/resource/resource.js';
import type { KubernetesObject, V1Secret } from '@kubernetes/client-node';
import { decodeSecret, encodeSecret } from '../utils/utils.secrets.js';
type SetOptions<T extends Record<string, string | undefined>> = T | ((current: T | undefined) => T | Promise<T>);
class Secret<T extends Record<string, string> = Record<string, string>> extends Resource<V1Secret> {
public static readonly apiVersion = 'v1';
public static readonly kind = 'Secret';
constructor(options: ResourceOptions<V1Secret>) {
super(options);
}
public get value() {
return decodeSecret(this.data) as T | undefined;
}
public set = async (options: SetOptions<T>, data?: KubernetesObject) => {
const value = typeof options === 'function' ? await Promise.resolve(options(this.value)) : options;
await this.ensure({
...data,
data: encodeSecret(value),
});
};
}
export { Secret };

View File

@@ -0,0 +1,13 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1Service } from '@kubernetes/client-node';
class Service extends Resource<V1Service> {
public static readonly apiVersion = 'v1';
public static readonly kind = 'Service';
public get hostname() {
return `${this.name}.${this.namespace}.svc.cluster.local`;
}
}
export { Service };

View File

@@ -0,0 +1,9 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1StatefulSet } from '@kubernetes/client-node';
class StatefulSet extends Resource<V1StatefulSet> {
public static readonly apiVersion = 'apps/v1';
public static readonly kind = 'StatefulSet';
}
export { StatefulSet };

View File

@@ -0,0 +1,10 @@
import { Resource } from '../resources/resource/resource.js';
import type { V1StorageClass } from '@kubernetes/client-node';
class StorageClass extends Resource<V1StorageClass> {
public static readonly apiVersion = 'storage.k8s.io/v1';
public static readonly kind = 'StorageClass';
public static readonly plural = 'storageclasses';
}
export { StorageClass };

View File

@@ -0,0 +1,19 @@
import { CRD } from "./core.crd.js";
import { Deployment } from "./core.deployment.js";
import { Namespace } from "./core.namespace.js";
import { PersistentVolume } from "./core.pv.js";
import { Secret } from "./core.secret.js";
import { Service } from "./core.service.js";
import { StatefulSet } from "./core.stateful-set.js";
import { StorageClass } from "./core.storage-class.js";
export {
CRD,
Deployment,
Namespace,
PersistentVolume,
Secret,
Service,
StatefulSet,
StorageClass,
}

View File

@@ -0,0 +1,14 @@
class NotReadyError extends Error {
#reason?: string;
constructor(reason?: string, message?: string) {
super(message || reason || 'Resource is not ready');
this.#reason = reason;
}
get reason() {
return this.#reason;
}
}
export { NotReadyError };

View File

@@ -0,0 +1,14 @@
export { K8sOperator } from './operator.js';
export {
ResourceService,
Resource,
type ResourceOptions,
ResourceReference,
CustomResource,
type CustomResourceOptions,
} from './resources/resources.js';
export { Watcher, WatcherService } from './watchers/watchers.js';
export { NotReadyError } from './errors/errors.js';
export * as k8s from '@kubernetes/client-node';
export { z } from 'zod';
export * from './core/core.js';

2
packages/k8s/src/global.d.ts vendored Normal file
View File

@@ -0,0 +1,2 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
declare type ExplicitAny = any;

View File

@@ -0,0 +1,16 @@
import { Services } from '@morten-olsen/box-utils/services';
import { ResourceService } from './resources/resources.js';
class K8sOperator {
#services: Services;
constructor(services: Services = new Services()) {
this.#services = services;
}
public get resources() {
return this.#services.get(ResourceService);
}
}
export { K8sOperator };

View File

@@ -0,0 +1,183 @@
import { z, type ZodType } from 'zod';
import { CustomObjectsApi, PatchStrategy, setHeaderOptions, type KubernetesObject } from '@kubernetes/client-node';
import { CronJob, CronTime } from 'cron';
import { K8sConfig } from '../../config/config.js';
import { CoalescingQueue } from '@morten-olsen/box-utils/coalescing-queue';
import { Resource, type ResourceOptions } from './resource.js';
import { NotReadyError } from '../../errors/errors.js'
const customResourceStatusSchema = z.object({
observedGeneration: z.number().optional(),
conditions: z
.array(
z.object({
observedGeneration: z.number().optional(),
type: z.string(),
status: z.enum(['True', 'False', 'Unknown']),
lastTransitionTime: z.string().datetime().optional(),
resource: z.boolean().optional(),
failed: z.boolean().optional(),
syncing: z.boolean().optional(),
reason: z.string().optional().optional(),
message: z.string().optional().optional(),
}),
)
.optional(),
});
type CustomResourceOptions<TSpec extends ZodType> = ResourceOptions<KubernetesObject & { spec: z.infer<TSpec> }>;
class CustomResource<TSpec extends ZodType> extends Resource<
KubernetesObject & { spec: z.infer<TSpec>; status?: z.infer<typeof customResourceStatusSchema> }
> {
public static readonly apiVersion: string;
public static readonly status = customResourceStatusSchema;
public static readonly labels: Record<string, string> = {};
public static readonly dependsOn?: Resource<KubernetesObject>[];
#reconcileQueue: CoalescingQueue<void>;
#cron: CronJob;
constructor(options: CustomResourceOptions<TSpec>) {
super(options);
this.#reconcileQueue = new CoalescingQueue({
action: async () => {
try {
if (!this.exists || this.manifest?.metadata?.deletionTimestamp) {
return;
}
await this.markSeen();
await this.reconcile?.();
await this.markReady();
} catch (err) {
if (err instanceof NotReadyError) {
await this.markNotReady(err.reason, err.message);
} else if (err instanceof Error) {
await this.markNotReady('Failed', err.message);
} else {
await this.markNotReady('Failed', String(err));
}
console.error(err);
}
},
});
this.#cron = CronJob.from({
cronTime: '*/2 * * * *',
onTick: this.queueReconcile,
start: true,
runOnInit: true,
});
this.on('changed', this.#handleUpdate);
}
public get reconcileTime() {
return this.#cron.cronTime.toString();
}
public set reconcileTime(pattern: string) {
this.#cron.cronTime = new CronTime(pattern);
}
public get isSeen() {
return this.metadata?.generation === this.status?.observedGeneration;
}
public get version() {
const [, version] = this.apiVersion.split('/');
return version;
}
public get group() {
const [group] = this.apiVersion.split('/');
return group;
}
public get scope() {
if (!('scope' in this.constructor) || typeof this.constructor.scope !== 'string') {
return;
}
return this.constructor.scope as 'Namespaced' | 'Cluster';
}
#handleUpdate = async (
previous?: KubernetesObject & { spec: z.infer<TSpec>; status?: z.infer<typeof customResourceStatusSchema> },
) => {
if (this.isSeen && previous) {
return;
}
return await this.queueReconcile();
};
public reconcile?: () => Promise<void>;
public queueReconcile = () => {
return this.#reconcileQueue.run();
};
public markSeen = async () => {
if (this.isSeen) {
return;
}
await this.patchStatus({
observedGeneration: this.metadata?.generation,
});
};
public markNotReady = async (reason?: string, message?: string) => {
await this.patchStatus({
conditions: [
{
type: 'Ready',
status: 'False',
reason,
message,
},
],
});
};
public markReady = async () => {
await this.patchStatus({
conditions: [
{
type: 'Ready',
status: 'True',
},
],
});
};
public patchStatus = (status: Partial<z.infer<typeof customResourceStatusSchema>>) =>
this.queue.add(async () => {
const config = this.services.get(K8sConfig);
const customObjectsApi = config.makeApiClient(CustomObjectsApi);
if (this.scope === 'Cluster') {
await customObjectsApi.patchClusterCustomObjectStatus(
{
version: this.version,
group: this.group,
plural: this.plural,
name: this.name,
body: { status },
},
setHeaderOptions('Content-Type', PatchStrategy.MergePatch),
);
} else {
await customObjectsApi.patchNamespacedCustomObjectStatus(
{
version: this.version,
group: this.group,
plural: this.plural,
name: this.name,
namespace: this.namespace || 'default',
body: { status },
},
setHeaderOptions('Content-Type', PatchStrategy.MergePatch),
);
}
});
}
export { CustomResource, type CustomResourceOptions };

View File

@@ -0,0 +1,43 @@
import { EventEmitter } from '@morten-olsen/box-utils/event-emitter';
import type { ResourceClass } from '../resources.js';
import type { ResourceEvents } from './resource.js';
class ResourceReference<T extends ResourceClass<ExplicitAny>> extends EventEmitter<ResourceEvents<T>> {
#current?: {
instance: InstanceType<T>;
unsubscribe: () => void;
};
constructor(current?: InstanceType<T>) {
super();
this.current = current;
}
public get current() {
return this.#current?.instance;
}
public set current(value: InstanceType<T> | undefined) {
const previous = this.#current;
this.#current?.unsubscribe();
if (value) {
const unsubscribe = value.on('changed', this.#handleChange);
this.#current = {
instance: value,
unsubscribe,
};
} else {
this.#current = undefined;
}
if (previous !== value) {
this.emit('changed');
}
}
#handleChange = () => {
this.emit('changed');
};
}
export { ResourceReference };

View File

@@ -0,0 +1,193 @@
import { ApiException, KubernetesObjectApi, PatchStrategy, type KubernetesObject } from '@kubernetes/client-node';
import deepEqual from 'deep-equal';
import { ResourceService } from '../resources.service.js';
import type { Services } from '@morten-olsen/box-utils/services';
import { EventEmitter } from '@morten-olsen/box-utils/event-emitter';
import { Queue } from '@morten-olsen/box-utils/queue';
import { isDeepSubset } from '@morten-olsen/box-utils/objects';
import { K8sConfig } from '../../config/config.js';
type ResourceSelector = {
apiVersion: string;
kind: string;
name: string;
namespace?: string;
};
type ResourceOptions<T extends KubernetesObject> = {
services: Services;
selector: ResourceSelector;
manifest?: T;
};
type ResourceEvents<T extends KubernetesObject> = {
changed: (from?: T) => void;
};
class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents<T>> {
#manifest?: T;
#queue: Queue;
#options: ResourceOptions<T>;
constructor(options: ResourceOptions<T>) {
super();
this.#options = options;
this.#manifest = options.manifest;
this.#queue = new Queue({ concurrency: 1 });
}
protected get queue() {
return this.#queue;
}
public get services() {
return this.#options.services;
}
public get resources() {
return this.services.get(ResourceService);
}
public get manifest() {
return this.#manifest;
}
public set manifest(value: T | undefined) {
if (deepEqual(this.manifest, value)) {
return;
}
const previous = this.#manifest;
this.#manifest = value;
this.emit('changed', previous);
}
public get plural() {
if ('plural' in this.constructor && typeof this.constructor.plural === 'string') {
return this.constructor.plural;
}
if ('kind' in this.constructor && typeof this.constructor.kind === 'string') {
return this.constructor.kind.toLowerCase() + 's';
}
throw new Error('Unknown kind');
}
public get exists() {
return !!this.#manifest;
}
public get ready() {
return this.exists;
}
public get selector() {
return this.#options.selector;
}
public get apiVersion() {
return this.selector.apiVersion;
}
public get kind() {
return this.selector.kind;
}
public get name() {
return this.selector.name;
}
public get namespace() {
return this.selector.namespace;
}
public get metadata() {
return this.manifest?.metadata;
}
public get ref() {
if (!this.metadata?.uid) {
throw new Error('No uid for resource');
}
return {
apiVersion: this.apiVersion,
kind: this.kind,
name: this.name,
uid: this.metadata.uid,
};
}
public get spec(): (T extends { spec?: infer K } ? K : never) | undefined {
const manifest = this.manifest;
if (!manifest || !('spec' in manifest)) {
return;
}
return manifest.spec as ExplicitAny;
}
public get data(): (T extends { data?: infer K } ? K : never) | undefined {
const manifest = this.manifest;
if (!manifest || !('data' in manifest)) {
return;
}
return manifest.data as ExplicitAny;
}
public get status(): (T extends { status?: infer K } ? K : never) | undefined {
const manifest = this.manifest;
if (!manifest || !('status' in manifest)) {
return;
}
return manifest.status as ExplicitAny;
}
public patch = (patch: T) =>
this.#queue.add(async () => {
const { services } = this.#options;
const config = services.get(K8sConfig);
const objectsApi = config.makeApiClient(KubernetesObjectApi);
const body = {
...patch,
apiVersion: this.selector.apiVersion,
kind: this.selector.kind,
metadata: {
...patch.metadata,
name: this.selector.name,
namespace: this.selector.namespace,
},
};
try {
this.manifest = await objectsApi.patch(
body,
undefined,
undefined,
undefined,
undefined,
PatchStrategy.MergePatch,
);
} catch (err) {
if (err instanceof ApiException && err.code === 404) {
this.manifest = await objectsApi.create(body);
return;
}
throw err;
}
});
public getCondition = (
condition: string,
): T extends { status?: { conditions?: (infer U)[] } } ? U | undefined : undefined => {
const status = this.status as ExplicitAny;
return status?.conditions?.find((c: ExplicitAny) => c?.type === condition);
};
public ensure = async (manifest: T) => {
if (isDeepSubset(this.manifest, manifest)) {
return false;
}
await this.patch(manifest);
return true;
};
}
export { Resource, type ResourceOptions, type ResourceEvents };

View File

@@ -0,0 +1,139 @@
import { ApiException, ApiextensionsV1Api, type KubernetesObject } from '@kubernetes/client-node';
import type { ZodType } from 'zod';
import { WatcherService } from '../watchers/watchers.js';
import { K8sConfig } from '../config/config.js';
import { createManifest } from './resources.utils.js';
import { Resource, type ResourceOptions } from './resource/resource.js';
import { EventEmitter } from '@morten-olsen/box-utils/event-emitter';
import type { Services } from '@morten-olsen/box-utils/services';
type ResourceClass<T extends KubernetesObject> = (new (options: ResourceOptions<T>) => InstanceType<typeof Resource<T>>) & {
apiVersion: string;
kind: string;
plural?: string;
};
type InstallableResourceClass<T extends KubernetesObject> = ResourceClass<T> & {
spec: ZodType;
status: ZodType;
scope: 'Namespaced' | 'Cluster';
labels: Record<string, string>;
};
type ResourceServiceEvents = {
changed: (resource: Resource<ExplicitAny>) => void;
};
class ResourceService extends EventEmitter<ResourceServiceEvents> {
#services: Services;
#registry: Map<
ResourceClass<ExplicitAny>,
{
apiVersion: string;
kind: string;
plural?: string;
resources: Resource<ExplicitAny>[];
}
>;
constructor(services: Services) {
super();
this.#services = services;
this.#registry = new Map();
}
public register = async (...resources: ResourceClass<ExplicitAny>[]) => {
for (const resource of resources) {
if (!this.#registry.has(resource)) {
this.#registry.set(resource, {
apiVersion: resource.apiVersion,
kind: resource.kind,
plural: resource.plural,
resources: [],
});
}
const watcherService = this.#services.get(WatcherService);
const watcher = watcherService.create({
...resource,
verbs: ['add', 'update', 'delete'],
});
watcher.on('changed', (manifest) => {
const { name, namespace } = manifest.metadata || {};
if (!name) {
return;
}
const current = this.get(resource, name, namespace);
current.manifest = manifest;
});
await watcher.start();
}
};
public getAllOfKind = <T extends ResourceClass<ExplicitAny>>(type: T) => {
return (this.#registry.get(type)?.resources?.filter((r) => r.exists) as InstanceType<T>[]) || [];
};
public get = <T extends ResourceClass<ExplicitAny>>(type: T, name: string, namespace?: string) => {
let resourceRegistry = this.#registry.get(type);
if (!resourceRegistry) {
resourceRegistry = {
apiVersion: type.apiVersion,
kind: type.kind,
plural: type.plural,
resources: [],
};
this.#registry.set(type, resourceRegistry);
}
const { resources, apiVersion, kind } = resourceRegistry;
let current = resources.find((resource) => resource.name === name && resource.namespace === namespace);
if (!current) {
current = new type({
selector: {
apiVersion,
kind,
name,
namespace,
},
services: this.#services,
});
current.on('changed', this.emit.bind(this, 'changed', current));
resources.push(current);
}
return current as InstanceType<T>;
};
public install = async (...resources: InstallableResourceClass<ExplicitAny>[]) => {
const config = this.#services.get(K8sConfig);
const extensionsApi = config.makeApiClient(ApiextensionsV1Api);
for (const resource of resources) {
try {
const manifest = createManifest(resource);
try {
await extensionsApi.createCustomResourceDefinition({
body: manifest,
});
} catch (error) {
if (error instanceof ApiException && error.code === 409) {
await extensionsApi.patchCustomResourceDefinition({
name: manifest.metadata.name,
body: [{ op: 'replace', path: '/spec', value: manifest.spec }],
});
continue;
}
throw error;
}
} catch (error) {
if (error instanceof ApiException) {
throw new Error(`Failed to install ${resource.kind}: ${error.body}`);
}
throw error;
}
}
};
}
export { ResourceService, Resource, type ResourceOptions, type ResourceClass, type InstallableResourceClass };

View File

@@ -0,0 +1,4 @@
export { CustomResource, type CustomResourceOptions } from './resource/resource.custom.js';
export { ResourceReference } from './resource/resource.reference.js';
export { ResourceService, Resource, type ResourceOptions, type ResourceClass, type InstallableResourceClass } from './resources.service.js';

View File

@@ -0,0 +1,55 @@
import { z } from 'zod';
import type { InstallableResourceClass } from './resources.ts';
const createManifest = (defintion: InstallableResourceClass<ExplicitAny>) => {
const plural = defintion.plural ?? defintion.kind.toLowerCase() + 's';
const [version, group] = defintion.apiVersion.split('/').toReversed();
return {
apiVersion: 'apiextensions.k8s.io/v1',
kind: 'CustomResourceDefinition',
metadata: {
name: `${plural}.${group}`,
labels: defintion.labels,
},
spec: {
group: group,
names: {
kind: defintion.kind,
plural: plural,
singular: defintion.kind.toLowerCase(),
},
scope: defintion.scope,
versions: [
{
name: version,
served: true,
storage: true,
schema: {
openAPIV3Schema: {
type: 'object',
properties: {
spec: {
...z.toJSONSchema(defintion.spec, { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExplicitAny,
status: {
...z.toJSONSchema(defintion.status, { io: 'input' }),
$schema: undefined,
additionalProperties: undefined,
} as ExplicitAny,
},
},
},
subresources: {
status: {},
},
},
],
},
};
};
export { createManifest };

View File

@@ -0,0 +1,23 @@
const decodeSecret = <T extends Record<string, string>>(
data: Record<string, ExplicitAny> | undefined,
): T | undefined => {
if (!data) {
return undefined;
}
return Object.fromEntries(
Object.entries(data).map(([name, value]) => [name, Buffer.from(value, 'base64').toString('utf8')]),
) as T;
};
const encodeSecret = <T extends Record<string, string | undefined>>(
data: T | undefined,
): Record<string, string> | undefined => {
if (!data) {
return undefined;
}
return Object.fromEntries(
Object.entries(data).map(([name, value]) => [name, Buffer.from(value || '', 'utf8').toString('base64')]),
);
};
export { decodeSecret, encodeSecret };

View File

@@ -0,0 +1,70 @@
import { KubernetesObjectApi, makeInformer, type Informer, type KubernetesObject } from '@kubernetes/client-node';
import { EventEmitter } from '@morten-olsen/box-utils/event-emitter';
import type { Services } from '@morten-olsen/box-utils/services';
import { K8sConfig } from '../../config/config.js';
type ResourceChangedAction = 'add' | 'update' | 'delete';
type WatcherEvents<T extends KubernetesObject> = {
changed: (manifest: T) => void;
};
type WatcherOptions = {
apiVersion: string;
kind: string;
plural?: string;
selector?: string;
services: Services;
verbs: ResourceChangedAction[];
};
class Watcher<T extends KubernetesObject> extends EventEmitter<WatcherEvents<T>> {
#options: WatcherOptions;
#informer: Informer<T>;
constructor(options: WatcherOptions) {
super();
this.#options = options;
this.#informer = this.#setup();
}
#setup = () => {
const { services, apiVersion, kind, selector } = this.#options;
const plural = this.#options.plural ?? kind.toLowerCase() + 's';
const [version, group] = apiVersion.split('/').toReversed();
const config = services.get(K8sConfig);
const path = group ? `/apis/${group}/${version}/${plural}` : `/api/${version}/${plural}`;
const objectsApi = config.makeApiClient(KubernetesObjectApi);
const informer = makeInformer<T>(
config,
path,
async () => {
return objectsApi.list(apiVersion, kind);
},
selector,
);
informer.on('add', this.#handleResource.bind(this, 'add'));
informer.on('update', this.#handleResource.bind(this, 'update'));
informer.on('delete', this.#handleResource.bind(this, 'delete'));
informer.on('error', (err) => {
console.log('Watcher failed, will retry in 3 seconds', path, err);
setTimeout(this.start, 3000);
});
return informer;
};
#handleResource = (action: ResourceChangedAction, manifest: T) => {
this.emit('changed', manifest);
};
public stop = async () => {
await this.#informer.stop();
};
public start = async () => {
await this.#informer.start();
};
}
export { Watcher, type WatcherOptions, type ResourceChangedAction };

View File

@@ -0,0 +1,27 @@
import { Services, destroy } from "@morten-olsen/box-utils/services";
import { Watcher, type WatcherOptions } from "./watcher/watcher.js";
class WatcherService {
#services: Services;
#instances: Watcher<ExplicitAny>[];
constructor(services: Services) {
this.#instances = [];
this.#services = services;
}
public create = (options: Omit<WatcherOptions, 'services'>) => {
const instance = new Watcher({
...options,
services: this.#services,
});
this.#instances.push(instance);
return instance;
};
[destroy] = async () => {
await Promise.all(this.#instances.map((instance) => instance.stop()));
};
}
export { WatcherService, Watcher };