From 904b0f783e8843827c533da8d56ec944f2c321b2 Mon Sep 17 00:00:00 2001
From: Morten Olsen
Date: Wed, 10 Dec 2025 23:21:25 +0100
Subject: [PATCH] feat: use postgres for change notification

---
 .../src/services/database/database.schemas.ts | 40 ++++++++++
 .../runtime/src/services/database/database.ts | 62 +++++++++-------
 .../database/generators/generators.pglite.ts  | 38 ++++++++++
 .../database/generators/generators.ts         |  7 ++
 .../database/generators/generators.types.ts   | 21 ++++++
 .../migrations/migrations.001-init.ts         | 48 ++++++++++++
 .../services/documents/documents.mapping.ts   |  6 +-
 .../src/services/documents/documents.ts       | 74 +++++++++++++++----
 .../runtime/src/utils/utils.event-emitter.ts  | 18 +++++
 packages/server/src/api.ts                    |  3 +
 10 files changed, 274 insertions(+), 43 deletions(-)
 create mode 100644 packages/runtime/src/services/database/database.schemas.ts
 create mode 100644 packages/runtime/src/services/database/generators/generators.pglite.ts
 create mode 100644 packages/runtime/src/services/database/generators/generators.ts
 create mode 100644 packages/runtime/src/services/database/generators/generators.types.ts

diff --git a/packages/runtime/src/services/database/database.schemas.ts b/packages/runtime/src/services/database/database.schemas.ts
new file mode 100644
index 0000000..8aba147
--- /dev/null
+++ b/packages/runtime/src/services/database/database.schemas.ts
@@ -0,0 +1,40 @@
+import { z } from 'zod';
+
+const insertedRecordEventSchema = z.object({
+  operation: z.literal('INSERT'),
+  table: z.string(),
+  schema: z.string(),
+  newRecord: z.record(z.string(), z.unknown()),
+});
+
+type InsertedRecordEvent = z.infer<typeof insertedRecordEventSchema>;
+
+const deletedRecordEventSchema = z.object({
+  operation: z.literal('DELETE'),
+  table: z.string(),
+  schema: z.string(),
+  oldRecord: z.record(z.string(), z.unknown()),
+});
+
+type DeletedRecordEvent = z.infer<typeof deletedRecordEventSchema>;
+
+const updatedRecordEventSchema = z.object({
+  operation: z.literal('UPDATE'),
+  table: z.string(),
+  schema: z.string(),
+  newRecord: z.record(z.string(), z.unknown()),
+  oldRecord: z.record(z.string(), z.unknown()),
+});
+
+type UpdatedRecordEvent = z.infer<typeof updatedRecordEventSchema>;
+
+const changedRecordEventSchema = z.discriminatedUnion('operation', [
+  insertedRecordEventSchema,
+  deletedRecordEventSchema,
+  updatedRecordEventSchema,
+]);
+
+type ChangedRecordEvent = z.infer<typeof changedRecordEventSchema>;
+
+export type { InsertedRecordEvent, DeletedRecordEvent, UpdatedRecordEvent, ChangedRecordEvent };
+export { insertedRecordEventSchema, deletedRecordEventSchema, updatedRecordEventSchema, changedRecordEventSchema };
diff --git a/packages/runtime/src/services/database/database.ts b/packages/runtime/src/services/database/database.ts
index da18ddc..ce743b0 100644
--- a/packages/runtime/src/services/database/database.ts
+++ b/packages/runtime/src/services/database/database.ts
@@ -1,51 +1,59 @@
-import knex, { type Knex } from 'knex';
-import ClientPgLite from 'knex-pglite';
-import { PGlite } from '@electric-sql/pglite';
-import { vector } from '@electric-sql/pglite/vector';
-
 import { destroy, type Services } from '../../utils/utils.services.js';
+import { EventEmitter } from '../../utils/utils.event-emitter.js';
 
 import { migrationSource } from './migrations/migrations.js';
+import { type ChangedRecordEvent } from './database.schemas.js';
+import type { GeneratorOutput } from './generators/generators.types.js';
+import { pgLiteGenerator } from './generators/generators.pglite.js';
+import { createEmitter } from './generators/generators.js';
 
-class DatabaseService {
+type DatabaseServiceEvents = {
+  changedRecord: (event: ChangedRecordEvent) => void;
+};
+
+class DatabaseService extends EventEmitter<DatabaseServiceEvents> {
   #services: Services;
-  #instance?: Promise<Knex>;
+  #generated?: Promise<GeneratorOutput>;
 
   constructor(services: Services) {
+    super();
     this.#services = services;
   }
 
   #setup = async () => {
-    const pglite = new PGlite({
-      extensions: { vector },
-    });
-
-    const instance = knex({
-      client: ClientPgLite,
-      dialect: 'postgres',
-      connection: () => ({ pglite }) as object,
-    });
-    await instance.raw(`CREATE EXTENSION IF NOT EXISTS vector`);
-
+    const emitter = createEmitter();
+    const output = await pgLiteGenerator({ emitter, config: {} });
+    const { instance } = output;
     await instance.migrate.latest({
      migrationSource: migrationSource({ services: this.#services }),
     });
+    emitter.on('changed', this.emit.bind(this, 'changedRecord'));
+    return output;
+  };
+
+  #getGenerated = async () => {
+    if (!this.#generated) {
+      this.#generated = this.#setup();
+    }
+    return this.#generated;
+  };
+
+  public listen = async () => {
+    const { subscribe } = await this.#getGenerated();
+    await subscribe();
+  };
+
+  public getInstance = async () => {
+    const { instance } = await this.#getGenerated();
     return instance;
   };
 
-  public getInstance = () => {
-    if (!this.#instance) {
-      this.#instance = this.#setup();
-    }
-    return this.#instance;
-  };
-
   [destroy] = async () => {
-    if (!this.#instance) {
+    if (!this.#generated) {
       return;
     }
-    const instance = await this.#instance;
+    const { instance } = await this.#generated;
     await instance.destroy();
   };
 }
diff --git a/packages/runtime/src/services/database/generators/generators.pglite.ts b/packages/runtime/src/services/database/generators/generators.pglite.ts
new file mode 100644
index 0000000..a98ed87
--- /dev/null
+++ b/packages/runtime/src/services/database/generators/generators.pglite.ts
@@ -0,0 +1,38 @@
+import { PGlite } from '@electric-sql/pglite';
+import knex from 'knex';
+import ClientPGLite from 'knex-pglite';
+import { vector } from '@electric-sql/pglite/vector';
+
+import { changedRecordEventSchema } from '../database.schemas.js';
+
+import type { Generator } from './generators.types.js';
+
+type PGLiteGeneratorOptions = {
+  dataLocation?: string;
+};
+const pgLiteGenerator: Generator<PGLiteGeneratorOptions> = async ({ emitter }) => {
+  const pglite = new PGlite({
+    extensions: { vector },
+  });
+
+  const instance = knex({
+    client: ClientPGLite,
+    dialect: 'postgres',
+    connection: () => ({ pglite }) as object,
+  });
+  await instance.raw(`CREATE EXTENSION IF NOT EXISTS vector`);
+  const subscribe = async () => {
+    pglite.onNotification((channel, data) => {
+      if (channel !== 'row_changed') {
+        return;
+      }
+      const payload = changedRecordEventSchema.parse(JSON.parse(data));
+      emitter.emit('changed', payload);
+    });
+    await instance.raw('LISTEN row_changed');
+  };
+
+  return { instance, subscribe };
+};
+
+export { pgLiteGenerator };
diff --git a/packages/runtime/src/services/database/generators/generators.ts b/packages/runtime/src/services/database/generators/generators.ts
new file mode 100644
index 0000000..0f1c27f
--- /dev/null
+++ b/packages/runtime/src/services/database/generators/generators.ts
@@ -0,0 +1,7 @@
+import { EventEmitter } from '../../../utils/utils.event-emitter.js';
+
+import type { GeneratorEvents } from './generators.types.js';
+
+const createEmitter = () => new EventEmitter<GeneratorEvents>();
+
+export { createEmitter };
diff --git a/packages/runtime/src/services/database/generators/generators.types.ts b/packages/runtime/src/services/database/generators/generators.types.ts
new file mode 100644
index 0000000..754989b
--- /dev/null
+++ b/packages/runtime/src/services/database/generators/generators.types.ts
@@ -0,0 +1,21 @@
+import type { Knex } from 'knex';
+
+import type { ChangedRecordEvent } from '../database.schemas.js';
+import type { EventEmitter } from '../../../utils/utils.event-emitter.js';
+
+type GeneratorEvents = {
+  changed: (event: ChangedRecordEvent) => void;
+};
+type GeneratorOutput = {
+  instance: Knex;
+  subscribe: () => Promise<void>;
+};
+
+type GeneratorOptions<T> = {
+  config: T;
+  emitter: EventEmitter<GeneratorEvents>;
+};
+
+type Generator<T> = (options: GeneratorOptions<T>) => Promise<GeneratorOutput>;
+
+export type { GeneratorEvents, GeneratorOutput, Generator };
diff --git a/packages/runtime/src/services/database/migrations/migrations.001-init.ts b/packages/runtime/src/services/database/migrations/migrations.001-init.ts
index 74f93a9..9e91374 100644
--- a/packages/runtime/src/services/database/migrations/migrations.001-init.ts
+++ b/packages/runtime/src/services/database/migrations/migrations.001-init.ts
@@ -15,6 +15,45 @@ const init: Migration = {
     const embedding = services.get(EmbeddingsService);
     const embeddingField = await embedding.getFieldType(EMBEDDING_MODEL);
 
+    await knex.raw(`
+      CREATE OR REPLACE FUNCTION notify_changes()
+      RETURNS trigger AS $$
+      DECLARE
+        payload TEXT;
+      BEGIN
+        -- Build the JSON payload based on the operation type
+        IF (TG_OP = 'DELETE') THEN
+          payload := json_build_object(
+            'operation', TG_OP,
+            'table', TG_TABLE_NAME,
+            'schema', TG_TABLE_SCHEMA,
+            'oldRecord', row_to_json(OLD)
+          )::text;
+        ELSIF (TG_OP = 'INSERT') THEN
+          payload := json_build_object(
+            'operation', TG_OP,
+            'table', TG_TABLE_NAME,
+            'schema', TG_TABLE_SCHEMA,
+            'newRecord', row_to_json(NEW)
+          )::text;
+        ELSIF (TG_OP = 'UPDATE') THEN
+          payload := json_build_object(
+            'operation', TG_OP,
+            'table', TG_TABLE_NAME,
+            'schema', TG_TABLE_SCHEMA,
+            'oldRecord', row_to_json(OLD),
+            'newRecord', row_to_json(NEW)
+          )::text;
+        END IF;
+
+        -- Send the notification to the channel
+        PERFORM pg_notify('row_changed', payload);
+
+        RETURN NEW;
+      END;
+      $$ LANGUAGE plpgsql;
+    `);
+
     await knex.schema.createTable(tableNames.documents, (table) => {
       table.uuid('id').primary();
       table.uuid('owner').nullable().references('id').inTable(tableNames.documents).onDelete('CASCADE');
@@ -34,6 +73,13 @@ const init: Migration = {
       table.index(['owner']);
     });
 
+    await knex.raw(`
+      CREATE TRIGGER document_changes_trigger
+      AFTER INSERT OR UPDATE OR DELETE ON documents
+      FOR EACH ROW
+      EXECUTE PROCEDURE notify_changes();
+    `);
+
     await knex.schema.createTable(tableNames.documentChunks, (table) => {
       table.uuid('id').primary();
       table.uuid('owner').nullable().references('id').inTable(tableNames.documents).onDelete('CASCADE');
@@ -61,6 +107,8 @@ const init: Migration = {
     });
   },
   down: async ({ knex }) => {
+    await knex.raw('DROP TRIGGER IF EXISTS document_changes_trigger ON documents;');
+    await knex.raw('DROP FUNCTION IF EXISTS notify_changes();');
     await knex.schema.dropTableIfExists(tableNames.relations);
     await knex.schema.dropTableIfExists(tableNames.documentChunks);
     await knex.schema.dropTableIfExists(tableNames.documents);
diff --git a/packages/runtime/src/services/documents/documents.mapping.ts b/packages/runtime/src/services/documents/documents.mapping.ts
index e85f0b2..af5a036 100644
--- a/packages/runtime/src/services/documents/documents.mapping.ts
+++ b/packages/runtime/src/services/documents/documents.mapping.ts
@@ -4,9 +4,9 @@ import type { Document } from './documents.schemas.js';
 
 const mapFromDocumentRow = (row: TableRows['documents']): Document => ({
   ...row,
-  createdAt: row.createdAt.toISOString(),
-  updatedAt: row.updatedAt.toISOString(),
-  deletedAt: row.deletedAt?.toISOString() || null,
+  createdAt: row.createdAt.toISOString?.(),
+  updatedAt: row.updatedAt.toISOString?.(),
+  deletedAt: row.deletedAt?.toISOString?.() || null,
 });
 
 export { mapFromDocumentRow };
diff --git a/packages/runtime/src/services/documents/documents.ts b/packages/runtime/src/services/documents/documents.ts
index 55983c6..c8bfce8 100644
--- a/packages/runtime/src/services/documents/documents.ts
+++ b/packages/runtime/src/services/documents/documents.ts
@@ -3,31 +3,43 @@ import { QueryParser } from '@morten-olsen/stash-query-dsl';
 import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
 import { SplittingService } from '../splitter/splitter.js';
 import { EventEmitter } from '../../utils/utils.event-emitter.js';
-import type { Services } from '../../utils/utils.services.js';
+import { destroy, type Services } from '../../utils/utils.services.js';
 import { compareObjectKeys } from '../../utils/utils.compare.js';
 import { applyQueryFilter } from '../../utils/utils.query.js';
 import { base64ToMaybeBuffer } from '../../utils/utils.binary.js';
 
 import { mapFromDocumentRow } from './documents.mapping.js';
-import type {
-  Document,
-  DocumentFilter,
-  DocumentFindResult,
-  DocumentUpsert,
-  DocumentUpsertResult,
-} from './documents.schemas.ts';
+import {
+  type Document,
+  type DocumentFilter,
+  type DocumentFindResult,
+  type DocumentUpsert,
+  type DocumentUpsertResult,
+} from './documents.schemas.js';
 
 type DocumentsServiceEvents = {
   upserted: (document: Document) => void;
   inserted: (document: Document) => void;
-  updated: (document: Document) => void;
+  updated: (next: Document, prev: Document) => void;
+  deleted: (document: Document) => void;
 };
 
 class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
   #services: Services;
+  #subscribeListenAbortController: AbortController;
+  #databaseListenAbortController?: AbortController;
 
   constructor(services: Services) {
     super();
+    this.#subscribeListenAbortController = new AbortController();
+    this.onSubscribe(
+      async () => {
+        await this.#listen();
+      },
+      {
+        abortSignal: this.#subscribeListenAbortController.signal,
+      },
+    );
     this.#services = services;
   }
 
@@ -49,6 +61,39 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
     };
   };
 
+  #listen = async () => {
+    if (this.#databaseListenAbortController) {
+      return;
+    }
+    this.#databaseListenAbortController = new AbortController();
+    const databaseService = this.#services.get(DatabaseService);
+    await databaseService.listen();
+    databaseService.on(
+      'changedRecord',
+      (evt) => {
+        if (evt.table !== tableNames.documents) {
+          return;
+        }
+        if (evt.operation === 'INSERT') {
+          const newDocument = mapFromDocumentRow(evt.newRecord as TableRows['documents']);
+          this.emit('inserted', newDocument);
+          this.emit('upserted', newDocument);
+        }
+        if (evt.operation === 'UPDATE') {
+          const newDocument = mapFromDocumentRow(evt.newRecord as TableRows['documents']);
+          const oldDocument = mapFromDocumentRow(evt.oldRecord as TableRows['documents']);
+          this.emit('updated', newDocument, oldDocument);
+          this.emit('upserted', newDocument);
+        }
+        if (evt.operation === 'DELETE') {
+          const oldDocument = mapFromDocumentRow(evt.oldRecord as TableRows['documents']);
+          this.emit('deleted', oldDocument);
+        }
+      },
+      { abortSignal: this.#databaseListenAbortController.signal },
+    );
+  };
+
   public get = async (id: string): Promise<Document | undefined> => {
     const databaseService = this.#services.get(DatabaseService);
     const db = await databaseService.getInstance();
@@ -111,8 +156,6 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
         content: base64ToMaybeBuffer(document.content ?? current.content) || null,
         id,
       });
-      this.emit('updated', resultDocument);
-      this.emit('upserted', resultDocument);
       return {
         id,
         action: 'updated',
@@ -142,8 +185,6 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
         createdAt: now,
         updatedAt: now,
       });
-      this.emit('inserted', resultDocument);
-      this.emit('upserted', resultDocument);
      return {
         id,
         action: 'inserted',
@@ -173,6 +214,13 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
 
     return result;
   };
+
+  [destroy] = async () => {
+    this.#subscribeListenAbortController.abort();
+    if (this.#databaseListenAbortController) {
+      this.#databaseListenAbortController.abort();
+    }
+  };
 }
 
 export * from './documents.schemas.js';
diff --git a/packages/runtime/src/utils/utils.event-emitter.ts b/packages/runtime/src/utils/utils.event-emitter.ts
index 3986995..04e646e 100644
--- a/packages/runtime/src/utils/utils.event-emitter.ts
+++ b/packages/runtime/src/utils/utils.event-emitter.ts
@@ -1,6 +1,7 @@
 import type { ExplicitAny } from '../global.js';
 
 type EventListener<T extends ExplicitAny[]> = (...args: T) => void | Promise<void>;
+type SubscribeListener<T> = (type: T) => void | Promise<void>;
 
 type OnOptions = {
   abortSignal?: AbortSignal;
@@ -8,8 +9,25 @@
 
 class EventEmitter<T extends Record<string, (...args: ExplicitAny[]) => void | Promise<void>>> {
   #listeners = new Map<keyof T, Set<EventListener<ExplicitAny>>>();
+  #subscribeListeners = new Set<SubscribeListener<keyof T>>();
+
+  onSubscribe = (callback: SubscribeListener<keyof T>, options: OnOptions = {}) => {
+    const { abortSignal } = options;
+    const callbackClone = (type: keyof T) => callback(type);
+    this.#subscribeListeners.add(callbackClone);
+    const abortController = new AbortController();
+    abortSignal?.addEventListener('abort', () => abortController.abort());
+
+    abortController.signal.addEventListener('abort', () => {
+      this.#subscribeListeners.delete(callbackClone);
+    });
+    return () => abortController.abort();
+  };
 
   on = <K extends keyof T>(event: K, callback: EventListener<Parameters<T[K]>>, options: OnOptions = {}) => {
+    for (const subscribeListener of this.#subscribeListeners) {
+      subscribeListener(event);
+    }
     const { abortSignal } = options;
     if (!this.#listeners.has(event)) {
       this.#listeners.set(event, new Set());
diff --git a/packages/server/src/api.ts b/packages/server/src/api.ts
index 54e184d..184d21c 100644
--- a/packages/server/src/api.ts
+++ b/packages/server/src/api.ts
@@ -28,6 +28,9 @@ class BaseError extends Error {
 }
 
 const createApi = async (runtime: StashRuntime = new StashRuntime()) => {
+  runtime.documents.on('upserted', (document) => {
+    console.log(document);
+  });
   const app = fastify().withTypeProvider<ZodTypeProvider>();
   app.setValidatorCompiler(validatorCompiler);
   app.setSerializerCompiler(serializerCompiler);