182 lines
5.9 KiB
TypeScript
182 lines
5.9 KiB
TypeScript
import { QueryParser } from '@morten-olsen/stash-query-dsl';
|
|
|
|
import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
|
|
import { SplittingService } from '../splitter/splitter.js';
|
|
import { EventEmitter } from '../../utils/utils.event-emitter.js';
|
|
import type { Services } from '../../utils/utils.services.js';
|
|
import { compareObjectKeys } from '../../utils/utils.compare.js';
|
|
import { applyQueryFilter } from '../../utils/utils.query.js';
|
|
import { base64ToMaybeBuffer } from '../../utils/utils.binary.js';
|
|
|
|
import { mapFromDocumentRow } from './documents.mapping.js';
|
|
import type {
|
|
Document,
|
|
DocumentFilter,
|
|
DocumentFindResult,
|
|
DocumentUpsert,
|
|
DocumentUpsertResult,
|
|
} from './documents.schemas.ts';
|
|
|
|
/**
 * Event map for {@link DocumentsService}.
 *
 * Every listener receives the document in its mapped (public) shape.
 */
type DocumentsServiceEvents = {
  /** Emitted after a previously unknown document has been stored. */
  inserted: (document: Document) => void;
  /** Emitted after an existing document has been changed. */
  updated: (document: Document) => void;
  /** Emitted alongside both `inserted` and `updated`. */
  upserted: (document: Document) => void;
};
|
|
|
|
/**
 * Data-access service for the `documents` table and its derived
 * `documentChunks` rows.
 *
 * Emits `inserted` / `updated` / `upserted` events after a successful write.
 */
class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
  #services: Services;

  constructor(services: Services) {
    super();
    this.#services = services;
  }

  /**
   * Lists documents matching a filter.
   *
   * @param filter - `condition` is either a query-DSL string (parsed with
   *   `QueryParser`) or an already-parsed condition; `limit` / `offset` page
   *   the result.
   * @returns The matching rows mapped with `mapFromDocumentRow`.
   */
  public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
    const databaseService = this.#services.get(DatabaseService);
    const db = await databaseService.getInstance();
    let query = db<TableRows['documents']>(tableNames.documents);

    // `filter` itself is always dereferenced below (limit/offset), so the part
    // that may legitimately be absent is the condition, not the filter object.
    if (filter.condition != null) {
      const parser = this.#services.get(QueryParser);
      const condition = typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition;
      query = applyQueryFilter(query, condition);
    }

    query = query.limit(filter.limit).offset(filter.offset);
    const items = await query;

    return {
      items: items.map(mapFromDocumentRow),
    };
  };

  /**
   * Loads a single document by id.
   *
   * @param id - Primary key of the document.
   * @returns The mapped document.
   * @throws Error when no row with the given id exists.
   */
  public get = async (id: string): Promise<Document> => {
    const databaseService = this.#services.get(DatabaseService);
    const db = await databaseService.getInstance();
    const [item] = await db<TableRows['documents']>(tableNames.documents).where('id', id).limit(1);
    // Without this guard `undefined` would be handed to the row mapper.
    if (!item) {
      throw new Error(`Document not found: ${id}`);
    }
    return mapFromDocumentRow(item);
  };

  /**
   * Deletes the document row with the given id (no-op if it does not exist).
   *
   * @param id - Primary key of the document.
   */
  public remove = async (id: string): Promise<void> => {
    const databaseService = this.#services.get(DatabaseService);
    const db = await databaseService.getInstance();
    await db<TableRows['documents']>(tableNames.documents).where('id', id).delete();
  };

  /**
   * Inserts or updates a document, then regenerates its chunks.
   *
   * Id resolution: `document.id` (or a fresh UUID) is used, unless both
   * `source` and `sourceId` are set and a stored row has that pair, in which
   * case that row's id wins.
   *
   * If a row exists and `compareObjectKeys` is truthy for the compared keys
   * (presumably meaning "unchanged"), nothing is written and the result has
   * `action: 'skipped'`. Otherwise the row is updated or inserted.
   *
   * Events are emitted only after the write transaction has committed. Chunk
   * regeneration runs afterwards; an error thrown by a listener propagates
   * and skips chunk regeneration.
   *
   * @param document - Fields to write; `content` is passed through
   *   `base64ToMaybeBuffer` before storage.
   * @returns The resolved id, the action taken, and the resulting document.
   */
  public upsert = async (document: DocumentUpsert): Promise<DocumentUpsertResult> => {
    const databaseService = this.#services.get(DatabaseService);
    const db = await databaseService.getInstance();

    const result = await db.transaction(async (trx) => {
      let id = document.id || crypto.randomUUID();

      // A (source, sourceId) pair identifies an externally sourced document;
      // reuse the stored id so re-imports update instead of duplicating.
      if (document.source && document.sourceId) {
        const [currentSourceDocument] = await trx<TableRows['documents']>(tableNames.documents)
          .where('source', document.source)
          .andWhere('sourceId', document.sourceId)
          .limit(1);
        if (currentSourceDocument) {
          id = currentSourceDocument.id;
        }
      }

      const now = new Date();
      const [current] = await trx<TableRows['documents']>(tableNames.documents).where('id', id).limit(1);

      if (current) {
        if (
          compareObjectKeys(current, document, [
            'sourceId',
            'source',
            'content',
            'contentType',
            'type',
            'typeVersion',
            'metadata',
          ])
        ) {
          return {
            id,
            action: 'skipped',
            document: mapFromDocumentRow(current),
          } as const;
        }

        await trx<TableRows['documents']>(tableNames.documents)
          .update({
            ...document,
            content: base64ToMaybeBuffer(document.content),
            id,
            updatedAt: now,
          })
          .where('id', id);

        const updatedDocument: Document = mapFromDocumentRow({
          ...current,
          ...document,
          content: base64ToMaybeBuffer(document.content ?? current.content) || null,
          id,
          // Mirror the timestamp just written; `current` still holds the old one.
          updatedAt: now,
        });

        return {
          id,
          action: 'updated',
          document: updatedDocument,
        } as const;
      }

      await trx<TableRows['documents']>(tableNames.documents).insert({
        metadata: {},
        type: 'raw',
        ...document,
        content: base64ToMaybeBuffer(document.content),
        id,
        createdAt: now,
        updatedAt: now,
      });

      const insertedDocument: Document = mapFromDocumentRow({
        type: 'raw',
        text: null,
        owner: null,
        contentType: null,
        source: null,
        sourceId: null,
        typeVersion: null,
        metadata: {},
        ...document,
        content: base64ToMaybeBuffer(document.content) || null,
        deletedAt: null,
        id,
        createdAt: now,
        updatedAt: now,
      });

      return {
        id,
        action: 'inserted',
        document: insertedDocument,
      } as const;
    });

    if (result.action === 'skipped') {
      return result;
    }

    // The write has committed at this point, so listeners never observe (or
    // react to) a change that could still be rolled back.
    if (result.action === 'inserted') {
      this.emit('inserted', result.document);
    } else {
      this.emit('updated', result.document);
    }
    this.emit('upserted', result.document);

    // Chunking may be slow; compute it before opening the transaction so the
    // transaction only spans the delete + insert of the chunk rows.
    const splittingService = this.#services.get(SplittingService);
    const chunks = await splittingService.chunk(result.document);

    await db.transaction(async (trx) => {
      await trx<TableRows['documentChunks']>(tableNames.documentChunks).delete().where('owner', result.id);
      if (chunks.length > 0) {
        await trx<TableRows['documentChunks']>(tableNames.documentChunks).insert(
          chunks.map((chunk) => ({
            id: crypto.randomUUID(),
            owner: result.id,
            content: chunk.content,
            embedding: chunk.vector.toSql(),
            embeddingModel: chunk.model,
          })),
        );
      }
    });

    return result;
  };
}
|
|
|
|
// Public surface: the service class plus every schema/type it works with.
export { DocumentsService };
export * from './documents.schemas.js';
|