This commit is contained in:
Morten Olsen
2025-08-20 22:45:30 +02:00
parent 9e5081ed9b
commit cfd2d76873
14 changed files with 220 additions and 52 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 798 KiB

View File

@@ -22,6 +22,7 @@
"dependencies": {
"@goauthentik/api": "2025.6.3-1751754396",
"@kubernetes/client-node": "^1.3.0",
"cron": "^4.3.3",
"debounce": "^2.2.0",
"deep-equal": "^2.2.3",
"dotenv": "^17.2.1",

23
pnpm-lock.yaml generated
View File

@@ -14,6 +14,9 @@ importers:
'@kubernetes/client-node':
specifier: ^1.3.0
version: 1.3.0(encoding@0.1.13)
cron:
specifier: ^4.3.3
version: 4.3.3
debounce:
specifier: ^2.2.0
version: 2.2.0
@@ -229,6 +232,9 @@ packages:
'@types/lodash@4.17.20':
resolution: {integrity: sha512-H3MHACvFUEiujabxhaI/ImO6gUrd8oOurg7LQtS7mbwIXA/cUqWrvBsaeJ23aZEPk1TAYkurjfMbSELfoCXlGA==}
'@types/luxon@3.7.1':
resolution: {integrity: sha512-H3iskjFIAn5SlJU7OuxUmTEpebK6TKB8rxZShDslBMZJ5u9S//KM1sbdAisiSrqwLQncVjnpi2OK2J51h+4lsg==}
'@types/node-fetch@2.6.12':
resolution: {integrity: sha512-8nneRWKCg3rMtF69nLQJnOYUcbafYeFSjqkw3jCRLsqkWFlHaoQrr5mXmofFGOx3DKn7UfmBMyov8ySvLRVldA==}
@@ -507,6 +513,10 @@ packages:
console-control-strings@1.1.0:
resolution: {integrity: sha512-ty/fTekppD2fIwRvnZAVdeOiGd1c7YXEixbgJTNzqcxJWKQnjJ/V1bNEEE6hygpM3WjwHFUVK6HTjWSzV4a8sQ==}
cron@4.3.3:
resolution: {integrity: sha512-B/CJj5yL3sjtlun6RtYHvoSB26EmQ2NUmhq9ZiJSyKIM4K/fqfh9aelDFlIayD2YMeFZqWLi9hHV+c+pq2Djkw==}
engines: {node: '>=18.x'}
cross-spawn@7.0.6:
resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==}
engines: {node: '>= 8'}
@@ -1238,6 +1248,10 @@ packages:
resolution: {integrity: sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==}
engines: {node: '>=10'}
luxon@3.7.1:
resolution: {integrity: sha512-RkRWjA926cTvz5rAb1BqyWkKbbjzCGchDUIKMCUvNi17j6f6j8uHGDV82Aqcqtzd+icoYpELmG3ksgGiFNNcNg==}
engines: {node: '>=12'}
make-fetch-happen@9.1.0:
resolution: {integrity: sha512-+zopwDy7DNknmwPQplem5lAZX/eCOzSvSNNcSKm5eVwTkOBzoktEfXsa9L23J/GIRhxRsaxzkPEhrJEpE2F4Gg==}
engines: {node: '>= 10'}
@@ -2129,6 +2143,8 @@ snapshots:
'@types/lodash@4.17.20': {}
'@types/luxon@3.7.1': {}
'@types/node-fetch@2.6.12':
dependencies:
'@types/node': 22.16.5
@@ -2485,6 +2501,11 @@ snapshots:
console-control-strings@1.1.0:
optional: true
cron@4.3.3:
dependencies:
'@types/luxon': 3.7.1
luxon: 3.7.1
cross-spawn@7.0.6:
dependencies:
path-key: 3.1.1
@@ -3329,6 +3350,8 @@ snapshots:
yallist: 4.0.0
optional: true
luxon@3.7.1: {}
make-fetch-happen@9.1.0:
dependencies:
agentkeepalive: 4.6.0

View File

@@ -3,6 +3,7 @@ import type { K8SCertificateV1 } from 'src/__generated__/resources/K8SCertificat
import { CRD } from '#resources/core/crd/crd.ts';
import { Resource, ResourceService, type ResourceOptions } from '#services/resources/resources.ts';
import { NotReadyError } from '#utils/errors.ts';
class Certificate extends Resource<KubernetesObject & K8SCertificateV1> {
public static readonly apiVersion = 'cert-manager.io/v1';
@@ -18,12 +19,19 @@ class Certificate extends Resource<KubernetesObject & K8SCertificateV1> {
}
#handleCrdChanged = () => {
this.emit('changed');
this.emit('changed', this.manifest);
};
public get hasCRD() {
return this.#crd.exists;
}
public set = async (manifest: KubernetesObject & K8SCertificateV1) => {
if (!this.hasCRD) {
throw new NotReadyError('MissingCRD', 'certificates.cert-manager.io does not exist');
}
return this.ensure(manifest);
};
}
export { Certificate };

View File

@@ -3,7 +3,7 @@ import type { V1PersistentVolumeClaim } from '@kubernetes/client-node';
import { StorageClass } from '../storage-class/storage-class.ts';
import { PersistentVolume } from '../pv/pv.ts';
import { Resource, ResourceService } from '#services/resources/resources.ts';
import { Resource, ResourceService, type ResourceOptions } from '#services/resources/resources.ts';
const PROVISIONER = 'homelab-operator';
@@ -11,8 +11,14 @@ class PVC extends Resource<V1PersistentVolumeClaim> {
public static readonly apiVersion = 'v1';
public static readonly kind = 'PersistentVolumeClaim';
constructor(options: ResourceOptions<V1PersistentVolumeClaim>) {
super(options);
this.on('changed', this.reconcile);
}
public reconcile = async () => {
const storageClassName = this.spec?.storageClassName;
console.log('PVC', this.name, storageClassName);
if (!storageClassName) {
return;
}

View File

@@ -16,6 +16,8 @@ import { Service } from '#resources/core/service/service.ts';
import { HelmRelease } from '#resources/flux/helm-release/helm-release.ts';
import { RepoService } from '#bootstrap/repos/repos.ts';
import { VirtualService } from '#resources/istio/virtual-service/virtual-service.ts';
import { DestinationRule } from '#resources/istio/destination-rule/destination-rule.ts';
import { NotReadyError } from '#utils/errors.ts';
const specSchema = z.object({
environment: z.string(),
@@ -43,6 +45,7 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
#service: Service;
#helmRelease: HelmRelease;
#virtualService: VirtualService;
#destinationRule: DestinationRule;
constructor(options: CustomResourceOptions<typeof specSchema>) {
super(options);
@@ -67,17 +70,20 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
this.#virtualService = resourceService.get(VirtualService, this.name, this.namespace);
this.#virtualService.on('changed', this.queueReconcile);
this.#destinationRule = resourceService.get(DestinationRule, this.name, this.namespace);
this.#destinationRule.on('changed', this.queueReconcile);
}
public reconcile = async () => {
if (!this.spec) {
return;
throw new NotReadyError('MissingSpec');
}
const resourceService = this.services.get(ResourceService);
this.#environment.current = resourceService.get(Environment, this.spec.environment);
if (!this.#environment.current.spec) {
return;
throw new NotReadyError('MissingEnvSpev');
}
await this.#database.ensure({
@@ -91,7 +97,7 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
const databaseSecret = this.#database.secret.value;
if (!databaseSecret) {
return;
throw new NotReadyError('MissingDatabaseSecret');
}
await this.#initSecret.set(
@@ -111,7 +117,7 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
const initSecret = this.#initSecret.value;
if (!initSecret) {
return;
throw new NotReadyError('MissingInitSecret');
}
const domain = `${this.spec?.subdomain || 'authentik'}.${this.#environment.current.spec.domain}`;
@@ -129,7 +135,7 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
);
const secret = this.#secret.value;
if (!secret) {
return;
throw new NotReadyError('MissingSecret');
}
const repoService = this.services.get(RepoService);
@@ -214,6 +220,20 @@ class AuthentikServer extends CustomResource<typeof specSchema> {
},
});
await this.#destinationRule.ensure({
metadata: {
ownerReferences: [this.ref],
},
spec: {
host: this.#service.hostname,
trafficPolicy: {
tls: {
mode: 'DISABLE',
},
},
},
});
const gateway = this.#environment.current.gateway;
await this.#virtualService.set({
metadata: {

View File

@@ -11,6 +11,7 @@ import { Certificate } from '#resources/cert-manager/certificate/certificate.ts'
import { StorageClass } from '#resources/core/storage-class/storage-class.ts';
import { PROVISIONER } from '#resources/core/pvc/pvc.ts';
import { Gateway } from '#resources/istio/gateway/gateway.ts';
import { NotReadyError } from '#utils/errors.ts';
const specSchema = z.object({
domain: z.string(),
@@ -86,7 +87,7 @@ class Environment extends CustomResource<typeof specSchema> {
public reconcile = async () => {
const { data: spec, success } = specSchema.safeParse(this.spec);
if (!success || !spec) {
return;
throw new NotReadyError('InvalidSpec');
}
await this.#namespace.ensure({
metadata: {
@@ -95,7 +96,6 @@ class Environment extends CustomResource<typeof specSchema> {
},
},
});
if (this.#certificate.hasCRD) {
await this.#certificate.ensure({
metadata: {
ownerReferences: [this.ref],
@@ -112,7 +112,6 @@ class Environment extends CustomResource<typeof specSchema> {
},
},
});
}
await this.#storageClass.ensure({
metadata: {
ownerReferences: [this.ref],

View File

@@ -12,6 +12,8 @@ import { Secret } from '#resources/core/secret/secret.ts';
import { API_VERSION } from '#utils/consts.ts';
import { getWithNamespace } from '#utils/naming.ts';
import { PostgresService } from '#services/postgres/postgres.service.ts';
import { NotReadyError } from '#utils/errors.ts';
import { generateRandomHexPass } from '#utils/secrets.ts';
const specSchema = z.object({
environment: z.string().optional(),
@@ -77,17 +79,17 @@ class PostgresDatabase extends CustomResource<typeof specSchema> {
this.#cluster.current = environment.postgresCluster;
} else {
this.#cluster.current = undefined;
return;
throw new NotReadyError('MissingEnvOrClusterSpec');
}
const clusterSecret = this.#cluster.current.secret.value;
if (!clusterSecret) {
return;
throw new NotReadyError('MissingClusterSecret');
}
await this.#secret.set(
(current) => ({
password: crypto.randomUUID(),
password: generateRandomHexPass(),
user: this.username,
database: this.database,
...current,
@@ -103,7 +105,7 @@ class PostgresDatabase extends CustomResource<typeof specSchema> {
const secret = this.#secret.value;
if (!secret) {
return;
throw new NotReadyError('MissingSecret');
}
const postgresService = this.services.get(PostgresService);
@@ -117,7 +119,7 @@ class PostgresDatabase extends CustomResource<typeof specSchema> {
const connectionError = await database.ping();
if (connectionError) {
console.error('Failed to connect', connectionError);
return;
throw new NotReadyError('FailedToConnectToDatabase');
}
await database.upsertRole({
name: secret.user,

View File

@@ -15,12 +15,17 @@ class DestinationRule extends Resource<KubernetesObject & K8SDestinationRuleV1>
super(options);
const resourceService = this.services.get(ResourceService);
this.#crd = resourceService.get(CRD, 'destinationrules.networking.istio.io');
this.#crd.on('changed', this.#handleChange);
}
public get hasCRD() {
return this.#crd.exists;
}
#handleChange = () => {
this.emit('changed', this.manifest);
};
public set = async (manifest: KubernetesObject & K8SDestinationRuleV1) => {
if (!this.hasCRD) {
throw new NotReadyError('CRD is not installed');

View File

@@ -15,11 +15,11 @@ class Gateway extends Resource<KubernetesObject & K8SGatewayV1> {
super(options);
const resourceService = this.services.get(ResourceService);
this.#crd = resourceService.get(CRD, 'gateways.networking.istio.io');
this.on('changed', this.#handleUpdate);
this.#crd.on('changed', this.#handleUpdate);
}
#handleUpdate = async () => {
this.emit('changed');
this.emit('changed', this.manifest);
};
public get hasCRD() {

View File

@@ -15,12 +15,17 @@ class VirtualService extends Resource<KubernetesObject & K8SVirtualServiceV1> {
super(options);
const resourceService = this.services.get(ResourceService);
this.#crd = resourceService.get(CRD, 'virtualservices.networking.istio.io');
this.#crd.on('changed', this.#handleChange);
}
public get hasCRD() {
return this.#crd.exists;
}
#handleChange = () => {
this.emit('changed', this.manifest);
};
public set = async (manifest: KubernetesObject & K8SVirtualServiceV1) => {
if (!this.hasCRD) {
throw new NotReadyError('CRD is not installed');

View File

@@ -1,11 +1,13 @@
import { z, type ZodType } from 'zod';
import { type KubernetesObject } from '@kubernetes/client-node';
import { PatchStrategy, setHeaderOptions, type KubernetesObject } from '@kubernetes/client-node';
import { Resource, type ResourceOptions } from './resource.ts';
import { API_VERSION } from '#utils/consts.ts';
import { CoalescingQueued } from '#utils/queues.ts';
import { NotReadyError } from '#utils/errors.ts';
import { K8sService } from '#services/k8s/k8s.ts';
import { CronJob, CronTime } from 'cron';
const customResourceStatusSchema = z.object({
observedGeneration: z.number().optional(),
@@ -15,7 +17,7 @@ const customResourceStatusSchema = z.object({
observedGeneration: z.number().optional(),
type: z.string(),
status: z.enum(['True', 'False', 'Unknown']),
lastTransitionTime: z.string().datetime(),
lastTransitionTime: z.string().datetime().optional(),
resource: z.boolean().optional(),
failed: z.boolean().optional(),
syncing: z.boolean().optional(),
@@ -35,13 +37,14 @@ class CustomResource<TSpec extends ZodType> extends Resource<
public static readonly status = customResourceStatusSchema;
#reconcileQueue: CoalescingQueued<void>;
#cron: CronJob;
constructor(options: CustomResourceOptions<TSpec>) {
super(options);
this.#reconcileQueue = new CoalescingQueued({
action: async () => {
try {
if (!this.exists || !this.manifest?.metadata?.deletionTimestamp) {
if (!this.exists || this.manifest?.metadata?.deletionTimestamp) {
return;
}
this.services.log.debug('Reconciling', {
@@ -50,25 +53,63 @@ class CustomResource<TSpec extends ZodType> extends Resource<
namespace: this.namespace,
name: this.name,
});
await this.markSeen();
await this.reconcile?.();
await this.markReady();
} catch (err) {
if (err instanceof NotReadyError) {
console.error(err);
await this.markNotReady(err.reason, err.message);
} else if (err instanceof Error) {
await this.markNotReady('Failed', err.message);
} else {
throw err;
await this.markNotReady('Failed', String(err));
}
console.error(err);
}
},
});
this.#cron = CronJob.from({
cronTime: '*/2 * * * *',
onTick: this.queueReconcile,
start: true,
runOnInit: true,
});
this.on('changed', this.#handleUpdate);
}
public get reconcileTime() {
return this.#cron.cronTime.toString();
}
public set reconcileTime(pattern: string) {
this.#cron.cronTime = new CronTime(pattern);
}
public get isSeen() {
return this.metadata?.generation === this.status?.observedGeneration;
}
#handleUpdate = async () => {
if (this.isSeen) {
public get version() {
const [, version] = this.apiVersion.split('/');
return version;
}
public get group() {
const [group] = this.apiVersion.split('/');
return group;
}
public get scope() {
if (!('scope' in this.constructor) || typeof this.constructor.scope !== 'string') {
return;
}
return this.constructor.scope as 'Namespaced' | 'Cluster';
}
#handleUpdate = async (
previous?: KubernetesObject & { spec: z.infer<TSpec>; status?: z.infer<typeof customResourceStatusSchema> },
) => {
if (this.isSeen && previous) {
return;
}
return await this.queueReconcile();
@@ -88,9 +129,58 @@ class CustomResource<TSpec extends ZodType> extends Resource<
});
};
public patchStatus = async (status: Partial<z.infer<typeof customResourceStatusSchema>>) => {
this.patch({ status } as ExpectedAny);
public markNotReady = async (reason?: string, message?: string) => {
await this.patchStatus({
conditions: [
{
type: 'Ready',
status: 'False',
reason,
message,
},
],
});
};
public markReady = async () => {
await this.patchStatus({
conditions: [
{
type: 'Ready',
status: 'True',
},
],
});
};
public patchStatus = (status: Partial<z.infer<typeof customResourceStatusSchema>>) =>
this.queue.add(async () => {
const k8sService = this.services.get(K8sService);
if (this.scope === 'Cluster') {
await k8sService.customObjectsApi.patchClusterCustomObjectStatus(
{
version: this.version,
group: this.group,
plural: this.plural,
name: this.name,
body: { status },
},
setHeaderOptions('Content-Type', PatchStrategy.MergePatch),
);
} else {
await k8sService.customObjectsApi.patchNamespacedCustomObjectStatus(
{
version: this.version,
group: this.group,
plural: this.plural,
name: this.name,
namespace: this.namespace || 'default',
body: { status },
},
setHeaderOptions('Content-Type', PatchStrategy.MergePatch),
);
}
});
}
export { CustomResource, type CustomResourceOptions };

View File

@@ -20,11 +20,11 @@ type ResourceOptions<T extends KubernetesObject> = {
manifest?: T;
};
type ResourceEvents = {
changed: () => void;
type ResourceEvents<T extends KubernetesObject> = {
changed: (from?: T) => void;
};
class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents> {
class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents<T>> {
#manifest?: T;
#queue: Queue;
#options: ResourceOptions<T>;
@@ -36,6 +36,10 @@ class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents>
this.#queue = new Queue({ concurrency: 1 });
}
protected get queue() {
return this.#queue;
}
public get services() {
return this.#options.services;
}
@@ -48,8 +52,19 @@ class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents>
if (deepEqual(this.manifest, value)) {
return;
}
const previous = this.#manifest;
this.#manifest = value;
this.emit('changed');
this.emit('changed', previous);
}
public get plural() {
if ('plural' in this.constructor && typeof this.constructor.plural === 'string') {
return this.constructor.plural;
}
if ('kind' in this.constructor && typeof this.constructor.kind === 'string') {
return this.constructor.kind.toLowerCase() + 's';
}
throw new Error('Unknown kind');
}
public get exists() {
@@ -123,11 +138,7 @@ class Resource<T extends KubernetesObject> extends EventEmitter<ResourceEvents>
public patch = (patch: T) =>
this.#queue.add(async () => {
const { services } = this.#options;
services.log.debug(`Patching ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`, {
spelector: this.selector,
current: this.manifest,
patch,
});
services.log.debug(`Patching ${this.apiVersion}/${this.kind}/${this.namespace}/${this.name}`);
const k8s = services.get(K8sService);
const body = {
...patch,

View File

@@ -47,9 +47,7 @@ class Watcher<T extends KubernetesObject> extends EventEmitter<WatcherEvents<T>>
informer.on('update', this.#handleResource.bind(this, 'update'));
informer.on('delete', this.#handleResource.bind(this, 'delete'));
informer.on('error', (err) => {
if (!(err instanceof ApiException && err.code === 404)) {
console.log('Watcher failed, will retry in 3 seconds', path, err);
}
setTimeout(this.start, 3000);
});
return informer;