This commit is contained in:
Morten Olsen
2025-09-09 12:46:31 +02:00
parent 1b02bc8e81
commit 7eb5266fb6
13 changed files with 421 additions and 33 deletions

View File

@@ -27,11 +27,9 @@
},
"name": "@morten-olsen/fluxcurrent-core",
"version": "1.0.0",
"imports": {
"#root/*": "./src/*"
},
"dependencies": {
"knex": "^3.1.0",
"knex-pglite": "^0.12.0",
"pg": "^8.16.3",
"zod": "^4.1.5"
}

View File

@@ -0,0 +1,31 @@
import knex, { type Knex } from 'knex';
import ClientPgLite from 'knex-pglite';
import { createMigrationSource } from './migrations/migrations.ts';
class DatabaseService {
#dbPromise?: Promise<Knex>;
#setup = async () => {
const db = knex({
client: ClientPgLite,
dialect: 'postgres',
connection: {},
migrations: {
migrationSource: createMigrationSource({}),
},
});
await db.migrate.latest();
return db;
};
public getDb = async () => {
if (!this.#dbPromise) {
this.#dbPromise = this.#setup();
}
return this.#dbPromise;
};
}
export { tableNames, type TableRow } from './migrations/migrations.ts';
export { DatabaseService };

View File

@@ -0,0 +1,95 @@
import type { Migration } from './migrations.types.ts';
type MetaFieldType = string | number | boolean;
const tableNames = {
typeSchemas: 'typeSchemas',
documents: 'documents',
};
const init: Migration = {
name: 'init',
up: async ({ knex }) => {
await knex.schema.createTable(tableNames.typeSchemas, (table) => {
table.string('name');
table.string('version');
table.jsonb('schema');
table.datetime('createdAt').notNullable();
table.datetime('updatedAt').notNullable();
table.datetime('deletedAt').nullable();
});
await knex.schema.createTable(tableNames.documents, (table) => {
table.string('uri').notNullable();
table.string('type').notNullable();
table.string('typeVersion').nullable();
table.datetime('createdAt').notNullable();
table.datetime('updatedAt').notNullable();
table.datetime('deletedAt').nullable();
table.jsonb('metadata').notNullable();
table.jsonb('data').notNullable();
// Primary key and unique constraints
table.primary(['uri', 'type']);
table.index(['uri']);
table.index(['type']);
table.index(['createdAt']);
table.index(['updatedAt']);
table.index(['deletedAt']);
});
// Add indexes for typeSchemas table
await knex.schema.alterTable(tableNames.typeSchemas, (table) => {
table.primary(['name', 'version']);
table.index(['name']);
table.index(['createdAt']);
table.index(['updatedAt']);
});
// Add GIN index for JSONB metadata column for efficient meta search
await knex.raw('CREATE INDEX documents_metadata_gin_idx ON documents USING GIN (metadata);');
// Add partial indexes for active (non-deleted) documents - most common query pattern
await knex.raw('CREATE INDEX documents_active_uri_idx ON documents (uri) WHERE "deletedAt" IS NULL;');
await knex.raw('CREATE INDEX documents_active_type_idx ON documents (type) WHERE "deletedAt" IS NULL;');
// Add partial index for active schemas
await knex.raw('CREATE INDEX type_schemas_active_name_idx ON "typeSchemas" (name) WHERE "deletedAt" IS NULL;');
},
down: async ({ knex }) => {
await knex.raw('DROP INDEX IF EXISTS documents_metadata_gin_idx;');
await knex.raw('DROP INDEX IF EXISTS documents_active_uri_idx;');
await knex.raw('DROP INDEX IF EXISTS documents_active_type_idx;');
await knex.raw('DROP INDEX IF EXISTS type_schemas_active_name_idx;');
await knex.schema.dropTableIfExists(tableNames.documents);
await knex.schema.dropTableIfExists(tableNames.typeSchemas);
},
};
type TypeSchemaRow = {
name: string;
version: string;
schema: Record<string, unknown>;
createdAt: string;
updatedAt: string;
deletedAt: string | null;
};
type DocumentRow = {
uri: string;
type: string;
typeVersion: string | null;
createdAt: string;
updatedAt: string;
deletedAt: string | null;
metadata: Record<string, MetaFieldType>;
data: Record<string, unknown>;
};
type TableRow = {
typeSchema: TypeSchemaRow;
document: DocumentRow;
};
export type { TableRow };
export { init, tableNames };

View File

@@ -0,0 +1,20 @@
import type { Knex } from 'knex';
import { init } from './migrations.001-init.ts';
import type { Migration, MigrationInput } from './migrations.types.ts';
const migrations: Migration[] = [init];
const createMigrationSource = (options: Omit<MigrationInput, 'knex'>): Knex.MigrationSource<Migration> => {
return {
getMigrations: async () => migrations,
getMigrationName: (migration) => migration.name,
getMigration: async (migration) => ({
up: async (knex) => migration.up({ ...options, knex }),
down: async (knex) => migration.down({ ...options, knex }),
}),
};
};
export { tableNames, type TableRow } from './migrations.001-init.ts';
export { createMigrationSource };

View File

@@ -0,0 +1,13 @@
import type { Knex } from 'knex';
type MigrationInput = {
knex: Knex;
};
type Migration = {
readonly name: string;
up: (input: MigrationInput) => Promise<void>;
down: (input: MigrationInput) => Promise<void>;
};
export type { Migration, MigrationInput };

View File

@@ -4,13 +4,10 @@ const metaValueSchema = z.union([z.string(), z.number(), z.boolean()]);
const documentSchema = z.object({
uri: z.string(),
created: z.iso.datetime(),
updated: z.iso.datetime(),
deleted: z.iso.datetime().nullish(),
createdAt: z.iso.datetime(),
updatedAt: z.iso.datetime(),
deletedAt: z.iso.datetime().nullish(),
type: z.string(),
title: z.string().nullish(),
description: z.string().nullish(),
tags: z.array(z.string()),
metadata: z.record(z.string(), metaValueSchema),
data: z.record(z.string(), z.unknown()),
});
@@ -18,25 +15,13 @@ const documentSchema = z.object({
type Document = z.infer<typeof Document>;
const documentUpsertSchema = documentSchema.omit({
created: true,
updated: true,
deleted: true,
createdAt: true,
updatedAt: true,
deletedAt: true,
});
type DocumentUpsert = z.infer<typeof documentUpsertSchema>;
const metaDateFilterSchema = z.object({
type: z.literal('date'),
field: z.string(),
filter: z.object({
gt: z.iso.datetime().optional(),
gte: z.iso.datetime().optional(),
lt: z.iso.datetime().optional(),
lte: z.iso.datetime().optional(),
nill: z.boolean().optional(),
}),
});
const metaNumberFilterSchema = z.object({
type: z.literal('number'),
field: z.string(),
@@ -72,22 +57,37 @@ const metaBoolFilterSchema = z.object({
}),
});
const metaFilterSchema = z.union([
metaDateFilterSchema,
metaNumberFilterSchema,
metaTextFilterSchema,
metaBoolFilterSchema,
]);
const metaFilterSchema = z.union([metaNumberFilterSchema, metaTextFilterSchema, metaBoolFilterSchema]);
type MetaFilter = z.infer<typeof metaFilterSchema>;
const metaConditionSchema = z.union([
z.object({
type: z.literal('and'),
get conditions() {
return z.array(metaConditionSchema);
},
}),
z.object({
type: z.literal('or'),
get conditions() {
return z.array(metaConditionSchema);
},
}),
metaFilterSchema,
]);
type MetaCondition = z.infer<typeof metaConditionSchema>;
const documentSearchOptionsSchema = z.object({
uris: z.array(z.string()).optional(),
types: z.array(z.string()).optional(),
meta: z.array(metaFilterSchema).optional(),
meta: metaConditionSchema.optional(),
limit: z.number().optional(),
offset: z.number().optional(),
});
type DocumentSearchOptions = z.infer<typeof documentSearchOptionsSchema>;
export type { Document, DocumentUpsert, MetaFilter, DocumentSearchOptions };
export { documentSchema, documentUpsertSchema, metaFilterSchema, documentSearchOptionsSchema };
export type { Document, DocumentUpsert, MetaFilter, MetaCondition, DocumentSearchOptions };
export { documentSchema, documentUpsertSchema, metaFilterSchema, metaConditionSchema, documentSearchOptionsSchema };

View File

@@ -0,0 +1,28 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { DocumentsService } from './documents.ts';
import { Services } from '#root/utils/services.ts';
describe('DocumentsService', () => {
let services: Services;
let documentsService: DocumentsService;
beforeEach(() => {
services = new Services();
documentsService = services.get(DocumentsService);
});
it('should upsert a document', async () => {
const document = {
uri: 'test',
type: 'test',
metadata: {
test: 'test',
},
data: {
test: 'test',
},
};
await documentsService.upsert(document);
});
});

View File

@@ -0,0 +1,170 @@
import type { Knex } from 'knex';
import { DatabaseService, tableNames, type TableRow } from '../database/database.ts';
import type { DocumentSearchOptions, DocumentUpsert, MetaCondition, MetaFilter } from './documents.schemas.ts';
import type { Services } from '#root/utils/services.ts';
class DocumentsService {
#services: Services;
constructor(services: Services) {
this.#services = services;
}
public upsert = async (document: DocumentUpsert) => {
const db = await this.#services.get(DatabaseService).getDb();
const baseItem = {
...document,
updatedAt: new Date(),
deletedAt: null,
};
await db('documents')
.insert({ ...baseItem, createdAt: new Date() })
.onConflict(['uri', 'type'])
.merge({
...baseItem,
});
};
public search = async (options: DocumentSearchOptions) => {
const { uris, types, meta, limit, offset } = options;
const db = await this.#services.get(DatabaseService).getDb();
let query = db<TableRow['document']>(tableNames.documents);
if (uris) {
query = query.whereIn('uri', uris);
}
if (types) {
query = query.whereIn('type', types);
}
if (limit) {
query = query.limit(limit);
}
if (meta) {
query = query.where((builder) => {
this.buildMetaCondition(builder, meta);
});
}
if (offset) {
query = query.offset(offset);
}
return query;
};
/**
* Recursively builds meta search conditions with proper scoping
*/
private buildMetaCondition(builder: Knex.QueryBuilder, condition: MetaCondition): void {
if (condition.type === 'and') {
// Handle AND conditions - all must be true
for (const [index, subCondition] of condition.conditions.entries()) {
if (index === 0) {
// First condition doesn't need andWhere
builder.where((subBuilder) => {
this.buildMetaCondition(subBuilder, subCondition);
});
} else {
builder.andWhere((subBuilder) => {
this.buildMetaCondition(subBuilder, subCondition);
});
}
}
} else if (condition.type === 'or') {
// Handle OR conditions - at least one must be true
for (const [index, subCondition] of condition.conditions.entries()) {
if (index === 0) {
// First condition doesn't need orWhere
builder.where((subBuilder) => {
this.buildMetaCondition(subBuilder, subCondition);
});
} else {
builder.orWhere((subBuilder) => {
this.buildMetaCondition(subBuilder, subCondition);
});
}
}
} else {
// Handle individual filter conditions
this.buildMetaFilter(builder, condition);
}
}
/**
* Builds individual meta filter conditions using JSONB operators
*/
private buildMetaFilter(builder: Knex.QueryBuilder, filter: MetaFilter): void {
const fieldPath = `metadata->'${filter.field}'`;
if (filter.type === 'number') {
const { gt, gte, lt, lte, eq, neq, nill } = filter.filter;
if (nill !== undefined) {
if (nill) {
builder.whereNull(`metadata->'${filter.field}'`);
} else {
builder.whereNotNull(`metadata->'${filter.field}'`);
}
return;
}
if (eq !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), eq);
}
if (neq !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), '!=', neq);
}
if (gt !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), '>', gt);
}
if (gte !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), '>=', gte);
}
if (lt !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), '<', lt);
}
if (lte !== undefined) {
builder.where(builder.client.raw(`(${fieldPath})::numeric`), '<=', lte);
}
} else if (filter.type === 'text') {
const { eq, neq, like, nlike, nill } = filter.filter;
if (nill !== undefined) {
if (nill) {
builder.whereNull(`metadata->'${filter.field}'`);
} else {
builder.whereNotNull(`metadata->'${filter.field}'`);
}
return;
}
if (eq !== undefined) {
builder.where(builder.client.raw(`metadata->>'${filter.field}'`), eq);
}
if (neq !== undefined) {
builder.where(builder.client.raw(`metadata->>'${filter.field}'`), '!=', neq);
}
if (like !== undefined) {
builder.where(builder.client.raw(`metadata->>'${filter.field}'`), 'like', like);
}
if (nlike !== undefined) {
builder.where(builder.client.raw(`metadata->>'${filter.field}'`), 'not like', nlike);
}
} else if (filter.type === 'bool') {
const { eq, nill } = filter.filter;
if (nill !== undefined) {
if (nill) {
builder.whereNull(`metadata->'${filter.field}'`);
} else {
builder.whereNotNull(`metadata->'${filter.field}'`);
}
return;
}
builder.where(builder.client.raw(`(${fieldPath})::boolean`), eq);
}
}
}
export { DocumentsService };

View File

@@ -0,0 +1,14 @@
type ServiceDepencency<T> = new (services: Services) => T;
class Services {
#instances: Map<ServiceDepencency<unknown>, unknown> = new Map<ServiceDepencency<unknown>, unknown>();
public get<T>(service: ServiceDepencency<T>) {
if (!this.#instances.has(service)) {
this.#instances.set(service, new service(this));
}
return this.#instances.get(service) as T;
}
}
export { Services };

19
pnpm-lock.yaml generated
View File

@@ -55,6 +55,9 @@ importers:
knex:
specifier: ^3.1.0
version: 3.1.0(pg@8.16.3)
knex-pglite:
specifier: ^0.12.0
version: 0.12.0(@electric-sql/pglite@0.3.8)(knex@3.1.0(pg@8.16.3))
pg:
specifier: ^8.16.3
version: 8.16.3
@@ -134,6 +137,9 @@ packages:
resolution: {integrity: sha512-6zABk/ECA/QYSCQ1NGiVwwbQerUCZ+TQbp64Q3AgmfNvurHH0j8TtXa1qbShXA6qqkpAj4V5W8pP6mLe1mcMqA==}
engines: {node: '>=18'}
'@electric-sql/pglite@0.3.8':
resolution: {integrity: sha512-VlAz/R7mktifp9IHzNvjxWJM8p3fPH2lHpustYuRSOXOpXiAMTlA5qqxcufPaDnfee6CZCE9qrT1MHDT7riSHg==}
'@esbuild/aix-ppc64@0.25.9':
resolution: {integrity: sha512-OaGtL73Jck6pBKjNIe24BnFE6agGl+6KxDtTfHhy1HmhthfKouEcOhqpSL64K4/0WCtbKFLOdzD/44cJ4k9opA==}
engines: {node: '>=18'}
@@ -1593,6 +1599,12 @@ packages:
keyv@4.5.4:
resolution: {integrity: sha512-oxVHkHR/EJf2CNXnWxRLW6mg7JyCCUcG0DtEGmL2ctUo1PNTin1PUil+r/+4r5MpVgC/fn1kjsx7mjSujKqIpw==}
knex-pglite@0.12.0:
resolution: {integrity: sha512-EsTpIJ8D1SaFm5sVNqKf+Q57bnPGVEpVWwZXXxGrzDyIwtHOwAnd59dY8izkR/nJt8OFrLHMudqaPKfXajOHsA==}
peerDependencies:
'@electric-sql/pglite': 0.x
knex: 3.x
knex@3.1.0:
resolution: {integrity: sha512-GLoII6hR0c4ti243gMs5/1Rb3B+AjwMOfjYm97pu0FOQa7JH56hgBxYf5WK2525ceSbBY1cjeZ9yk99GPMB6Kw==}
engines: {node: '>=16'}
@@ -2501,6 +2513,8 @@ snapshots:
'@bcoe/v8-coverage@1.0.2': {}
'@electric-sql/pglite@0.3.8': {}
'@esbuild/aix-ppc64@0.25.9':
optional: true
@@ -4095,6 +4109,11 @@ snapshots:
dependencies:
json-buffer: 3.0.1
knex-pglite@0.12.0(@electric-sql/pglite@0.3.8)(knex@3.1.0(pg@8.16.3)):
dependencies:
'@electric-sql/pglite': 0.3.8
knex: 3.1.0(pg@8.16.3)
knex@3.1.0(pg@8.16.3):
dependencies:
colorette: 2.0.19