3 Commits
0.1.2 ... 0.1.5

Author SHA1 Message Date
Morten Olsen
daa816ac61 fix: postgres object compat 2025-11-03 20:04:35 +01:00
Morten Olsen
75d24c31c2 feat: add ingestion queue 2025-11-03 17:14:27 +01:00
Morten Olsen
2f542c3066 simplify post endpoint 2025-11-03 17:10:33 +01:00
6 changed files with 91 additions and 66 deletions

View File

@@ -34,6 +34,7 @@
"fastify": "^5.6.1",
"fastify-type-provider-zod": "^6.1.0",
"knex": "^3.1.0",
"p-queue": "^9.0.0",
"pg": "^8.16.3",
"pino": "^10.1.0",
"pino-pretty": "^13.1.2",

View File

@@ -1,4 +1,3 @@
import { z } from 'zod';
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
import { DocumentsService } from '#root/services/documents/documents.ts';
@@ -12,23 +11,18 @@ const documentsPlugin: FastifyPluginAsyncZod = async (app) => {
method: 'POST',
url: '',
schema: {
operationId: 'v1.documents.put',
operationId: 'v1.documents.post',
tags: ['documents'],
summary: 'Upsert documents',
body: z.object({
items: z.array(upsertDocumentRequestSchema),
}),
body: upsertDocumentRequestSchema,
response: {
200: z.object({
items: z.array(upsertDocumentResponseSchema),
}),
200: upsertDocumentResponseSchema,
},
},
handler: async (req, reply) => {
const documentsService = app.services.get(DocumentsService);
const { items } = req.body;
const results = await Promise.all(items.map((item) => documentsService.upsert(item)));
return reply.send({ items: results });
const result = await documentsService.upsert(req.body);
return reply.send(result);
},
});
};

View File

@@ -27,10 +27,10 @@ type DocumentRow = {
id: string;
type: string;
source: string | null;
data: string;
createdAt: string;
updatedAt: string;
deletedAt: string | null;
data: string | unknown;
createdAt: string | Date;
updatedAt: string | Date;
deletedAt: string | Date | null;
};
type Tables = {

View File

@@ -1,4 +1,5 @@
import equal from 'fast-deep-equal';
import Queue from 'p-queue';
import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts';
@@ -7,12 +8,17 @@ import type { Services } from '#root/utils/utils.services.ts';
class DocumentsService {
#services: Services;
#queue: Queue;
constructor(services: Services) {
this.#services = services;
this.#queue = new Queue({
concurrency: 10,
});
}
public upsert = async (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> => {
public upsert = (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> =>
this.#queue.add(async () => {
const dbService = this.#services.get(DatabaseService);
const db = await dbService.getInstance();
@@ -43,15 +49,15 @@ class DocumentsService {
action: 'inserted',
};
}
const currentData = JSON.parse(current.data);
const currentData = typeof current.data === 'string' ? JSON.parse(current.data) : current.data;
if (equal(currentData, document.data)) {
return {
...current,
data: currentData,
id,
createdAt: current.createdAt,
updatedAt: current.updatedAt,
deletedAt: current.deletedAt || null,
createdAt: new Date(current.createdAt).toISOString(),
updatedAt: new Date(current.updatedAt).toISOString(),
deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
action: 'skipped',
};
}
@@ -66,12 +72,12 @@ class DocumentsService {
...current,
id,
data: document.data,
createdAt: current.createdAt,
createdAt: new Date(current.createdAt).toISOString(),
updatedAt: now.toISOString(),
deletedAt: current.deletedAt || null,
deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
action: 'updated',
};
};
});
}
export { DocumentsService };

23
pnpm-lock.yaml generated
View File

@@ -73,6 +73,9 @@ importers:
knex:
specifier: ^3.1.0
version: 3.1.0(better-sqlite3@12.4.1)(pg@8.16.3)
p-queue:
specifier: ^9.0.0
version: 9.0.0
pg:
specifier: ^8.16.3
version: 8.16.3
@@ -1310,6 +1313,9 @@ packages:
resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==}
engines: {node: '>=0.10.0'}
eventemitter3@5.0.1:
resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==}
execa@5.1.1:
resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==}
engines: {node: '>=10'}
@@ -2005,6 +2011,14 @@ packages:
resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==}
engines: {node: '>=6'}
p-queue@9.0.0:
resolution: {integrity: sha512-KO1RyxstL9g1mK76530TExamZC/S2Glm080Nx8PE5sTd7nlduDQsAfEl4uXX+qZjLiwvDauvzXavufy3+rJ9zQ==}
engines: {node: '>=20'}
p-timeout@7.0.1:
resolution: {integrity: sha512-AxTM2wDGORHGEkPCt8yqxOTMgpfbEHqF51f/5fJCmwFC3C/zNcGT63SymH2ttOAaiIws2zVg4+izQCjrakcwHg==}
engines: {node: '>=20'}
parent-module@1.0.1:
resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==}
engines: {node: '>=6'}
@@ -4113,6 +4127,8 @@ snapshots:
esutils@2.0.3: {}
eventemitter3@5.0.1: {}
execa@5.1.1:
dependencies:
cross-spawn: 7.0.6
@@ -4784,6 +4800,13 @@ snapshots:
p-map@2.1.0: {}
p-queue@9.0.0:
dependencies:
eventemitter3: 5.0.1
p-timeout: 7.0.1
p-timeout@7.0.1: {}
parent-module@1.0.1:
dependencies:
callsites: 3.1.0

View File

@@ -3,3 +3,4 @@ packages:
- ./apps/*
onlyBuiltDependencies:
- better-sqlite3
- esbuild