3 Commits
0.1.2 ... 0.1.5

Author SHA1 Message Date
Morten Olsen
daa816ac61 fix: postgres object compat 2025-11-03 20:04:35 +01:00
Morten Olsen
75d24c31c2 feat: add ingestion queue 2025-11-03 17:14:27 +01:00
Morten Olsen
2f542c3066 simplify post endpoint 2025-11-03 17:10:33 +01:00
6 changed files with 91 additions and 66 deletions

View File

@@ -34,6 +34,7 @@
"fastify": "^5.6.1", "fastify": "^5.6.1",
"fastify-type-provider-zod": "^6.1.0", "fastify-type-provider-zod": "^6.1.0",
"knex": "^3.1.0", "knex": "^3.1.0",
"p-queue": "^9.0.0",
"pg": "^8.16.3", "pg": "^8.16.3",
"pino": "^10.1.0", "pino": "^10.1.0",
"pino-pretty": "^13.1.2", "pino-pretty": "^13.1.2",

View File

@@ -1,4 +1,3 @@
import { z } from 'zod';
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod'; import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
import { DocumentsService } from '#root/services/documents/documents.ts'; import { DocumentsService } from '#root/services/documents/documents.ts';
@@ -12,23 +11,18 @@ const documentsPlugin: FastifyPluginAsyncZod = async (app) => {
method: 'POST', method: 'POST',
url: '', url: '',
schema: { schema: {
operationId: 'v1.documents.put', operationId: 'v1.documents.post',
tags: ['documents'], tags: ['documents'],
summary: 'Upsert documents', summary: 'Upsert documents',
body: z.object({ body: upsertDocumentRequestSchema,
items: z.array(upsertDocumentRequestSchema),
}),
response: { response: {
200: z.object({ 200: upsertDocumentResponseSchema,
items: z.array(upsertDocumentResponseSchema),
}),
}, },
}, },
handler: async (req, reply) => { handler: async (req, reply) => {
const documentsService = app.services.get(DocumentsService); const documentsService = app.services.get(DocumentsService);
const { items } = req.body; const result = await documentsService.upsert(req.body);
const results = await Promise.all(items.map((item) => documentsService.upsert(item))); return reply.send(result);
return reply.send({ items: results });
}, },
}); });
}; };

View File

@@ -27,10 +27,10 @@ type DocumentRow = {
id: string; id: string;
type: string; type: string;
source: string | null; source: string | null;
data: string; data: string | unknown;
createdAt: string; createdAt: string | Date;
updatedAt: string; updatedAt: string | Date;
deletedAt: string | null; deletedAt: string | Date | null;
}; };
type Tables = { type Tables = {

View File

@@ -1,4 +1,5 @@
import equal from 'fast-deep-equal'; import equal from 'fast-deep-equal';
import Queue from 'p-queue';
import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts'; import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts';
@@ -7,71 +8,76 @@ import type { Services } from '#root/utils/utils.services.ts';
class DocumentsService { class DocumentsService {
#services: Services; #services: Services;
#queue: Queue;
constructor(services: Services) { constructor(services: Services) {
this.#services = services; this.#services = services;
this.#queue = new Queue({
concurrency: 10,
});
} }
public upsert = async (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> => { public upsert = (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> =>
const dbService = this.#services.get(DatabaseService); this.#queue.add(async () => {
const db = await dbService.getInstance(); const dbService = this.#services.get(DatabaseService);
const db = await dbService.getInstance();
const id = document.id || crypto.randomUUID(); const id = document.id || crypto.randomUUID();
const [current] = await db<Tables['document']>(tableNames).where({ const [current] = await db<Tables['document']>(tableNames).where({
id,
type: document.type,
});
const now = new Date();
if (!current) {
await db<Tables['document']>(tableNames.documents).insert({
id, id,
type: document.type, type: document.type,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
data: JSON.stringify(document.data),
}); });
return { const now = new Date();
data: document.data,
id, if (!current) {
type: document.type, await db<Tables['document']>(tableNames.documents).insert({
source: document.source || null, id,
createdAt: now.toISOString(), type: document.type,
updatedAt: now.toISOString(), createdAt: now.toISOString(),
deletedAt: null, updatedAt: now.toISOString(),
action: 'inserted', data: JSON.stringify(document.data),
}; });
} return {
const currentData = JSON.parse(current.data); data: document.data,
if (equal(currentData, document.data)) { id,
type: document.type,
source: document.source || null,
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
deletedAt: null,
action: 'inserted',
};
}
const currentData = typeof current.data === 'string' ? JSON.parse(current.data) : current.data;
if (equal(currentData, document.data)) {
return {
...current,
data: currentData,
id,
createdAt: new Date(current.createdAt).toISOString(),
updatedAt: new Date(current.updatedAt).toISOString(),
deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
action: 'skipped',
};
}
await db<Tables['document']>(tableNames.documents)
.update({
source: document.source,
data: JSON.stringify(document.data),
updatedAt: now.toISOString(),
})
.where({ id, type: document.type });
return { return {
...current, ...current,
data: currentData,
id, id,
createdAt: current.createdAt, data: document.data,
updatedAt: current.updatedAt, createdAt: new Date(current.createdAt).toISOString(),
deletedAt: current.deletedAt || null,
action: 'skipped',
};
}
await db<Tables['document']>(tableNames.documents)
.update({
source: document.source,
data: JSON.stringify(document.data),
updatedAt: now.toISOString(), updatedAt: now.toISOString(),
}) deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
.where({ id, type: document.type }); action: 'updated',
return { };
...current, });
id,
data: document.data,
createdAt: current.createdAt,
updatedAt: now.toISOString(),
deletedAt: current.deletedAt || null,
action: 'updated',
};
};
} }
export { DocumentsService }; export { DocumentsService };

23
pnpm-lock.yaml generated
View File

@@ -73,6 +73,9 @@ importers:
knex: knex:
specifier: ^3.1.0 specifier: ^3.1.0
version: 3.1.0(better-sqlite3@12.4.1)(pg@8.16.3) version: 3.1.0(better-sqlite3@12.4.1)(pg@8.16.3)
p-queue:
specifier: ^9.0.0
version: 9.0.0
pg: pg:
specifier: ^8.16.3 specifier: ^8.16.3
version: 8.16.3 version: 8.16.3
@@ -1310,6 +1313,9 @@ packages:
resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==} resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==}
engines: {node: '>=0.10.0'} engines: {node: '>=0.10.0'}
eventemitter3@5.0.1:
resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==}
execa@5.1.1: execa@5.1.1:
resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==} resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==}
engines: {node: '>=10'} engines: {node: '>=10'}
@@ -2005,6 +2011,14 @@ packages:
resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==} resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==}
engines: {node: '>=6'} engines: {node: '>=6'}
p-queue@9.0.0:
resolution: {integrity: sha512-KO1RyxstL9g1mK76530TExamZC/S2Glm080Nx8PE5sTd7nlduDQsAfEl4uXX+qZjLiwvDauvzXavufy3+rJ9zQ==}
engines: {node: '>=20'}
p-timeout@7.0.1:
resolution: {integrity: sha512-AxTM2wDGORHGEkPCt8yqxOTMgpfbEHqF51f/5fJCmwFC3C/zNcGT63SymH2ttOAaiIws2zVg4+izQCjrakcwHg==}
engines: {node: '>=20'}
parent-module@1.0.1: parent-module@1.0.1:
resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==} resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==}
engines: {node: '>=6'} engines: {node: '>=6'}
@@ -4113,6 +4127,8 @@ snapshots:
esutils@2.0.3: {} esutils@2.0.3: {}
eventemitter3@5.0.1: {}
execa@5.1.1: execa@5.1.1:
dependencies: dependencies:
cross-spawn: 7.0.6 cross-spawn: 7.0.6
@@ -4784,6 +4800,13 @@ snapshots:
p-map@2.1.0: {} p-map@2.1.0: {}
p-queue@9.0.0:
dependencies:
eventemitter3: 5.0.1
p-timeout: 7.0.1
p-timeout@7.0.1: {}
parent-module@1.0.1: parent-module@1.0.1:
dependencies: dependencies:
callsites: 3.1.0 callsites: 3.1.0

View File

@@ -3,3 +3,4 @@ packages:
- ./apps/* - ./apps/*
onlyBuiltDependencies: onlyBuiltDependencies:
- better-sqlite3 - better-sqlite3
- esbuild