diff --git a/packages/core/package.json b/packages/core/package.json index 9831726..b8a8d8a 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -27,11 +27,9 @@ }, "name": "@morten-olsen/fluxcurrent-core", "version": "1.0.0", - "imports": { - "#root/*": "./src/*" - }, "dependencies": { "knex": "^3.1.0", + "knex-pglite": "^0.12.0", "pg": "^8.16.3", "zod": "^4.1.5" } diff --git a/packages/core/src/database/database.ts b/packages/core/src/database/database.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/core/src/database/migrations/migrations.001-init.ts b/packages/core/src/database/migrations/migrations.001-init.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/core/src/database/migrations/migrations.ts b/packages/core/src/database/migrations/migrations.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/core/src/services/database/database.ts b/packages/core/src/services/database/database.ts new file mode 100644 index 0000000..7e64426 --- /dev/null +++ b/packages/core/src/services/database/database.ts @@ -0,0 +1,31 @@ +import knex, { type Knex } from 'knex'; +import ClientPgLite from 'knex-pglite'; + +import { createMigrationSource } from './migrations/migrations.ts'; + +class DatabaseService { + #dbPromise?: Promise; + + #setup = async () => { + const db = knex({ + client: ClientPgLite, + dialect: 'postgres', + connection: {}, + migrations: { + migrationSource: createMigrationSource({}), + }, + }); + await db.migrate.latest(); + return db; + }; + + public getDb = async () => { + if (!this.#dbPromise) { + this.#dbPromise = this.#setup(); + } + return this.#dbPromise; + }; +} + +export { tableNames, type TableRow } from './migrations/migrations.ts'; +export { DatabaseService }; diff --git a/packages/core/src/services/database/migrations/migrations.001-init.ts b/packages/core/src/services/database/migrations/migrations.001-init.ts new file mode 100644 index 0000000..9ce2f18 --- /dev/null +++ b/packages/core/src/services/database/migrations/migrations.001-init.ts @@ -0,0 +1,95 @@ +import type { Migration } from './migrations.types.ts'; + +type MetaFieldType = string | number | boolean; + +const tableNames = { + typeSchemas: 'typeSchemas', + documents: 'documents', +}; + +const init: Migration = { + name: 'init', + up: async ({ knex }) => { + await knex.schema.createTable(tableNames.typeSchemas, (table) => { + table.string('name'); + table.string('version'); + table.jsonb('schema'); + table.datetime('createdAt').notNullable(); + table.datetime('updatedAt').notNullable(); + table.datetime('deletedAt').nullable(); + }); + + await knex.schema.createTable(tableNames.documents, (table) => { + table.string('uri').notNullable(); + table.string('type').notNullable(); + table.string('typeVersion').nullable(); + table.datetime('createdAt').notNullable(); + table.datetime('updatedAt').notNullable(); + table.datetime('deletedAt').nullable(); + table.jsonb('metadata').notNullable(); + table.jsonb('data').notNullable(); + + // Primary key and unique constraints + table.primary(['uri', 'type']); + table.index(['uri']); + table.index(['type']); + table.index(['createdAt']); + table.index(['updatedAt']); + table.index(['deletedAt']); + }); + + // Add indexes for typeSchemas table + await knex.schema.alterTable(tableNames.typeSchemas, (table) => { + table.primary(['name', 'version']); + table.index(['name']); + table.index(['createdAt']); + table.index(['updatedAt']); + }); + + // Add GIN index for JSONB metadata column for efficient meta search + await knex.raw('CREATE INDEX documents_metadata_gin_idx ON documents USING GIN (metadata);'); + + // Add partial indexes for active (non-deleted) documents - most common query pattern + await knex.raw('CREATE INDEX documents_active_uri_idx ON documents (uri) WHERE "deletedAt" IS NULL;'); + await knex.raw('CREATE INDEX documents_active_type_idx ON documents (type) WHERE "deletedAt" IS NULL;'); + + // Add partial index for active schemas + await knex.raw('CREATE INDEX type_schemas_active_name_idx ON "typeSchemas" (name) WHERE "deletedAt" IS NULL;'); + }, + down: async ({ knex }) => { + await knex.raw('DROP INDEX IF EXISTS documents_metadata_gin_idx;'); + await knex.raw('DROP INDEX IF EXISTS documents_active_uri_idx;'); + await knex.raw('DROP INDEX IF EXISTS documents_active_type_idx;'); + await knex.raw('DROP INDEX IF EXISTS type_schemas_active_name_idx;'); + await knex.schema.dropTableIfExists(tableNames.documents); + await knex.schema.dropTableIfExists(tableNames.typeSchemas); + }, +}; + +type TypeSchemaRow = { + name: string; + version: string; + schema: Record; + createdAt: string; + updatedAt: string; + deletedAt: string | null; +}; + +type DocumentRow = { + uri: string; + type: string; + typeVersion: string | null; + createdAt: string; + updatedAt: string; + deletedAt: string | null; + metadata: Record; + data: Record; +}; + +type TableRow = { + typeSchema: TypeSchemaRow; + document: DocumentRow; +}; + +export type { TableRow }; +export { init, tableNames }; diff --git a/packages/core/src/services/database/migrations/migrations.ts b/packages/core/src/services/database/migrations/migrations.ts new file mode 100644 index 0000000..97317fa --- /dev/null +++ b/packages/core/src/services/database/migrations/migrations.ts @@ -0,0 +1,20 @@ +import type { Knex } from 'knex'; + +import { init } from './migrations.001-init.ts'; +import type { Migration, MigrationInput } from './migrations.types.ts'; + +const migrations: Migration[] = [init]; + +const createMigrationSource = (options: Omit): Knex.MigrationSource => { + return { + getMigrations: async () => migrations, + getMigrationName: (migration) => migration.name, + getMigration: async (migration) => ({ + up: async (knex) => migration.up({ ...options, knex }), + down: async (knex) => migration.down({ ...options, knex }), + }), + }; +}; + +export { tableNames, type TableRow } from './migrations.001-init.ts'; +export { createMigrationSource }; diff --git a/packages/core/src/services/database/migrations/migrations.types.ts b/packages/core/src/services/database/migrations/migrations.types.ts new file mode 100644 index 0000000..8ef1851 --- /dev/null +++ b/packages/core/src/services/database/migrations/migrations.types.ts @@ -0,0 +1,13 @@ +import type { Knex } from 'knex'; + +type MigrationInput = { + knex: Knex; +}; + +type Migration = { + readonly name: string; + up: (input: MigrationInput) => Promise; + down: (input: MigrationInput) => Promise; +}; + +export type { Migration, MigrationInput }; diff --git a/packages/core/src/services/documents/documents.schemas.ts b/packages/core/src/services/documents/documents.schemas.ts index d4c7b6b..d68408e 100644 --- a/packages/core/src/services/documents/documents.schemas.ts +++ b/packages/core/src/services/documents/documents.schemas.ts @@ -4,13 +4,10 @@ const metaValueSchema = z.union([z.string(), z.number(), z.boolean()]); const documentSchema = z.object({ uri: z.string(), - created: z.iso.datetime(), - updated: z.iso.datetime(), - deleted: z.iso.datetime().nullish(), + createdAt: z.iso.datetime(), + updatedAt: z.iso.datetime(), + deletedAt: z.iso.datetime().nullish(), type: z.string(), - title: z.string().nullish(), - description: z.string().nullish(), - tags: z.array(z.string()), metadata: z.record(z.string(), metaValueSchema), data: z.record(z.string(), z.unknown()), }); @@ -18,25 +15,13 @@ const documentSchema = z.object({ type Document = z.infer; const documentUpsertSchema = documentSchema.omit({ - created: true, - updated: true, - deleted: true, + createdAt: true, + updatedAt: true, + deletedAt: true, }); type DocumentUpsert = z.infer; -const metaDateFilterSchema = z.object({ - type: z.literal('date'), - field: z.string(), - filter: z.object({ - gt: z.iso.datetime().optional(), - gte: z.iso.datetime().optional(), - lt: z.iso.datetime().optional(), - lte: z.iso.datetime().optional(), - nill: z.boolean().optional(), - }), -}); - const metaNumberFilterSchema = z.object({ type: z.literal('number'), field: z.string(), @@ -72,22 +57,37 @@ const metaBoolFilterSchema = z.object({ }), }); -const metaFilterSchema = z.union([ - metaDateFilterSchema, - metaNumberFilterSchema, - metaTextFilterSchema, - metaBoolFilterSchema, -]); +const metaFilterSchema = z.union([metaNumberFilterSchema, metaTextFilterSchema, metaBoolFilterSchema]); type MetaFilter = z.infer; +const metaConditionSchema = z.union([ + z.object({ + type: z.literal('and'), + get conditions() { + return z.array(metaConditionSchema); + }, + }), + z.object({ + type: z.literal('or'), + get conditions() { + return z.array(metaConditionSchema); + }, + }), + metaFilterSchema, +]); + +type MetaCondition = z.infer; + const documentSearchOptionsSchema = z.object({ uris: z.array(z.string()).optional(), types: z.array(z.string()).optional(), - meta: z.array(metaFilterSchema).optional(), + meta: metaConditionSchema.optional(), + limit: z.number().optional(), + offset: z.number().optional(), }); type DocumentSearchOptions = z.infer; -export type { Document, DocumentUpsert, MetaFilter, DocumentSearchOptions }; -export { documentSchema, documentUpsertSchema, metaFilterSchema, documentSearchOptionsSchema }; +export type { Document, DocumentUpsert, MetaFilter, MetaCondition, DocumentSearchOptions }; +export { documentSchema, documentUpsertSchema, metaFilterSchema, metaConditionSchema, documentSearchOptionsSchema }; diff --git a/packages/core/src/services/documents/documents.test.ts b/packages/core/src/services/documents/documents.test.ts new file mode 100644 index 0000000..54e7953 --- /dev/null +++ b/packages/core/src/services/documents/documents.test.ts @@ -0,0 +1,28 @@ +import { describe, it, expect, beforeEach } from 'vitest'; + +import { DocumentsService } from './documents.ts'; + +import { Services } from '#root/utils/services.ts'; + +describe('DocumentsService', () => { + let services: Services; + let documentsService: DocumentsService; + beforeEach(() => { + services = new Services(); + documentsService = services.get(DocumentsService); + }); + + it('should upsert a document', async () => { + const document = { + uri: 'test', + type: 'test', + metadata: { + test: 'test', + }, + data: { + test: 'test', + }, + }; + await documentsService.upsert(document); + }); +}); diff --git a/packages/core/src/services/documents/documents.ts b/packages/core/src/services/documents/documents.ts index e69de29..8ea2f20 100644 --- a/packages/core/src/services/documents/documents.ts +++ b/packages/core/src/services/documents/documents.ts @@ -0,0 +1,170 @@ +import type { Knex } from 'knex'; + +import { DatabaseService, tableNames, type TableRow } from '../database/database.ts'; + +import type { DocumentSearchOptions, DocumentUpsert, MetaCondition, MetaFilter } from './documents.schemas.ts'; + +import type { Services } from '#root/utils/services.ts'; + +class DocumentsService { + #services: Services; + + constructor(services: Services) { + this.#services = services; + } + + public upsert = async (document: DocumentUpsert) => { + const db = await this.#services.get(DatabaseService).getDb(); + const baseItem = { + ...document, + updatedAt: new Date(), + deletedAt: null, + }; + await db('documents') + .insert({ ...baseItem, createdAt: new Date() }) + .onConflict(['uri', 'type']) + .merge({ + ...baseItem, + }); + }; + + public search = async (options: DocumentSearchOptions) => { + const { uris, types, meta, limit, offset } = options; + const db = await this.#services.get(DatabaseService).getDb(); + let query = db(tableNames.documents); + if (uris) { + query = query.whereIn('uri', uris); + } + if (types) { + query = query.whereIn('type', types); + } + if (limit) { + query = query.limit(limit); + } + if (meta) { + query = query.where((builder) => { + this.buildMetaCondition(builder, meta); + }); + } + if (offset) { + query = query.offset(offset); + } + return query; + }; + + /** + * Recursively builds meta search conditions with proper scoping + */ + private buildMetaCondition(builder: Knex.QueryBuilder, condition: MetaCondition): void { + if (condition.type === 'and') { + // Handle AND conditions - all must be true + for (const [index, subCondition] of condition.conditions.entries()) { + if (index === 0) { + // First condition doesn't need andWhere + builder.where((subBuilder) => { + this.buildMetaCondition(subBuilder, subCondition); + }); + } else { + builder.andWhere((subBuilder) => { + this.buildMetaCondition(subBuilder, subCondition); + }); + } + } + } else if (condition.type === 'or') { + // Handle OR conditions - at least one must be true + for (const [index, subCondition] of condition.conditions.entries()) { + if (index === 0) { + // First condition doesn't need orWhere + builder.where((subBuilder) => { + this.buildMetaCondition(subBuilder, subCondition); + }); + } else { + builder.orWhere((subBuilder) => { + this.buildMetaCondition(subBuilder, subCondition); + }); + } + } + } else { + // Handle individual filter conditions + this.buildMetaFilter(builder, condition); + } + } + + /** + * Builds individual meta filter conditions using JSONB operators + */ + private buildMetaFilter(builder: Knex.QueryBuilder, filter: MetaFilter): void { + const fieldPath = `metadata->'${filter.field}'`; + + if (filter.type === 'number') { + const { gt, gte, lt, lte, eq, neq, nill } = filter.filter; + + if (nill !== undefined) { + if (nill) { + builder.whereNull(`metadata->'${filter.field}'`); + } else { + builder.whereNotNull(`metadata->'${filter.field}'`); + } + return; + } + + if (eq !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), eq); + } + if (neq !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), '!=', neq); + } + if (gt !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), '>', gt); + } + if (gte !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), '>=', gte); + } + if (lt !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), '<', lt); + } + if (lte !== undefined) { + builder.where(builder.client.raw(`(${fieldPath})::numeric`), '<=', lte); + } + } else if (filter.type === 'text') { + const { eq, neq, like, nlike, nill } = filter.filter; + + if (nill !== undefined) { + if (nill) { + builder.whereNull(`metadata->'${filter.field}'`); + } else { + builder.whereNotNull(`metadata->'${filter.field}'`); + } + return; + } + + if (eq !== undefined) { + builder.where(builder.client.raw(`metadata->>'${filter.field}'`), eq); + } + if (neq !== undefined) { + builder.where(builder.client.raw(`metadata->>'${filter.field}'`), '!=', neq); + } + if (like !== undefined) { + builder.where(builder.client.raw(`metadata->>'${filter.field}'`), 'like', like); + } + if (nlike !== undefined) { + builder.where(builder.client.raw(`metadata->>'${filter.field}'`), 'not like', nlike); + } + } else if (filter.type === 'bool') { + const { eq, nill } = filter.filter; + + if (nill !== undefined) { + if (nill) { + builder.whereNull(`metadata->'${filter.field}'`); + } else { + builder.whereNotNull(`metadata->'${filter.field}'`); + } + return; + } + + builder.where(builder.client.raw(`(${fieldPath})::boolean`), eq); + } + } +} + +export { DocumentsService }; diff --git a/packages/core/src/utils/services.ts b/packages/core/src/utils/services.ts index e69de29..205e3b2 100644 --- a/packages/core/src/utils/services.ts +++ b/packages/core/src/utils/services.ts @@ -0,0 +1,14 @@ +type ServiceDepencency = new (services: Services) => T; +class Services { + #instances: Map, unknown> = new Map, unknown>(); + + public get(service: ServiceDepencency) { + if (!this.#instances.has(service)) { + this.#instances.set(service, new service(this)); + } + + return this.#instances.get(service) as T; + } +} + +export { Services }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e38b4f0..04d162c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -55,6 +55,9 @@ importers: knex: specifier: ^3.1.0 version: 3.1.0(pg@8.16.3) + knex-pglite: + specifier: ^0.12.0 + version: 0.12.0(@electric-sql/pglite@0.3.8)(knex@3.1.0(pg@8.16.3)) pg: specifier: ^8.16.3 version: 8.16.3 @@ -134,6 +137,9 @@ packages: resolution: {integrity: sha512-6zABk/ECA/QYSCQ1NGiVwwbQerUCZ+TQbp64Q3AgmfNvurHH0j8TtXa1qbShXA6qqkpAj4V5W8pP6mLe1mcMqA==} engines: {node: '>=18'} + '@electric-sql/pglite@0.3.8': + resolution: {integrity: sha512-VlAz/R7mktifp9IHzNvjxWJM8p3fPH2lHpustYuRSOXOpXiAMTlA5qqxcufPaDnfee6CZCE9qrT1MHDT7riSHg==} + '@esbuild/aix-ppc64@0.25.9': resolution: {integrity: sha512-OaGtL73Jck6pBKjNIe24BnFE6agGl+6KxDtTfHhy1HmhthfKouEcOhqpSL64K4/0WCtbKFLOdzD/44cJ4k9opA==} engines: {node: '>=18'} @@ -1593,6 +1599,12 @@ packages: keyv@4.5.4: resolution: {integrity: sha512-oxVHkHR/EJf2CNXnWxRLW6mg7JyCCUcG0DtEGmL2ctUo1PNTin1PUil+r/+4r5MpVgC/fn1kjsx7mjSujKqIpw==} + knex-pglite@0.12.0: + resolution: {integrity: sha512-EsTpIJ8D1SaFm5sVNqKf+Q57bnPGVEpVWwZXXxGrzDyIwtHOwAnd59dY8izkR/nJt8OFrLHMudqaPKfXajOHsA==} + peerDependencies: + '@electric-sql/pglite': 0.x + knex: 3.x + knex@3.1.0: resolution: {integrity: sha512-GLoII6hR0c4ti243gMs5/1Rb3B+AjwMOfjYm97pu0FOQa7JH56hgBxYf5WK2525ceSbBY1cjeZ9yk99GPMB6Kw==} engines: {node: '>=16'} @@ -2501,6 +2513,8 @@ snapshots: '@bcoe/v8-coverage@1.0.2': {} + '@electric-sql/pglite@0.3.8': {} + '@esbuild/aix-ppc64@0.25.9': optional: true @@ -4095,6 +4109,11 @@ snapshots: dependencies: json-buffer: 3.0.1 + knex-pglite@0.12.0(@electric-sql/pglite@0.3.8)(knex@3.1.0(pg@8.16.3)): + dependencies: + '@electric-sql/pglite': 0.3.8 + knex: 3.1.0(pg@8.16.3) + knex@3.1.0(pg@8.16.3): dependencies: colorette: 2.0.19