diff --git a/packages/runtime/src/services/documents/documents.schemas.ts b/packages/runtime/src/services/documents/documents.schemas.ts index 9f9fcee..4dc6409 100644 --- a/packages/runtime/src/services/documents/documents.schemas.ts +++ b/packages/runtime/src/services/documents/documents.schemas.ts @@ -72,11 +72,26 @@ const documentFindResultSchema = createListResultSchema(documentSchema); type DocumentFindResult = z.infer; -export type { Document, DocumentUpsert, DocumentUpsertResult, DocumentFilter, DocumentFindResult }; +const documentFilterChangedEventSchema = z.object({ + action: z.enum(['add', 'remove', 'update']), + document: documentSchema, +}); + +type DocumentFilterChangedEvent = z.infer; + +export type { + Document, + DocumentUpsert, + DocumentUpsertResult, + DocumentFilter, + DocumentFindResult, + DocumentFilterChangedEvent, +}; export { documentSchema, documentUpsertSchema, documentUpsertResultSchema, documentFilterSchema, documentFindResultSchema, + documentFilterChangedEventSchema, }; diff --git a/packages/runtime/src/services/documents/documents.ts b/packages/runtime/src/services/documents/documents.ts index c8bfce8..27e97f8 100644 --- a/packages/runtime/src/services/documents/documents.ts +++ b/packages/runtime/src/services/documents/documents.ts @@ -1,4 +1,4 @@ -import { QueryParser } from '@morten-olsen/stash-query-dsl'; +import { isMatch, QueryParser, type QueryFilter } from '@morten-olsen/stash-query-dsl'; import { DatabaseService, tableNames, type TableRows } from '../database/database.js'; import { SplittingService } from '../splitter/splitter.js'; @@ -12,6 +12,7 @@ import { mapFromDocumentRow } from './documents.mapping.js'; import { type Document, type DocumentFilter, + type DocumentFilterChangedEvent, type DocumentFindResult, type DocumentUpsert, type DocumentUpsertResult, @@ -24,10 +25,16 @@ type DocumentsServiceEvents = { deleted: (document: Document) => void; }; +type DocumentServiceFilterSubscriber = { + filter?: QueryFilter | string; + fn: (event: DocumentFilterChangedEvent) => void; + abortSignal?: AbortSignal; +}; + class DocumentsService extends EventEmitter { #services: Services; #subscribeListenAbortController: AbortController; - #databaseListenAbortController?: AbortController; + #databaseListenAbortController?: Promise; constructor(services: Services) { super(); @@ -43,29 +50,8 @@ class DocumentsService extends EventEmitter { this.#services = services; } - public find = async (filter: DocumentFilter): Promise => { - const databaseService = this.#services.get(DatabaseService); - const db = await databaseService.getInstance(); - let query = db(tableNames.documents); - if (filter) { - const parser = this.#services.get(QueryParser); - query = applyQueryFilter( - query, - typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition, - ); - } - query = query.limit(filter.limit).offset(filter.offset); - const items = await query; - return { - items: items.map(mapFromDocumentRow), - }; - }; - - #listen = async () => { - if (this.#databaseListenAbortController) { - return; - } - this.#databaseListenAbortController = new AbortController(); + #setupListen = async () => { + const abortController = new AbortController(); const databaseService = this.#services.get(DatabaseService); await databaseService.listen(); databaseService.on( @@ -90,8 +76,100 @@ class DocumentsService extends EventEmitter { this.emit('deleted', oldDocument); } }, - { abortSignal: this.#databaseListenAbortController.signal }, + { abortSignal: abortController.signal }, ); + return abortController; + }; + + #listen = async () => { + if (!this.#databaseListenAbortController) { + this.#databaseListenAbortController = this.#setupListen(); + } + return this.#databaseListenAbortController; + }; + + public subscribe = async (options: DocumentServiceFilterSubscriber) => { + const abortController = new AbortController(); + const queryParser = this.#services.get(QueryParser); + const filter = typeof options.filter === 'string' ? queryParser.parse(options.filter) : options.filter; + + this.on( + 'inserted', + (next) => { + const nextIncluded = !filter || isMatch(next, filter); + if (!nextIncluded) { + return; + } + options.fn({ + action: 'add', + document: next, + }); + }, + { abortSignal: abortController.signal }, + ); + + this.on( + 'updated', + (next, prev) => { + const nextIncluded = !filter || isMatch(next, filter); + const prevIncluded = !filter || isMatch(prev, filter); + if (nextIncluded && prevIncluded) { + options.fn({ + action: 'update', + document: next, + }); + } else if (nextIncluded && !prevIncluded) { + options.fn({ + action: 'add', + document: next, + }); + } else if (!nextIncluded && prevIncluded) { + options.fn({ + action: 'remove', + document: next, + }); + } + }, + { abortSignal: abortController.signal }, + ); + + this.on( + 'deleted', + (prev) => { + const prevIncluded = !filter || isMatch(prev, filter); + if (!prevIncluded) { + return; + } + options.fn({ + action: 'remove', + document: prev, + }); + }, + { abortSignal: abortController.signal }, + ); + + options.abortSignal?.addEventListener('abort', () => abortController.abort()); + + await this.#listen(); + return () => abortController.abort(); + }; + + public find = async (filter: DocumentFilter): Promise => { + const databaseService = this.#services.get(DatabaseService); + const db = await databaseService.getInstance(); + let query = db(tableNames.documents); + if (filter) { + const parser = this.#services.get(QueryParser); + query = applyQueryFilter( + query, + typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition, + ); + } + query = query.limit(filter.limit).offset(filter.offset); + const items = await query; + return { + items: items.map(mapFromDocumentRow), + }; }; public get = async (id: string): Promise => { @@ -134,6 +212,7 @@ class DocumentsService extends EventEmitter { 'type', 'typeVersion', 'metadata', + 'text', ]) ) { return { @@ -218,7 +297,7 @@ class DocumentsService extends EventEmitter { [destroy] = async () => { this.#subscribeListenAbortController.abort(); if (this.#databaseListenAbortController) { - this.#databaseListenAbortController.abort(); + (await this.#databaseListenAbortController).abort(); } }; } diff --git a/packages/server/src/api.ts b/packages/server/src/api.ts index 184d21c..360c161 100644 --- a/packages/server/src/api.ts +++ b/packages/server/src/api.ts @@ -28,8 +28,11 @@ class BaseError extends Error { } const createApi = async (runtime: StashRuntime = new StashRuntime()) => { - runtime.documents.on('upserted', (document) => { - console.log(document); + runtime.documents.subscribe({ + filter: "metadata.foo = 'bar'", + fn: (document) => { + console.log(document); + }, }); const app = fastify().withTypeProvider(); app.setValidatorCompiler(validatorCompiler);