update
This commit is contained in:
179
packages/runtime/src/services/documents/documents.ts
Normal file
179
packages/runtime/src/services/documents/documents.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import { QueryParser } from '@morten-olsen/stash-query-dsl';
|
||||
|
||||
import { DatabaseService, tableNames, type TableRows } from '../database/database.js';
|
||||
import { SplittingService } from '../splitter/splitter.js';
|
||||
|
||||
import type {
|
||||
Document,
|
||||
DocumentFilter,
|
||||
DocumentFindResult,
|
||||
DocumentUpsert,
|
||||
DocumentUpsertResult,
|
||||
} from './documents.schemas.ts';
|
||||
import { mapFromDocumentRow } from './documents.mapping.js';
|
||||
|
||||
import { EventEmitter } from '#root/utils/utils.event-emitter.js';
|
||||
import type { Services } from '#root/utils/utils.services.js';
|
||||
import { compareObjectKeys } from '#root/utils/utils.compare.js';
|
||||
import { applyQueryFilter } from '#root/utils/utils.query.js';
|
||||
|
||||
type DocumentsServiceEvents = {
|
||||
upserted: (document: Document) => void;
|
||||
inserted: (document: Document) => void;
|
||||
updated: (document: Document) => void;
|
||||
};
|
||||
|
||||
class DocumentsService extends EventEmitter<DocumentsServiceEvents> {
|
||||
#services: Services;
|
||||
|
||||
constructor(services: Services) {
|
||||
super();
|
||||
this.#services = services;
|
||||
}
|
||||
|
||||
public find = async (filter: DocumentFilter): Promise<DocumentFindResult> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
let query = db<TableRows['documents']>(tableNames.documents);
|
||||
if (filter) {
|
||||
const parser = this.#services.get(QueryParser);
|
||||
query = applyQueryFilter(
|
||||
query,
|
||||
typeof filter.condition === 'string' ? parser.parse(filter.condition) : filter.condition,
|
||||
);
|
||||
}
|
||||
query = query.limit(filter.limit).offset(filter.offset);
|
||||
const items = await query;
|
||||
return {
|
||||
items: items.map(mapFromDocumentRow),
|
||||
};
|
||||
};
|
||||
|
||||
public get = async (id: string): Promise<Document> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
const [item] = await db<TableRows['documents']>(tableNames.documents).where('id', id).limit(1);
|
||||
return mapFromDocumentRow(item);
|
||||
};
|
||||
|
||||
public remove = async (id: string): Promise<void> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
await db<TableRows['documents']>(tableNames.documents).where('id', id).delete();
|
||||
};
|
||||
|
||||
public upsert = async (document: DocumentUpsert): Promise<DocumentUpsertResult> => {
|
||||
const databaseService = this.#services.get(DatabaseService);
|
||||
const db = await databaseService.getInstance();
|
||||
|
||||
const result = await db.transaction(async (trx) => {
|
||||
let id = document.id || crypto.randomUUID();
|
||||
if (document.source && document.sourceId) {
|
||||
const [currentSourceDocument] = await trx<TableRows['documents']>(tableNames.documents)
|
||||
.where('source', document.source)
|
||||
.andWhere('sourceId', document.sourceId)
|
||||
.limit(1);
|
||||
if (currentSourceDocument) {
|
||||
id = currentSourceDocument.id;
|
||||
}
|
||||
}
|
||||
const now = new Date();
|
||||
const [current] = await trx<TableRows['documents']>(tableNames.documents).where('id', id).limit(1);
|
||||
if (current) {
|
||||
if (
|
||||
compareObjectKeys(current, document, [
|
||||
'sourceId',
|
||||
'source',
|
||||
'content',
|
||||
'contentType',
|
||||
'searchText',
|
||||
'type',
|
||||
'typeVersion',
|
||||
'metadata',
|
||||
])
|
||||
) {
|
||||
return {
|
||||
id,
|
||||
action: 'skipped',
|
||||
document: mapFromDocumentRow(current),
|
||||
} as const;
|
||||
}
|
||||
await trx<TableRows['documents']>(tableNames.documents)
|
||||
.update({
|
||||
...document,
|
||||
id,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where('id', id);
|
||||
const resultDocument: Document = mapFromDocumentRow({
|
||||
...current,
|
||||
...document,
|
||||
id,
|
||||
});
|
||||
this.emit('updated', resultDocument);
|
||||
this.emit('upserted', resultDocument);
|
||||
return {
|
||||
id,
|
||||
action: 'updated',
|
||||
document: resultDocument,
|
||||
} as const;
|
||||
} else {
|
||||
await trx<TableRows['documents']>(tableNames.documents).insert({
|
||||
metadata: {},
|
||||
type: 'raw',
|
||||
...document,
|
||||
id,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
const resultDocument: Document = mapFromDocumentRow({
|
||||
type: 'raw',
|
||||
owner: null,
|
||||
contentType: null,
|
||||
content: null,
|
||||
source: null,
|
||||
sourceId: null,
|
||||
typeVersion: null,
|
||||
searchText: null,
|
||||
metadata: {},
|
||||
...document,
|
||||
deletedAt: null,
|
||||
id,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
this.emit('inserted', resultDocument);
|
||||
this.emit('upserted', resultDocument);
|
||||
return {
|
||||
id,
|
||||
action: 'inserted',
|
||||
document: resultDocument,
|
||||
} as const;
|
||||
}
|
||||
});
|
||||
|
||||
if (result.action !== 'skipped') {
|
||||
await db.transaction(async (trx) => {
|
||||
await trx<TableRows['documentChunks']>(tableNames.documentChunks).delete().where('owner', result.id);
|
||||
const splittingService = this.#services.get(SplittingService);
|
||||
const chunks = await splittingService.chunk(result.document);
|
||||
if (chunks.length > 0) {
|
||||
await trx<TableRows['documentChunks']>(tableNames.documentChunks).insert(
|
||||
chunks.map((chunk) => ({
|
||||
id: crypto.randomUUID(),
|
||||
owner: result.id,
|
||||
content: chunk.content,
|
||||
embedding: chunk.vector.toSql(),
|
||||
embeddingModel: chunk.model,
|
||||
})),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return result;
|
||||
};
|
||||
}
|
||||
|
||||
export * from './documents.schemas.js';
|
||||
export { DocumentsService };
|
||||
Reference in New Issue
Block a user