This commit is contained in:
Morten Olsen
2025-08-06 21:18:02 +02:00
parent 757b2fcfac
commit cfb90f7c9f
72 changed files with 16675 additions and 823 deletions

View File

@@ -26,6 +26,7 @@ type Condition = {
class CustomResourceConditions extends EventEmitter<CustomResourceConditionsEvents> {
#options: CustomResourceStatusOptions;
#conditions: Record<string, Condition>;
#changed: boolean;
constructor(options: CustomResourceStatusOptions) {
super();
@@ -40,6 +41,7 @@ class CustomResourceConditions extends EventEmitter<CustomResourceConditionsEven
]),
);
options.resource.on('changed', this.#handleChange);
this.#changed = false;
}
#handleChange = () => {
@@ -66,9 +68,16 @@ class CustomResourceConditions extends EventEmitter<CustomResourceConditionsEven
public set = async (type: string, condition: Omit<Condition, 'lastTransitionTime'>) => {
const current = this.#conditions[type];
const isEqual = equal(
{ ...current, lastTransitionTime: undefined },
{ ...condition, lastTransitionTime: undefined },
);
if (isEqual) {
return;
}
this.#changed = true;
this.#conditions[type] = {
...condition,
lastTransitionTime: current && current.status === condition.status ? current.lastTransitionTime : new Date(),
observedGeneration: this.#options.resource.metadata?.generation,
};
@@ -76,15 +85,24 @@ class CustomResourceConditions extends EventEmitter<CustomResourceConditionsEven
};
public save = async () => {
const { resource } = this.#options;
const status: CustomResourceStatus = {
conditions: Object.entries(this.#conditions).map(([type, condition]) => ({
...condition,
type,
lastTransitionTime: condition.lastTransitionTime.toISOString(),
})),
};
await resource.patchStatus(status);
if (!this.#changed) {
return;
}
try {
this.#changed = false;
const { resource } = this.#options;
const status: CustomResourceStatus = {
conditions: Object.entries(this.#conditions).map(([type, condition]) => ({
...condition,
type,
lastTransitionTime: condition.lastTransitionTime.toISOString(),
})),
};
await resource.patchStatus(status);
} catch (error) {
this.#changed = true;
throw error;
}
};
}

View File

@@ -50,6 +50,13 @@ abstract class CustomResource<TSpec extends ZodObject> extends EventEmitter<Cust
options.resource.on('changed', this.#handleChanged);
this.#queue = new CoalescingQueued({
action: async () => {
if (this.exists && !this.isValidSpec) {
this.services.log.error(
`Invalid spec for ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`,
this.spec,
);
return;
}
console.log('Reconcileing', this.apiVersion, this.kind, this.namespace, this.name);
await this.reconcile?.();
},
@@ -75,19 +82,11 @@ abstract class CustomResource<TSpec extends ZodObject> extends EventEmitter<Cust
}
public get apiVersion() {
const apiVersion = this.resource.apiVersion;
if (!apiVersion) {
throw new Error('Custom resources needs an apiVersion');
}
return apiVersion;
return this.resource.apiVersion;
}
public get kind() {
const kind = this.resource.kind;
if (!kind) {
throw new Error('Custom resources needs a kind');
}
return kind;
return this.resource.kind;
}
public get metadata() {
@@ -107,7 +106,7 @@ abstract class CustomResource<TSpec extends ZodObject> extends EventEmitter<Cust
}
public get namespace() {
const namespace = this.metadata.namespace;
const namespace = this.resource.specifier.namespace;
if (!namespace) {
throw new Error('Custom resources needs a namespace');
}

View File

@@ -180,15 +180,20 @@ class Resource<T extends KubernetesObject = UnknownResource> extends EventEmitte
public patch = (patch: T) =>
this.#queue.add(async () => {
const { services } = this.#options;
services.log.debug(`Patching ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`, {
specifier: this.specifier,
current: this.manifest,
patch,
});
const k8s = services.get(K8sService);
const body = {
...patch,
apiVersion: this.apiVersion,
kind: this.kind,
apiVersion: this.specifier.apiVersion,
kind: this.specifier.kind,
metadata: {
name: this.name,
namespace: this.namespace,
...patch.metadata,
name: this.specifier.name,
namespace: this.specifier.namespace,
},
};
try {
@@ -213,13 +218,14 @@ class Resource<T extends KubernetesObject = UnknownResource> extends EventEmitte
this.#queue.add(async () => {
try {
const { services } = this.#options;
services.log.debug(`Deleting ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`);
const k8s = services.get(K8sService);
await k8s.objectsApi.delete({
apiVersion: this.apiVersion,
kind: this.kind,
apiVersion: this.specifier.apiVersion,
kind: this.specifier.kind,
metadata: {
name: this.name,
namespace: this.namespace,
name: this.specifier.name,
namespace: this.specifier.namespace,
},
});
this.manifest = undefined;
@@ -237,11 +243,11 @@ class Resource<T extends KubernetesObject = UnknownResource> extends EventEmitte
const k8s = services.get(K8sService);
try {
const manifest = await k8s.objectsApi.read({
apiVersion: this.apiVersion,
kind: this.kind,
apiVersion: this.specifier.apiVersion,
kind: this.specifier.kind,
metadata: {
name: this.name,
namespace: this.namespace,
name: this.specifier.name,
namespace: this.specifier.namespace,
},
});
this.manifest = manifest as T;
@@ -254,36 +260,39 @@ class Resource<T extends KubernetesObject = UnknownResource> extends EventEmitte
}
});
public addEvent = async (event: EventOptions) => {
const { services } = this.#options;
const k8sService = services.get(K8sService);
public addEvent = (event: EventOptions) =>
this.#queue.add(async () => {
const { services } = this.#options;
const k8sService = services.get(K8sService);
await k8sService.eventsApi.createNamespacedEvent({
namespace: this.namespace || 'default',
body: {
kind: 'Event',
metadata: {
name: `${this.name}-${Date.now()}-${Buffer.from(crypto.getRandomValues(new Uint8Array(8))).toString('hex')}`,
namespace: this.namespace,
services.log.debug(`Adding event ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`, event);
await k8sService.eventsApi.createNamespacedEvent({
namespace: this.specifier.namespace || 'default',
body: {
kind: 'Event',
metadata: {
name: `${this.specifier.name}-${Date.now()}-${Buffer.from(crypto.getRandomValues(new Uint8Array(8))).toString('hex')}`,
namespace: this.specifier.namespace,
},
eventTime: new V1MicroTime(),
note: event.message,
action: event.action,
reason: event.reason,
type: event.type,
reportingController: GROUP,
reportingInstance: this.name,
regarding: {
apiVersion: this.specifier.apiVersion,
resourceVersion: this.metadata?.resourceVersion,
kind: this.specifier.kind,
name: this.specifier.name,
namespace: this.specifier.namespace,
uid: this.metadata?.uid,
},
},
eventTime: new V1MicroTime(),
note: event.message,
action: event.action,
reason: event.reason,
type: event.type,
reportingController: GROUP,
reportingInstance: this.name,
regarding: {
apiVersion: this.apiVersion,
resourceVersion: this.metadata?.resourceVersion,
kind: this.kind,
name: this.name,
namespace: this.namespace,
uid: this.metadata?.uid,
},
},
});
});
};
}
export { Resource, type UnknownResource, type ResourceEvents };

View File

@@ -1,4 +1,10 @@
import { makeInformer, type Informer, type KubernetesListObject, type KubernetesObject } from '@kubernetes/client-node';
import {
ApiException,
makeInformer,
type Informer,
type KubernetesListObject,
type KubernetesObject,
} from '@kubernetes/client-node';
import { EventEmitter } from 'eventemitter3';
import { K8sService } from '../k8s/k8s.ts';
@@ -38,8 +44,10 @@ class Watcher<T extends KubernetesObject> extends EventEmitter<WatcherEvents<T>>
informer.on('update', this.#handleResource.bind(this, 'update'));
informer.on('delete', this.#handleResource.bind(this, 'delete'));
informer.on('error', (err) => {
console.log('Watcher failed, will retry in 5 seconds', path, err);
setTimeout(this.start, 5000);
if (!(err instanceof ApiException && err.code === 404)) {
console.log('Watcher failed, will retry in 3 seconds', path, err);
}
setTimeout(this.start, 3000);
});
return informer;
};