From 75d24c31c2ea212c60cb34eef7839e95ba98aa9e Mon Sep 17 00:00:00 2001 From: Morten Olsen Date: Mon, 3 Nov 2025 17:13:34 +0100 Subject: [PATCH] feat: add ingestion queue --- packages/server/package.json | 1 + packages/server/src/api/api.documents.ts | 3 +- .../src/services/documents/documents.ts | 106 +++++++++--------- pnpm-lock.yaml | 23 ++++ 4 files changed, 81 insertions(+), 52 deletions(-) diff --git a/packages/server/package.json b/packages/server/package.json index 419a61b..6635263 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -34,6 +34,7 @@ "fastify": "^5.6.1", "fastify-type-provider-zod": "^6.1.0", "knex": "^3.1.0", + "p-queue": "^9.0.0", "pg": "^8.16.3", "pino": "^10.1.0", "pino-pretty": "^13.1.2", diff --git a/packages/server/src/api/api.documents.ts b/packages/server/src/api/api.documents.ts index 97bf9ec..6d2a393 100644 --- a/packages/server/src/api/api.documents.ts +++ b/packages/server/src/api/api.documents.ts @@ -1,4 +1,3 @@ -import { z } from 'zod'; import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod'; import { DocumentsService } from '#root/services/documents/documents.ts'; @@ -12,7 +11,7 @@ const documentsPlugin: FastifyPluginAsyncZod = async (app) => { method: 'POST', url: '', schema: { - operationId: 'v1.documents.put', + operationId: 'v1.documents.post', tags: ['documents'], summary: 'Upsert documents', body: upsertDocumentRequestSchema, diff --git a/packages/server/src/services/documents/documents.ts b/packages/server/src/services/documents/documents.ts index a8aa681..b38975c 100644 --- a/packages/server/src/services/documents/documents.ts +++ b/packages/server/src/services/documents/documents.ts @@ -1,4 +1,5 @@ import equal from 'fast-deep-equal'; +import Queue from 'p-queue'; import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts'; @@ -7,71 +8,76 @@ import type { Services } from '#root/utils/utils.services.ts'; class DocumentsService { #services: Services; + #queue: Queue; constructor(services: Services) { this.#services = services; + this.#queue = new Queue({ + concurrency: 10, + }); } - public upsert = async (document: UpsertDocumentRequest): Promise => { - const dbService = this.#services.get(DatabaseService); - const db = await dbService.getInstance(); + public upsert = (document: UpsertDocumentRequest): Promise => + this.#queue.add(async () => { + const dbService = this.#services.get(DatabaseService); + const db = await dbService.getInstance(); - const id = document.id || crypto.randomUUID(); + const id = document.id || crypto.randomUUID(); - const [current] = await db(tableNames).where({ - id, - type: document.type, - }); - const now = new Date(); - - if (!current) { - await db(tableNames.documents).insert({ + const [current] = await db(tableNames).where({ id, type: document.type, - createdAt: now.toISOString(), - updatedAt: now.toISOString(), - data: JSON.stringify(document.data), }); - return { - data: document.data, - id, - type: document.type, - source: document.source || null, - createdAt: now.toISOString(), - updatedAt: now.toISOString(), - deletedAt: null, - action: 'inserted', - }; - } - const currentData = JSON.parse(current.data); - if (equal(currentData, document.data)) { + const now = new Date(); + + if (!current) { + await db(tableNames.documents).insert({ + id, + type: document.type, + createdAt: now.toISOString(), + updatedAt: now.toISOString(), + data: JSON.stringify(document.data), + }); + return { + data: document.data, + id, + type: document.type, + source: document.source || null, + createdAt: now.toISOString(), + updatedAt: now.toISOString(), + deletedAt: null, + action: 'inserted', + }; + } + const currentData = JSON.parse(current.data); + if (equal(currentData, document.data)) { + return { + ...current, + data: currentData, + id, + createdAt: current.createdAt, + updatedAt: current.updatedAt, + deletedAt: current.deletedAt || null, + action: 'skipped', + }; + } + await db(tableNames.documents) + .update({ + source: document.source, + data: JSON.stringify(document.data), + updatedAt: now.toISOString(), + }) + .where({ id, type: document.type }); return { ...current, - data: currentData, id, + data: document.data, createdAt: current.createdAt, - updatedAt: current.updatedAt, - deletedAt: current.deletedAt || null, - action: 'skipped', - }; - } - await db(tableNames.documents) - .update({ - source: document.source, - data: JSON.stringify(document.data), updatedAt: now.toISOString(), - }) - .where({ id, type: document.type }); - return { - ...current, - id, - data: document.data, - createdAt: current.createdAt, - updatedAt: now.toISOString(), - deletedAt: current.deletedAt || null, - action: 'updated', - }; - }; + deletedAt: current.deletedAt || null, + action: 'updated', + }; + }); } export { DocumentsService }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d29727e..cc252c9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -73,6 +73,9 @@ importers: knex: specifier: ^3.1.0 version: 3.1.0(better-sqlite3@12.4.1)(pg@8.16.3) + p-queue: + specifier: ^9.0.0 + version: 9.0.0 pg: specifier: ^8.16.3 version: 8.16.3 @@ -1310,6 +1313,9 @@ packages: resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==} engines: {node: '>=0.10.0'} + eventemitter3@5.0.1: + resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} + execa@5.1.1: resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==} engines: {node: '>=10'} @@ -2005,6 +2011,14 @@ packages: resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==} engines: {node: '>=6'} + p-queue@9.0.0: + resolution: {integrity: sha512-KO1RyxstL9g1mK76530TExamZC/S2Glm080Nx8PE5sTd7nlduDQsAfEl4uXX+qZjLiwvDauvzXavufy3+rJ9zQ==} + engines: {node: '>=20'} + + p-timeout@7.0.1: + resolution: {integrity: sha512-AxTM2wDGORHGEkPCt8yqxOTMgpfbEHqF51f/5fJCmwFC3C/zNcGT63SymH2ttOAaiIws2zVg4+izQCjrakcwHg==} + engines: {node: '>=20'} + parent-module@1.0.1: resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==} engines: {node: '>=6'} @@ -4113,6 +4127,8 @@ snapshots: esutils@2.0.3: {} + eventemitter3@5.0.1: {} + execa@5.1.1: dependencies: cross-spawn: 7.0.6 @@ -4784,6 +4800,13 @@ snapshots: p-map@2.1.0: {} + p-queue@9.0.0: + dependencies: + eventemitter3: 5.0.1 + p-timeout: 7.0.1 + + p-timeout@7.0.1: {} + parent-module@1.0.1: dependencies: callsites: 3.1.0