feat: add ingestion queue

This commit is contained in:
Morten Olsen
2025-11-03 17:13:34 +01:00
parent 2f542c3066
commit 75d24c31c2
4 changed files with 81 additions and 52 deletions

View File

@@ -34,6 +34,7 @@
"fastify": "^5.6.1",
"fastify-type-provider-zod": "^6.1.0",
"knex": "^3.1.0",
"p-queue": "^9.0.0",
"pg": "^8.16.3",
"pino": "^10.1.0",
"pino-pretty": "^13.1.2",

View File

@@ -1,4 +1,3 @@
import { z } from 'zod';
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
import { DocumentsService } from '#root/services/documents/documents.ts';
@@ -12,7 +11,7 @@ const documentsPlugin: FastifyPluginAsyncZod = async (app) => {
method: 'POST',
url: '',
schema: {
operationId: 'v1.documents.put',
operationId: 'v1.documents.post',
tags: ['documents'],
summary: 'Upsert documents',
body: upsertDocumentRequestSchema,

View File

@@ -1,4 +1,5 @@
import equal from 'fast-deep-equal';
import Queue from 'p-queue';
import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts';
@@ -7,71 +8,76 @@ import type { Services } from '#root/utils/utils.services.ts';
class DocumentsService {
#services: Services;
#queue: Queue;
constructor(services: Services) {
this.#services = services;
this.#queue = new Queue({
concurrency: 10,
});
}
public upsert = async (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> => {
const dbService = this.#services.get(DatabaseService);
const db = await dbService.getInstance();
public upsert = (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> =>
this.#queue.add(async () => {
const dbService = this.#services.get(DatabaseService);
const db = await dbService.getInstance();
const id = document.id || crypto.randomUUID();
const id = document.id || crypto.randomUUID();
const [current] = await db<Tables['document']>(tableNames).where({
id,
type: document.type,
});
const now = new Date();
if (!current) {
await db<Tables['document']>(tableNames.documents).insert({
const [current] = await db<Tables['document']>(tableNames).where({
id,
type: document.type,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
data: JSON.stringify(document.data),
});
return {
data: document.data,
id,
type: document.type,
source: document.source || null,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
deletedAt: null,
action: 'inserted',
};
}
const currentData = JSON.parse(current.data);
if (equal(currentData, document.data)) {
const now = new Date();
if (!current) {
await db<Tables['document']>(tableNames.documents).insert({
id,
type: document.type,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
data: JSON.stringify(document.data),
});
return {
data: document.data,
id,
type: document.type,
source: document.source || null,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
deletedAt: null,
action: 'inserted',
};
}
const currentData = JSON.parse(current.data);
if (equal(currentData, document.data)) {
return {
...current,
data: currentData,
id,
createdAt: current.createdAt,
updatedAt: current.updatedAt,
deletedAt: current.deletedAt || null,
action: 'skipped',
};
}
await db<Tables['document']>(tableNames.documents)
.update({
source: document.source,
data: JSON.stringify(document.data),
updatedAt: now.toISOString(),
})
.where({ id, type: document.type });
return {
...current,
data: currentData,
id,
data: document.data,
createdAt: current.createdAt,
updatedAt: current.updatedAt,
deletedAt: current.deletedAt || null,
action: 'skipped',
};
}
await db<Tables['document']>(tableNames.documents)
.update({
source: document.source,
data: JSON.stringify(document.data),
updatedAt: now.toISOString(),
})
.where({ id, type: document.type });
return {
...current,
id,
data: document.data,
createdAt: current.createdAt,
updatedAt: now.toISOString(),
deletedAt: current.deletedAt || null,
action: 'updated',
};
};
deletedAt: current.deletedAt || null,
action: 'updated',
};
});
}
export { DocumentsService };