feat: use postgres for change notification
Some checks failed
Build and release / Release (push) Has been skipped
Build and release / Build (push) Failing after 1m14s
Build and release / update-release-draft (push) Has been skipped

This commit is contained in:
Morten Olsen
2025-12-10 23:21:25 +01:00
parent 3641e86da5
commit 904b0f783e
10 changed files with 274 additions and 43 deletions

View File

@@ -0,0 +1,40 @@
import { z } from 'zod';

// Columns shared by every row-change notification payload.
const baseRecordEventFields = {
  table: z.string(),
  schema: z.string(),
};

/** Payload emitted by the `row_changed` channel for an INSERT. */
const insertedRecordEventSchema = z.object({
  operation: z.literal('INSERT'),
  ...baseRecordEventFields,
  newRecord: z.record(z.string(), z.unknown()),
});
type InsertedRecordEvent = z.infer<typeof insertedRecordEventSchema>;

/** Payload emitted by the `row_changed` channel for a DELETE. */
const deletedRecordEventSchema = z.object({
  operation: z.literal('DELETE'),
  ...baseRecordEventFields,
  oldRecord: z.record(z.string(), z.unknown()),
});
type DeletedRecordEvent = z.infer<typeof deletedRecordEventSchema>;

/** Payload emitted by the `row_changed` channel for an UPDATE (carries both row versions). */
const updatedRecordEventSchema = z.object({
  operation: z.literal('UPDATE'),
  ...baseRecordEventFields,
  newRecord: z.record(z.string(), z.unknown()),
  oldRecord: z.record(z.string(), z.unknown()),
});
type UpdatedRecordEvent = z.infer<typeof updatedRecordEventSchema>;

/** Any row-change notification, discriminated on `operation`. */
const changedRecordEventSchema = z.discriminatedUnion('operation', [
  insertedRecordEventSchema,
  deletedRecordEventSchema,
  updatedRecordEventSchema,
]);
type ChangedRecordEvent = z.infer<typeof changedRecordEventSchema>;

export type { InsertedRecordEvent, DeletedRecordEvent, UpdatedRecordEvent, ChangedRecordEvent };
export { insertedRecordEventSchema, deletedRecordEventSchema, updatedRecordEventSchema, changedRecordEventSchema };

View File

@@ -1,51 +1,59 @@
import knex, { type Knex } from 'knex';
import ClientPgLite from 'knex-pglite';
import { PGlite } from '@electric-sql/pglite';
import { vector } from '@electric-sql/pglite/vector';
import { destroy, type Services } from '../../utils/utils.services.js';
import { EventEmitter } from '../../utils/utils.event-emitter.js';
import { migrationSource } from './migrations/migrations.js';
import { type ChangedRecordEvent } from './database.schemas.js';
import type { GeneratorOutput } from './generators/generators.types.js';
import { pgLiteGenerator } from './generators/generators.pglite.js';
import { createEmitter } from './generators/generators.js';
class DatabaseService {
type DatabaseServiceEvents = {
changedRecord: (event: ChangedRecordEvent) => void;
};
class DatabaseService extends EventEmitter<DatabaseServiceEvents> {
#services: Services;
#instance?: Promise<Knex>;
#generated?: Promise<GeneratorOutput>;
constructor(services: Services) {
super();
this.#services = services;
}
#setup = async () => {
const pglite = new PGlite({
extensions: { vector },
});
const instance = knex({
client: ClientPgLite,
dialect: 'postgres',
connection: () => ({ pglite }) as object,
});
await instance.raw(`CREATE EXTENSION IF NOT EXISTS vector`);
const emitter = createEmitter();
const output = await pgLiteGenerator({ emitter, config: {} });
const { instance } = output;
await instance.migrate.latest({
migrationSource: migrationSource({ services: this.#services }),
});
emitter.on('changed', this.emit.bind(this, 'changedRecord'));
return output;
};
// Memoize setup so concurrent callers share a single in-flight promise.
#getGenerated = async () => {
this.#generated ??= this.#setup();
return this.#generated;
};
/** Start streaming row-change notifications from the generator. */
public listen = async () => {
const generated = await this.#getGenerated();
await generated.subscribe();
};
public getInstance = async () => {
const { instance } = await this.#getGenerated();
return instance;
};
public getInstance = () => {
if (!this.#instance) {
this.#instance = this.#setup();
}
return this.#instance;
};
[destroy] = async () => {
if (!this.#instance) {
if (!this.#generated) {
return;
}
const instance = await this.#instance;
const { instance } = await this.#generated;
await instance.destroy();
};
}

View File

@@ -0,0 +1,38 @@
import { PGlite } from '@electric-sql/pglite';
import knex from 'knex';
import ClientPGLite from 'knex-pglite';
import { vector } from '@electric-sql/pglite/vector';
import { changedRecordEventSchema } from '../database.schemas.js';
import type { Generator } from './generators.types.js';
type PGLiteGeneratorOptions = {
dataLocation?: string;
};
const pgLiteGenerator: Generator<PGLiteGeneratorOptions> = async ({ emitter }) => {
const pglite = new PGlite({
extensions: { vector },
});
const instance = knex({
client: ClientPGLite,
dialect: 'postgres',
connection: () => ({ pglite }) as object,
});
await instance.raw(`CREATE EXTENSION IF NOT EXISTS vector`);
const subscribe = async () => {
pglite.onNotification((channel, data) => {
if (channel !== 'row_changed') {
return;
}
const payload = changedRecordEventSchema.parse(JSON.parse(data));
emitter.emit('changed', payload);
});
await instance.raw('LISTEN row_changed');
};
return { instance, subscribe };
};
export { pgLiteGenerator };

View File

@@ -0,0 +1,7 @@
import { EventEmitter } from '../../../utils/utils.event-emitter.js';
import type { GeneratorEvents } from './generators.types.js';
/** Construct an emitter typed with the generator event map. */
const createEmitter = (): EventEmitter<GeneratorEvents> => {
  return new EventEmitter<GeneratorEvents>();
};
export { createEmitter };

View File

@@ -0,0 +1,21 @@
import type { Knex } from 'knex';
import type { ChangedRecordEvent } from '../database.schemas.js';
import type { EventEmitter } from '../../../utils/utils.event-emitter.js';
// Events a generator may emit while running.
type GeneratorEvents = {
// Fired for every row-level change notification received from the database.
changed: (event: ChangedRecordEvent) => void;
};
// What a generator hands back to its consumer.
type GeneratorOutput = {
// Knex instance connected to the generated database.
instance: Knex;
// Starts listening for change notifications. NOTE(review): calling twice may
// double-register handlers — confirm idempotency per implementation.
subscribe: () => Promise<void>;
};
// Options passed to a generator: implementation-specific config plus the
// emitter it should publish `changed` events on.
type GeneratorOptions<T> = {
config: T;
emitter: EventEmitter<GeneratorEvents>;
};
// A generator asynchronously builds a database instance from its options.
type Generator<T> = (options: GeneratorOptions<T>) => Promise<GeneratorOutput>;
export type { GeneratorEvents, GeneratorOutput, Generator };

View File

@@ -15,6 +15,45 @@ const init: Migration = {
const embedding = services.get(EmbeddingsService);
const embeddingField = await embedding.getFieldType(EMBEDDING_MODEL);
await knex.raw(`
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS trigger AS $$
DECLARE
payload TEXT;
BEGIN
-- Build the JSON payload based on the operation type
IF (TG_OP = 'DELETE') THEN
payload := json_build_object(
'operation', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'oldRecord', row_to_json(OLD)
)::text;
ELSIF (TG_OP = 'INSERT') THEN
payload := json_build_object(
'operation', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'newRecord', row_to_json(NEW)
)::text;
ELSIF (TG_OP = 'UPDATE') THEN
payload := json_build_object(
'operation', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'oldRecord', row_to_json(OLD),
'newRecord', row_to_json(NEW)
)::text;
END IF;
-- Send the notification to the channel
PERFORM pg_notify('row_changed', payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`);
await knex.schema.createTable(tableNames.documents, (table) => {
table.uuid('id').primary();
table.uuid('owner').nullable().references('id').inTable(tableNames.documents).onDelete('CASCADE');
@@ -34,6 +73,13 @@ const init: Migration = {
table.index(['owner']);
});
await knex.raw(`
CREATE TRIGGER document_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON documents
FOR EACH ROW
EXECUTE PROCEDURE notify_changes();
`);
await knex.schema.createTable(tableNames.documentChunks, (table) => {
table.uuid('id').primary();
table.uuid('owner').nullable().references('id').inTable(tableNames.documents).onDelete('CASCADE');
@@ -61,6 +107,8 @@ const init: Migration = {
});
},
down: async ({ knex }) => {
await knex.raw('DROP TRIGGER IF EXISTS document_changes_trigger ON documents;');
await knex.raw('DROP FUNCTION IF EXISTS notify_changes();');
await knex.schema.dropTableIfExists(tableNames.relations);
await knex.schema.dropTableIfExists(tableNames.documentChunks);
await knex.schema.dropTableIfExists(tableNames.documents);

View File

@@ -4,9 +4,9 @@ import type { Document } from './documents.schemas.js';
const mapFromDocumentRow = (row: TableRows['documents']): Document => ({
...row,
createdAt: row.createdAt.toISOString(),
updatedAt: row.updatedAt.toISOString(),
deletedAt: row.deletedAt?.toISOString() || null,
createdAt: row.createdAt.toISOString?.(),
updatedAt: row.updatedAt.toISOString?.(),
deletedAt: row.deletedAt?.toISOString?.() || null,
});
export { mapFromDocumentRow };

View File

@@ -3,31 +3,43 @@ import { QueryParser } from '@morten-olsen/stash-query-dsl';
import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
import { SplittingService } from '../splitter/splitter.js';
import { EventEmitter } from '../../utils/utils.event-emitter.js';
import type { Services } from '../../utils/utils.services.js';
import { destroy, type Services } from '../../utils/utils.services.js';
import { compareObjectKeys } from '../../utils/utils.compare.js';
import { applyQueryFilter } from '../../utils/utils.query.js';
import { base64ToMaybeBuffer } from '../../utils/utils.binary.js';
import { mapFromDocumentRow } from './documents.mapping.js';
import type {
Document,
DocumentFilter,
DocumentFindResult,
DocumentUpsert,
DocumentUpsertResult,
} from './documents.schemas.ts';
import {
type Document,
type DocumentFilter,
type DocumentFindResult,
type DocumentUpsert,
type DocumentUpsertResult,
} from './documents.schemas.js';
type DocumentsServiceEvents = {
upserted: (document: Document) => void;
inserted: (document: Document) => void;
updated: (document: Document) => void;
updated: (next: Document, prev: Document) => void;
deleted: (document: Document) => void;
};
class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
#services: Services;
#subscribeListenAbortController: AbortController;
#databaseListenAbortController?: AbortController;
constructor(services: Services) {
super();
this.#subscribeListenAbortController = new AbortController();
this.onSubscribe(
async () => {
await this.#listen();
},
{
abortSignal: this.#subscribeListenAbortController.signal,
},
);
this.#services = services;
}
@@ -49,6 +61,39 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
};
};
// Bridge database row-change notifications for the documents table into this
// service's own event surface. Idempotent: the abort controller doubles as a
// "already listening" flag.
#listen = async () => {
if (this.#databaseListenAbortController) {
return;
}
this.#databaseListenAbortController = new AbortController();
const databaseService = this.#services.get(DatabaseService);
// NOTE(review): the handler is registered only after listen() resolves, so
// notifications arriving in between could be missed — confirm ordering.
await databaseService.listen();
databaseService.on(
'changedRecord',
(evt) => {
// Only react to changes on the documents table.
if (evt.table !== tableNames.documents) {
return;
}
if (evt.operation === 'INSERT') {
// Cast assumes the NOTIFY payload matches the documents row shape — TODO confirm.
const newDocument = mapFromDocumentRow(evt.newRecord as TableRows['documents']);
this.emit('inserted', newDocument);
this.emit('upserted', newDocument);
}
if (evt.operation === 'UPDATE') {
const newDocument = mapFromDocumentRow(evt.newRecord as TableRows['documents']);
const oldDocument = mapFromDocumentRow(evt.oldRecord as TableRows['documents']);
// 'updated' delivers (next, prev); 'upserted' mirrors the INSERT path.
this.emit('updated', newDocument, oldDocument);
this.emit('upserted', newDocument);
}
if (evt.operation === 'DELETE') {
const oldDocument = mapFromDocumentRow(evt.oldRecord as TableRows['documents']);
this.emit('deleted', oldDocument);
}
},
{ abortSignal: this.#databaseListenAbortController.signal },
);
};
public get = async (id: string): Promise<Document> => {
const databaseService = this.#services.get(DatabaseService);
const db = await databaseService.getInstance();
@@ -111,8 +156,6 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
content: base64ToMaybeBuffer(document.content ?? current.content) || null,
id,
});
this.emit('updated', resultDocument);
this.emit('upserted', resultDocument);
return {
id,
action: 'updated',
@@ -142,8 +185,6 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
createdAt: now,
updatedAt: now,
});
this.emit('inserted', resultDocument);
this.emit('upserted', resultDocument);
return {
id,
action: 'inserted',
@@ -173,6 +214,13 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
return result;
};
// Tear down: cancel the onSubscribe hook and any active database listener.
[destroy] = async () => {
this.#subscribeListenAbortController.abort();
this.#databaseListenAbortController?.abort();
};
}
export * from './documents.schemas.js';

View File

@@ -1,6 +1,7 @@
import type { ExplicitAny } from '../global.js';
type EventListener<T extends unknown[]> = (...args: T) => void | Promise<void>;
type SubscribeListener<T> = (type: T) => void | Promise<void>;
type OnOptions = {
abortSignal?: AbortSignal;
@@ -8,8 +9,25 @@ type OnOptions = {
class EventEmitter<T extends Record<string, (...args: ExplicitAny[]) => void | Promise<void>>> {
#listeners = new Map<keyof T, Set<EventListener<ExplicitAny>>>();
#subscribeListeners = new Set<SubscribeListener<keyof T>>();
/**
 * Register a hook invoked whenever any listener subscribes via `on`.
 * Returns an unsubscribe function; an optional `abortSignal` also removes it.
 */
onSubscribe = (callback: SubscribeListener<keyof T>, options: OnOptions = {}) => {
const { abortSignal } = options;
const callbackClone = (type: keyof T) => callback(type);
this.#subscribeListeners.add(callbackClone);
const abortController = new AbortController();
// Bug fix: the original passed the unbound `abortController.abort` as the
// listener, which throws "Illegal invocation" when fired — wrap to keep `this`.
abortSignal?.addEventListener('abort', () => abortController.abort(), { once: true });
abortController.signal.addEventListener('abort', () => {
// Bug fix: Set#difference returns a NEW set without mutating, so the hook
// was never actually removed — delete it from the live set instead.
this.#subscribeListeners.delete(callbackClone);
});
// Bug fix: return a bound unsubscriber; the bare method reference loses `this`.
return () => abortController.abort();
};
on = <K extends keyof T>(event: K, callback: EventListener<Parameters<T[K]>>, options: OnOptions = {}) => {
for (const subscribeListener of this.#subscribeListeners) {
subscribeListener(event);
}
const { abortSignal } = options;
if (!this.#listeners.has(event)) {
this.#listeners.set(event, new Set());

View File

@@ -28,6 +28,9 @@ class BaseError extends Error {
}
const createApi = async (runtime: StashRuntime = new StashRuntime()) => {
runtime.documents.on('upserted', (document) => {
console.log(document);
});
const app = fastify().withTypeProvider<ZodTypeProvider>();
app.setValidatorCompiler(validatorCompiler);
app.setSerializerCompiler(serializerCompiler);