feat: support filtered subscriptions
Some checks failed
Build and release / Build (push) Failing after 1m13s
Build and release / update-release-draft (push) Has been skipped
Build and release / Release (push) Has been skipped

This commit is contained in:
Morten Olsen
2025-12-10 23:46:52 +01:00
parent c7f9270ef2
commit 857d4f0b42
2 changed files with 65 additions and 29 deletions

View File

@@ -1,4 +1,4 @@
import { QueryParser } from '@morten-olsen/stash-query-dsl';
import { isMatch, QueryParser, type QueryFilter } from '@morten-olsen/stash-query-dsl';
import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
import { SplittingService } from '../splitter/splitter.js';
@@ -24,10 +24,16 @@ type DocumentsServiceEvents = {
deleted: (document: Document) => void;
};
type DocumentServiceFilterSubscriber = {
filter?: QueryFilter | string;
fn: (document: Document) => void;
abortSignal?: AbortSignal;
};
class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
#services: Services;
#subscribeListenAbortController: AbortController;
#databaseListenAbortController?: AbortController;
#databaseListenAbortController?: Promise<AbortController>;
constructor(services: Services) {
super();
@@ -43,29 +49,8 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
this.#services = services;
}
public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
const databaseService = this.#services.get(DatabaseService);
const db = await databaseService.getInstance();
let query = db<TableRows['documents']>(tableNames.documents);
if (filter) {
const parser = this.#services.get(QueryParser);
query = applyQueryFilter(
query,
typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition,
);
}
query = query.limit(filter.limit).offset(filter.offset);
const items = await query;
return {
items: items.map(mapFromDocumentRow),
};
};
#listen = async () => {
if (this.#databaseListenAbortController) {
return;
}
this.#databaseListenAbortController = new AbortController();
#setupListen = async () => {
const abortController = new AbortController();
const databaseService = this.#services.get(DatabaseService);
await databaseService.listen();
databaseService.on(
@@ -90,8 +75,56 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
this.emit('deleted', oldDocument);
}
},
{ abortSignal: this.#databaseListenAbortController.signal },
{ abortSignal: abortController.signal },
);
return abortController;
};
#listen = async () => {
if (!this.#databaseListenAbortController) {
this.#databaseListenAbortController = this.#setupListen();
}
return this.#databaseListenAbortController;
};
public subscribe = async (options: DocumentServiceFilterSubscriber) => {
const abortController = new AbortController();
const queryParser = this.#services.get(QueryParser);
const filter = typeof options.filter === 'string' ? queryParser.parse(options.filter) : options.filter;
this.on(
'upserted',
(document) => {
if (filter && !isMatch(document, filter)) {
return;
}
options.fn(document);
},
{ abortSignal: abortController.signal },
);
options.abortSignal?.addEventListener('abort', () => abortController.abort());
await this.#listen();
return () => abortController.abort();
};
public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
const databaseService = this.#services.get(DatabaseService);
const db = await databaseService.getInstance();
let query = db<TableRows['documents']>(tableNames.documents);
if (filter) {
const parser = this.#services.get(QueryParser);
query = applyQueryFilter(
query,
typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition,
);
}
query = query.limit(filter.limit).offset(filter.offset);
const items = await query;
return {
items: items.map(mapFromDocumentRow),
};
};
public get = async (id: string): Promise<Document> => {
@@ -218,7 +251,7 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
[destroy] = async () => {
this.#subscribeListenAbortController.abort();
if (this.#databaseListenAbortController) {
this.#databaseListenAbortController.abort();
(await this.#databaseListenAbortController).abort();
}
};
}