This commit is contained in:
Morten Olsen
2025-09-09 18:06:45 +02:00
parent ba7aa90434
commit 0ff0b0992b
25 changed files with 3177 additions and 198 deletions

4
packages/server/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
/node_modules/
/dist/
/coverage/
/.env

View File

@@ -0,0 +1,39 @@
{
"type": "module",
"main": "dist/exports.js",
"scripts": {
"build": "tsc --build",
"test:unit": "vitest --run --passWithNoTests",
"test": "pnpm run \"/^test:/\""
},
"packageManager": "pnpm@10.6.0",
"files": [
"dist"
],
"imports": {
"#root/*": "./src/*"
},
"exports": {
".": "./dist/exports.js"
},
"devDependencies": {
"@morten-olsen/fluxcurrent-configs": "workspace:*",
"@morten-olsen/fluxcurrent-tests": "workspace:*",
"@types/node": "24.3.1",
"@vitest/coverage-v8": "3.2.4",
"typescript": "5.9.2",
"vitest": "3.2.4"
},
"dependencies": {
"@fastify/cors": "^11.1.0",
"@fastify/swagger": "^9.5.1",
"@morten-olsen/fluxcurrent-core": "workspace:*",
"@scalar/fastify-api-reference": "^1.35.1",
"fastify": "^5.6.0",
"fastify-sse-v2": "^4.2.1",
"fastify-type-provider-zod": "^6.0.0",
"zod": "^4.1.5"
},
"name": "@morten-olsen/fluxcurrent-server",
"version": "1.0.0"
}

View File

@@ -0,0 +1,45 @@
import fastify from 'fastify';
import fastifySwagger from '@fastify/swagger';
import fastifyApiReference from '@scalar/fastify-api-reference';
import { jsonSchemaTransform, serializerCompiler, validatorCompiler } from 'fastify-type-provider-zod';
import type { Services } from '@morten-olsen/fluxcurrent-core/utils/services.ts';
import { FastifySSEPlugin } from 'fastify-sse-v2';
import { searchEndpoint } from './endpoints/endpoints.search.ts';
import { documentsEndpoint } from './endpoints/endpoints.documents.ts';
type CreateApiOptions = {
services: Services;
};
const createApi = async (options: CreateApiOptions) => {
const app = fastify();
app.setValidatorCompiler(validatorCompiler);
app.setSerializerCompiler(serializerCompiler);
app.register(fastifySwagger, {
openapi: {
info: {
title: 'SampleApi',
description: 'Sample backend service',
version: '1.0.0',
},
servers: [],
},
transform: jsonSchemaTransform,
});
await app.register(fastifyApiReference, {
routePrefix: '/docs',
});
await app.register(FastifySSEPlugin);
await app.register(searchEndpoint, { services: options.services, prefix: '/search' });
await app.register(documentsEndpoint, { services: options.services, prefix: '/documents' });
await app.ready();
return app;
};
export { createApi };

View File

@@ -0,0 +1,24 @@
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
import { z } from 'zod/v4';
import type { Services } from '@morten-olsen/fluxcurrent-core/utils/services.ts';
import { DocumentsService } from '@morten-olsen/fluxcurrent-core/services/documents/documents.ts';
import { documentUpsertSchema } from '@morten-olsen/fluxcurrent-core/services/documents/documents.schemas.ts';
const documentsEndpoint: FastifyPluginAsyncZod<{ services: Services }> = async (fastify, { services }) => {
fastify.route({
method: 'POST',
url: '',
schema: {
body: z.object({
document: documentUpsertSchema,
}),
},
handler: async (req, res) => {
const documentsService = services.get(DocumentsService);
const documents = await documentsService.upsert(req.body.document);
res.send(documents);
},
});
};
export { documentsEndpoint };

View File

@@ -0,0 +1,62 @@
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
import { z } from 'zod/v4';
import type { Services } from '@morten-olsen/fluxcurrent-core/utils/services.ts';
import { parseDSL } from '@morten-olsen/fluxcurrent-core/services/documents/documents.dsl.ts';
import { DocumentsService } from '@morten-olsen/fluxcurrent-core/services/documents/documents.ts';
import {
documentSchema,
type DocumentUpsertEvent,
} from '@morten-olsen/fluxcurrent-core/services/documents/documents.schemas.ts';
import { filterDocument } from '@morten-olsen/fluxcurrent-core/services/documents/documents.filter.ts';
const searchEndpoint: FastifyPluginAsyncZod<{ services: Services }> = async (fastify, { services }) => {
fastify.route({
method: 'POST',
url: '',
schema: {
body: z.object({
query: z.string().optional(),
}),
response: {
200: z.array(documentSchema),
},
},
handler: async (req, res) => {
const query = req.body.query ? parseDSL(req.body.query) : {};
const documentsService = services.get(DocumentsService);
const documents = await documentsService.search(query);
res.send(documents);
},
});
fastify.route({
method: 'GET',
url: '/stream',
schema: {
querystring: z.object({
query: z.string().optional(),
}),
},
handler: async (req, res) => {
const query = req.query.query ? parseDSL(req.query.query) : {};
const documentsService = services.get(DocumentsService);
res.sse({ event: 'init' });
const documents = await documentsService.search(query);
for (const document of documents) {
res.sse({ event: 'upsert', data: JSON.stringify(document) });
}
const listener = (event: DocumentUpsertEvent) => {
if (query.meta && !filterDocument(query.meta, event.document)) {
return;
}
res.sse({ event: 'upsert', data: JSON.stringify(event) });
};
documentsService.on('upsert', listener);
req.socket.on('close', () => {
documentsService.off('upsert', listener);
});
},
});
};
export { searchEndpoint };

View File

@@ -0,0 +1,664 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { webhookConfigSchema, mqttConfigSchema, configSchema } from './schemas.config.ts';
describe('Configuration Schemas', () => {
// Store original environment variables to restore after tests
let originalEnv: NodeJS.ProcessEnv;
beforeEach(() => {
originalEnv = { ...process.env };
});
afterEach(() => {
process.env = originalEnv;
});
describe('Environment Variable Substitution', () => {
it('should substitute environment variables in strings', () => {
process.env.TEST_URL = 'https://example.com';
process.env.TEST_SECRET = 'secret123';
const config = {
type: 'webhook' as const,
url: '${TEST_URL}/webhook',
secret: 'Bearer ${TEST_SECRET}',
};
const result = webhookConfigSchema.parse(config);
expect(result.url).toBe('https://example.com/webhook');
expect(result.secret).toBe('Bearer secret123');
});
it('should handle missing environment variables by substituting empty string', () => {
const config = {
type: 'webhook' as const,
url: '${MISSING_VAR}/webhook',
};
const result = webhookConfigSchema.parse(config);
expect(result.url).toBe('/webhook');
});
it('should handle multiple environment variables in a single string', () => {
process.env.HOST = 'localhost';
process.env.PORT = '3000';
process.env.PATH_PREFIX = '/api/v1';
const config = {
type: 'webhook' as const,
url: 'http://${HOST}:${PORT}${PATH_PREFIX}/webhook',
};
const result = webhookConfigSchema.parse(config);
expect(result.url).toBe('http://localhost:3000/api/v1/webhook');
});
it('should leave strings without substitution patterns unchanged', () => {
const config = {
type: 'webhook' as const,
url: 'https://static.example.com/webhook',
};
const result = webhookConfigSchema.parse(config);
expect(result.url).toBe('https://static.example.com/webhook');
});
it('should handle empty strings', () => {
const config = {
type: 'webhook' as const,
url: '',
};
const result = webhookConfigSchema.parse(config);
expect(result.url).toBe('');
});
});
describe('Webhook Configuration', () => {
it('should parse valid webhook configuration with all fields', () => {
const config = {
type: 'webhook' as const,
url: 'https://example.com/webhook',
secret: 'secret123',
};
const result = webhookConfigSchema.parse(config);
expect(result).toEqual(config);
});
it('should parse webhook configuration without optional secret', () => {
const config = {
type: 'webhook' as const,
url: 'https://example.com/webhook',
};
const result = webhookConfigSchema.parse(config);
expect(result).toEqual(config);
expect(result.secret).toBeUndefined();
});
it('should reject invalid webhook type', () => {
const config = {
type: 'invalid',
url: 'https://example.com/webhook',
};
expect(() => webhookConfigSchema.parse(config)).toThrow();
});
it('should reject webhook configuration without required url', () => {
const config = {
type: 'webhook' as const,
};
expect(() => webhookConfigSchema.parse(config)).toThrow();
});
});
describe('MQTT Configuration', () => {
it('should parse valid MQTT configuration with all fields', () => {
const config = {
type: 'mqtt' as const,
url: 'mqtt://broker.example.com:1883',
username: 'user123',
password: 'pass123',
baseTopic: 'notifications',
};
const result = mqttConfigSchema.parse(config);
expect(result).toEqual(config);
});
it('should parse MQTT configuration with only required fields', () => {
const config = {
type: 'mqtt' as const,
url: 'mqtt://broker.example.com:1883',
baseTopic: 'notifications',
};
const result = mqttConfigSchema.parse(config);
expect(result).toEqual(config);
expect(result.username).toBeUndefined();
expect(result.password).toBeUndefined();
});
it('should handle environment variable substitution in MQTT fields', () => {
process.env.MQTT_HOST = 'mqtt.example.com';
process.env.MQTT_USER = 'mqttuser';
process.env.MQTT_PASS = 'mqttpass';
process.env.MQTT_TOPIC = 'app/notifications';
const config = {
type: 'mqtt' as const,
url: 'mqtt://${MQTT_HOST}:1883',
username: '${MQTT_USER}',
password: '${MQTT_PASS}',
baseTopic: '${MQTT_TOPIC}',
};
const result = mqttConfigSchema.parse(config);
expect(result.url).toBe('mqtt://mqtt.example.com:1883');
expect(result.username).toBe('mqttuser');
expect(result.password).toBe('mqttpass');
expect(result.baseTopic).toBe('app/notifications');
});
it('should reject invalid MQTT type', () => {
const config = {
type: 'invalid',
url: 'mqtt://broker.example.com:1883',
baseTopic: 'notifications',
};
expect(() => mqttConfigSchema.parse(config)).toThrow();
});
it('should reject MQTT configuration without required fields', () => {
const config = {
type: 'mqtt' as const,
url: 'mqtt://broker.example.com:1883',
// missing baseTopic
};
expect(() => mqttConfigSchema.parse(config)).toThrow();
});
});
describe('Database Configuration', () => {
describe('PGLite Configuration', () => {
it('should parse valid PGLite configuration with path', () => {
const config = {
database: {
type: 'pg-lite' as const,
path: './data/db.sqlite',
},
};
const result = configSchema.parse(config);
expect(result.database).toEqual(config.database);
});
it('should parse PGLite configuration without optional path', () => {
const config = {
database: {
type: 'pg-lite' as const,
},
};
const result = configSchema.parse(config);
expect(result.database.type).toBe('pg-lite');
expect(result.database.path).toBeUndefined();
});
it('should handle environment variable substitution in PGLite path', () => {
process.env.DB_PATH = '/var/data/app.db';
const config = {
database: {
type: 'pg-lite' as const,
path: '${DB_PATH}',
},
};
const result = configSchema.parse(config);
expect(result.database.path).toBe('/var/data/app.db');
});
});
describe('PostgreSQL Configuration', () => {
it('should parse valid PostgreSQL configuration with all fields', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
port: '5432',
user: 'postgres',
password: 'password123',
database: 'myapp',
},
};
const result = configSchema.parse(config);
expect(result.database).toEqual({
...config.database,
port: 5432, // should be transformed to number
});
});
it('should parse PostgreSQL configuration without optional port (defaults to 5432)', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
user: 'postgres',
password: 'password123',
database: 'myapp',
},
};
const result = configSchema.parse(config);
expect(result.database.port).toBe(5432);
});
it('should transform port string to number', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
port: '3306',
user: 'postgres',
password: 'password123',
database: 'myapp',
},
};
const result = configSchema.parse(config);
expect(result.database.port).toBe(3306);
expect(typeof result.database.port).toBe('number');
});
it('should handle environment variable substitution in PostgreSQL fields', () => {
process.env.DB_HOST = 'prod-db.example.com';
process.env.DB_USER = 'app_user';
process.env.DB_PASS = 'secure_password';
process.env.DB_NAME = 'production_db';
process.env.DB_PORT = '5433';
const config = {
database: {
type: 'pg' as const,
host: '${DB_HOST}',
port: '${DB_PORT}',
user: '${DB_USER}',
password: '${DB_PASS}',
database: '${DB_NAME}',
},
};
const result = configSchema.parse(config);
expect(result.database.host).toBe('prod-db.example.com');
expect(result.database.port).toBe(5433);
expect(result.database.user).toBe('app_user');
expect(result.database.password).toBe('secure_password');
expect(result.database.database).toBe('production_db');
});
it('should reject PostgreSQL configuration with missing required fields', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
// missing user, password, database
},
};
expect(() => configSchema.parse(config)).toThrow();
});
});
it('should use PGLite as default when no database config is provided', () => {
const config = {};
const result = configSchema.parse(config);
expect(result.database).toEqual({ type: 'pg-lite' });
});
});
describe('OIDC Configuration', () => {
it('should parse valid OIDC configuration', () => {
const config = {
oidc: {
type: 'oidc' as const,
issuer: 'https://auth.example.com',
clientId: 'client123',
clientSecret: 'secret123',
},
};
const result = configSchema.parse(config);
expect(result.oidc).toEqual(config.oidc);
});
it('should handle environment variable substitution in OIDC fields', () => {
process.env.OIDC_ISSUER = 'https://login.provider.com';
process.env.OIDC_CLIENT_ID = 'my-app-client';
process.env.OIDC_CLIENT_SECRET = 'very-secret-key';
const config = {
oidc: {
type: 'oidc' as const,
issuer: '${OIDC_ISSUER}',
clientId: '${OIDC_CLIENT_ID}',
clientSecret: '${OIDC_CLIENT_SECRET}',
},
};
const result = configSchema.parse(config);
expect(result.oidc!.issuer).toBe('https://login.provider.com');
expect(result.oidc!.clientId).toBe('my-app-client');
expect(result.oidc!.clientSecret).toBe('very-secret-key');
});
it('should be optional and undefined when not provided', () => {
const config = {};
const result = configSchema.parse(config);
expect(result.oidc).toBeUndefined();
});
it('should reject OIDC configuration with missing required fields', () => {
const config = {
oidc: {
type: 'oidc' as const,
issuer: 'https://auth.example.com',
// missing clientId and clientSecret
},
};
expect(() => configSchema.parse(config)).toThrow();
});
});
describe('Notifications Configuration', () => {
it('should parse empty notifications array as default', () => {
const config = {};
const result = configSchema.parse(config);
expect(result.notifications).toEqual([]);
});
it('should parse array with webhook notifications', () => {
const config = {
notifications: [
{
type: 'webhook' as const,
url: 'https://hook1.example.com',
secret: 'secret1',
},
{
type: 'webhook' as const,
url: 'https://hook2.example.com',
},
],
};
const result = configSchema.parse(config);
expect(result.notifications).toEqual(config.notifications);
});
it('should parse array with MQTT notifications', () => {
const config = {
notifications: [
{
type: 'mqtt' as const,
url: 'mqtt://broker1.example.com',
baseTopic: 'app1/notifications',
username: 'user1',
password: 'pass1',
},
{
type: 'mqtt' as const,
url: 'mqtt://broker2.example.com',
baseTopic: 'app2/notifications',
},
],
};
const result = configSchema.parse(config);
expect(result.notifications).toEqual(config.notifications);
});
it('should parse mixed webhook and MQTT notifications', () => {
const config = {
notifications: [
{
type: 'webhook' as const,
url: 'https://webhook.example.com',
secret: 'webhook-secret',
},
{
type: 'mqtt' as const,
url: 'mqtt://mqtt.example.com',
baseTopic: 'notifications',
username: 'mqttuser',
},
],
};
const result = configSchema.parse(config);
expect(result.notifications).toEqual(config.notifications);
});
it('should reject invalid notification types', () => {
const config = {
notifications: [
{
type: 'invalid',
url: 'https://example.com',
},
],
};
expect(() => configSchema.parse(config)).toThrow();
});
});
describe('Complete Configuration', () => {
it('should parse complete configuration with all components', () => {
process.env.DB_HOST = 'db.example.com';
process.env.WEBHOOK_SECRET = 'webhook123';
process.env.OIDC_ISSUER = 'https://auth.example.com';
const config = {
database: {
type: 'pg' as const,
host: '${DB_HOST}',
port: '5432',
user: 'postgres',
password: 'password',
database: 'myapp',
},
oidc: {
type: 'oidc' as const,
issuer: '${OIDC_ISSUER}',
clientId: 'client123',
clientSecret: 'secret123',
},
notifications: [
{
type: 'webhook' as const,
url: 'https://webhook.example.com',
secret: '${WEBHOOK_SECRET}',
},
{
type: 'mqtt' as const,
url: 'mqtt://mqtt.example.com',
baseTopic: 'notifications',
},
],
};
const result = configSchema.parse(config);
expect(result.database.type).toBe('pg');
expect(result.database.host).toBe('db.example.com');
expect(result.database.port).toBe(5432);
expect(result.oidc?.issuer).toBe('https://auth.example.com');
expect(result.notifications).toHaveLength(2);
expect(result.notifications[0].type).toBe('webhook');
expect((result.notifications[0] as any).secret).toBe('webhook123');
expect(result.notifications[1].type).toBe('mqtt');
});
it('should parse minimal configuration with defaults', () => {
const config = {};
const result = configSchema.parse(config);
expect(result.database).toEqual({ type: 'pg-lite' });
expect(result.oidc).toBeUndefined();
expect(result.notifications).toEqual([]);
});
it('should override defaults when values are provided', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
user: 'postgres',
password: 'password',
database: 'test',
},
notifications: [
{
type: 'webhook' as const,
url: 'https://test.com',
},
],
};
const result = configSchema.parse(config);
expect(result.database.type).toBe('pg');
expect(result.notifications).toHaveLength(1);
});
});
describe('Error Cases', () => {
it('should reject configuration with invalid database type', () => {
const config = {
database: {
type: 'invalid-db-type',
host: 'localhost',
},
};
expect(() => configSchema.parse(config)).toThrow();
});
it('should reject configuration with invalid notification structure', () => {
const config = {
notifications: [
{
type: 'webhook',
// missing required url
},
],
};
expect(() => configSchema.parse(config)).toThrow();
});
it('should reject non-array notifications', () => {
const config = {
notifications: {
type: 'webhook',
url: 'https://example.com',
},
};
expect(() => configSchema.parse(config)).toThrow();
});
it('should reject invalid OIDC type', () => {
const config = {
oidc: {
type: 'invalid-oidc',
issuer: 'https://example.com',
clientId: 'client',
clientSecret: 'secret',
},
};
expect(() => configSchema.parse(config)).toThrow();
});
});
describe('Edge Cases', () => {
it('should handle environment variables with special characters', () => {
process.env.SPECIAL_VAR = 'value-with-dashes_and_underscores.dots';
const config = {
notifications: [
{
type: 'webhook' as const,
url: 'https://example.com/${SPECIAL_VAR}',
},
],
};
const result = configSchema.parse(config);
expect((result.notifications[0] as any).url).toBe('https://example.com/value-with-dashes_and_underscores.dots');
});
it('should handle nested environment variable patterns', () => {
process.env.HOST = 'example.com';
const config = {
notifications: [
{
type: 'webhook' as const,
url: 'https://${HOST}/${HOST}/webhook',
},
],
};
const result = configSchema.parse(config);
expect((result.notifications[0] as any).url).toBe('https://example.com/example.com/webhook');
});
it('should handle port transformation with invalid number', () => {
const config = {
database: {
type: 'pg' as const,
host: 'localhost',
port: 'invalid-port',
user: 'postgres',
password: 'password',
database: 'test',
},
};
const result = configSchema.parse(config);
expect(result.database.port).toBeNaN();
});
it('should handle empty environment variable names in substitution', () => {
const config = {
notifications: [
{
type: 'webhook' as const,
url: 'https://example.com/${}',
},
],
};
const result = configSchema.parse(config);
// Empty braces ${} don't match the regex \w+ so they remain unchanged
expect((result.notifications[0] as any).url).toBe('https://example.com/${}');
});
});
});

View File

@@ -0,0 +1,57 @@
import { z } from 'zod';
const stringWithEnvSubstitutionSchema = z.string().transform((value) => {
if (!value) {
return value;
}
return value.replace(/\${(\w+)}/g, (_, p1) => {
return process.env[p1] || '';
});
});
const webhookConfigSchema = z.object({
type: z.literal('webhook'),
url: stringWithEnvSubstitutionSchema,
secret: stringWithEnvSubstitutionSchema.optional(),
});
const mqttConfigSchema = z.object({
type: z.literal('mqtt'),
url: stringWithEnvSubstitutionSchema,
username: stringWithEnvSubstitutionSchema.optional(),
password: stringWithEnvSubstitutionSchema.optional(),
baseTopic: stringWithEnvSubstitutionSchema,
});
const oidcConfigSchema = z.object({
type: z.literal('oidc'),
issuer: stringWithEnvSubstitutionSchema,
clientId: stringWithEnvSubstitutionSchema,
clientSecret: stringWithEnvSubstitutionSchema,
});
const notificationsConfigSchema = z.union([webhookConfigSchema, mqttConfigSchema]);
const pgLiteConfigSchema = z.object({
type: z.literal('pg-lite'),
path: stringWithEnvSubstitutionSchema.optional(),
});
const pgConfigSchema = z.object({
type: z.literal('pg'),
host: stringWithEnvSubstitutionSchema,
port: stringWithEnvSubstitutionSchema.optional().transform((value) => (value ? parseInt(value) : 5432)),
user: stringWithEnvSubstitutionSchema,
password: stringWithEnvSubstitutionSchema,
database: stringWithEnvSubstitutionSchema,
});
const databaseConfigSchema = z.union([pgLiteConfigSchema, pgConfigSchema]);
const configSchema = z.object({
database: databaseConfigSchema.default({ type: 'pg-lite' }),
oidc: oidcConfigSchema.optional(),
notifications: z.array(notificationsConfigSchema).default([]),
});
export { webhookConfigSchema, mqttConfigSchema, configSchema };

View File

@@ -0,0 +1,8 @@
import { Services } from '@morten-olsen/fluxcurrent-core/utils/services.ts';
import { createApi } from './api/api.ts';
const services = new Services();
const api = await createApi({ services });
await api.listen({ port: 3000 });

View File

@@ -0,0 +1,12 @@
{
"compilerOptions": {
"outDir": "./dist",
"paths": {
"#root/*": ["./src"]
}
},
"include": [
"src/**/*.ts"
],
"extends": "@morten-olsen/fluxcurrent-configs/tsconfig.json"
}

View File

@@ -0,0 +1,12 @@
import { defineConfig } from 'vitest/config';
import { getAliases } from '@morten-olsen/fluxcurrent-tests/vitest';
// eslint-disable-next-line import/no-default-export
export default defineConfig(async () => {
const aliases = await getAliases();
return {
resolve: {
alias: aliases,
},
};
});