feat: support filtered subscriptions
This commit is contained in:
@@ -72,11 +72,26 @@ const documentFindResultSchema = createListResultSchema(documentSchema);
|
||||
|
||||
type DocumentFindResult = z.infer<typeof documentFindResultSchema>;
|
||||
|
||||
export type { Document, DocumentUpsert, DocumentUpsertResult, DocumentFilter, DocumentFindResult };
|
||||
const documentFilterChangedEventSchema = z.object({
|
||||
action: z.enum(['add', 'remove', 'update']),
|
||||
document: documentSchema,
|
||||
});
|
||||
|
||||
type DocumentFilterChangedEvent = z.infer<typeof documentFilterChangedEventSchema>;
|
||||
|
||||
export type {
|
||||
Document,
|
||||
DocumentUpsert,
|
||||
DocumentUpsertResult,
|
||||
DocumentFilter,
|
||||
DocumentFindResult,
|
||||
DocumentFilterChangedEvent,
|
||||
};
|
||||
export {
|
||||
documentSchema,
|
||||
documentUpsertSchema,
|
||||
documentUpsertResultSchema,
|
||||
documentFilterSchema,
|
||||
documentFindResultSchema,
|
||||
documentFilterChangedEventSchema,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { QueryParser } from '@morten-olsen/stash-query-dsl';
|
||||
import { isMatch, QueryParser, type QueryFilter } from '@morten-olsen/stash-query-dsl';
|
||||
|
||||
import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
|
||||
import { SplittingService } from '../splitter/splitter.js';
|
||||
@@ -12,6 +12,7 @@ import { mapFromDocumentRow } from './documents.mapping.js';
|
||||
import {
|
||||
type Document,
|
||||
type DocumentFilter,
|
||||
type DocumentFilterChangedEvent,
|
||||
type DocumentFindResult,
|
||||
type DocumentUpsert,
|
||||
type DocumentUpsertResult,
|
||||
@@ -24,10 +25,16 @@ type DocumentsServiceEvents = {
|
||||
deleted: (document: Document) => void;
|
||||
};
|
||||
|
||||
type DocumentServiceFilterSubscriber = {
|
||||
filter?: QueryFilter | string;
|
||||
fn: (event: DocumentFilterChangedEvent) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
#services: Services;
|
||||
#subscribeListenAbortController: AbortController;
|
||||
#databaseListenAbortController?: AbortController;
|
||||
#databaseListenAbortController?: Promise<AbortController>;
|
||||
|
||||
constructor(services: Services) {
|
||||
super();
|
||||
@@ -43,29 +50,8 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
this.#services = services;
|
||||
}
|
||||
|
||||
public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
let query = db<TableRows['documents']>(tableNames.documents);
|
||||
if (filter) {
|
||||
const parser = this.#services.get(QueryParser);
|
||||
query = applyQueryFilter(
|
||||
query,
|
||||
typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition,
|
||||
);
|
||||
}
|
||||
query = query.limit(filter.limit).offset(filter.offset);
|
||||
const items = await query;
|
||||
return {
|
||||
items: items.map(mapFromDocumentRow),
|
||||
};
|
||||
};
|
||||
|
||||
#listen = async () => {
|
||||
if (this.#databaseListenAbortController) {
|
||||
return;
|
||||
}
|
||||
this.#databaseListenAbortController = new AbortController();
|
||||
#setupListen = async () => {
|
||||
const abortController = new AbortController();
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
await databaseService.listen();
|
||||
databaseService.on(
|
||||
@@ -90,8 +76,100 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
this.emit('deleted', oldDocument);
|
||||
}
|
||||
},
|
||||
{ abortSignal: this.#databaseListenAbortController.signal },
|
||||
{ abortSignal: abortController.signal },
|
||||
);
|
||||
return abortController;
|
||||
};
|
||||
|
||||
#listen = async () => {
|
||||
if (!this.#databaseListenAbortController) {
|
||||
this.#databaseListenAbortController = this.#setupListen();
|
||||
}
|
||||
return this.#databaseListenAbortController;
|
||||
};
|
||||
|
||||
public subscribe = async (options: DocumentServiceFilterSubscriber) => {
|
||||
const abortController = new AbortController();
|
||||
const queryParser = this.#services.get(QueryParser);
|
||||
const filter = typeof options.filter === 'string' ? queryParser.parse(options.filter) : options.filter;
|
||||
|
||||
this.on(
|
||||
'inserted',
|
||||
(next) => {
|
||||
const nextIncluded = !filter || isMatch(next, filter);
|
||||
if (!nextIncluded) {
|
||||
return;
|
||||
}
|
||||
options.fn({
|
||||
action: 'add',
|
||||
document: next,
|
||||
});
|
||||
},
|
||||
{ abortSignal: abortController.signal },
|
||||
);
|
||||
|
||||
this.on(
|
||||
'updated',
|
||||
(next, prev) => {
|
||||
const nextIncluded = !filter || isMatch(next, filter);
|
||||
const prevIncluded = !filter || isMatch(prev, filter);
|
||||
if (nextIncluded && prevIncluded) {
|
||||
options.fn({
|
||||
action: 'update',
|
||||
document: next,
|
||||
});
|
||||
} else if (nextIncluded && !prevIncluded) {
|
||||
options.fn({
|
||||
action: 'add',
|
||||
document: next,
|
||||
});
|
||||
} else if (!nextIncluded && prevIncluded) {
|
||||
options.fn({
|
||||
action: 'remove',
|
||||
document: next,
|
||||
});
|
||||
}
|
||||
},
|
||||
{ abortSignal: abortController.signal },
|
||||
);
|
||||
|
||||
this.on(
|
||||
'deleted',
|
||||
(prev) => {
|
||||
const prevIncluded = !filter || isMatch(prev, filter);
|
||||
if (!prevIncluded) {
|
||||
return;
|
||||
}
|
||||
options.fn({
|
||||
action: 'remove',
|
||||
document: prev,
|
||||
});
|
||||
},
|
||||
{ abortSignal: abortController.signal },
|
||||
);
|
||||
|
||||
options.abortSignal?.addEventListener('abort', () => abortController.abort());
|
||||
|
||||
await this.#listen();
|
||||
return () => abortController.abort();
|
||||
};
|
||||
|
||||
public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
let query = db<TableRows['documents']>(tableNames.documents);
|
||||
if (filter) {
|
||||
const parser = this.#services.get(QueryParser);
|
||||
query = applyQueryFilter(
|
||||
query,
|
||||
typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition,
|
||||
);
|
||||
}
|
||||
query = query.limit(filter.limit).offset(filter.offset);
|
||||
const items = await query;
|
||||
return {
|
||||
items: items.map(mapFromDocumentRow),
|
||||
};
|
||||
};
|
||||
|
||||
public get = async (id: string): Promise<Document> => {
|
||||
@@ -134,6 +212,7 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
'type',
|
||||
'typeVersion',
|
||||
'metadata',
|
||||
'text',
|
||||
])
|
||||
) {
|
||||
return {
|
||||
@@ -218,7 +297,7 @@ class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
[destroy] = async () => {
|
||||
this.#subscribeListenAbortController.abort();
|
||||
if (this.#databaseListenAbortController) {
|
||||
this.#databaseListenAbortController.abort();
|
||||
(await this.#databaseListenAbortController).abort();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user