mirror of
https://github.com/morten-olsen/reservoir.git
synced 2026-02-08 01:46:24 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
daa816ac61 | ||
|
|
75d24c31c2 | ||
|
|
2f542c3066 |
@@ -34,6 +34,7 @@
|
||||
"fastify": "^5.6.1",
|
||||
"fastify-type-provider-zod": "^6.1.0",
|
||||
"knex": "^3.1.0",
|
||||
"p-queue": "^9.0.0",
|
||||
"pg": "^8.16.3",
|
||||
"pino": "^10.1.0",
|
||||
"pino-pretty": "^13.1.2",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { z } from 'zod';
|
||||
import type { FastifyPluginAsyncZod } from 'fastify-type-provider-zod';
|
||||
|
||||
import { DocumentsService } from '#root/services/documents/documents.ts';
|
||||
@@ -12,23 +11,18 @@ const documentsPlugin: FastifyPluginAsyncZod = async (app) => {
|
||||
method: 'POST',
|
||||
url: '',
|
||||
schema: {
|
||||
operationId: 'v1.documents.put',
|
||||
operationId: 'v1.documents.post',
|
||||
tags: ['documents'],
|
||||
summary: 'Upsert documents',
|
||||
body: z.object({
|
||||
items: z.array(upsertDocumentRequestSchema),
|
||||
}),
|
||||
body: upsertDocumentRequestSchema,
|
||||
response: {
|
||||
200: z.object({
|
||||
items: z.array(upsertDocumentResponseSchema),
|
||||
}),
|
||||
200: upsertDocumentResponseSchema,
|
||||
},
|
||||
},
|
||||
handler: async (req, reply) => {
|
||||
const documentsService = app.services.get(DocumentsService);
|
||||
const { items } = req.body;
|
||||
const results = await Promise.all(items.map((item) => documentsService.upsert(item)));
|
||||
return reply.send({ items: results });
|
||||
const result = await documentsService.upsert(req.body);
|
||||
return reply.send(result);
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
@@ -27,10 +27,10 @@ type DocumentRow = {
|
||||
id: string;
|
||||
type: string;
|
||||
source: string | null;
|
||||
data: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
deletedAt: string | null;
|
||||
data: string | unknown;
|
||||
createdAt: string | Date;
|
||||
updatedAt: string | Date;
|
||||
deletedAt: string | Date | null;
|
||||
};
|
||||
|
||||
type Tables = {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import equal from 'fast-deep-equal';
|
||||
import Queue from 'p-queue';
|
||||
|
||||
import type { UpsertDocumentRequest, UpsertDocumentResponse } from './documents.schemas.ts';
|
||||
|
||||
@@ -7,71 +8,76 @@ import type { Services } from '#root/utils/utils.services.ts';
|
||||
|
||||
class DocumentsService {
|
||||
#services: Services;
|
||||
#queue: Queue;
|
||||
|
||||
constructor(services: Services) {
|
||||
this.#services = services;
|
||||
this.#queue = new Queue({
|
||||
concurrency: 10,
|
||||
});
|
||||
}
|
||||
|
||||
public upsert = async (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> => {
|
||||
const dbService = this.#services.get(DatabaseService);
|
||||
const db = await dbService.getInstance();
|
||||
public upsert = (document: UpsertDocumentRequest): Promise<UpsertDocumentResponse> =>
|
||||
this.#queue.add(async () => {
|
||||
const dbService = this.#services.get(DatabaseService);
|
||||
const db = await dbService.getInstance();
|
||||
|
||||
const id = document.id || crypto.randomUUID();
|
||||
const id = document.id || crypto.randomUUID();
|
||||
|
||||
const [current] = await db<Tables['document']>(tableNames).where({
|
||||
id,
|
||||
type: document.type,
|
||||
});
|
||||
const now = new Date();
|
||||
|
||||
if (!current) {
|
||||
await db<Tables['document']>(tableNames.documents).insert({
|
||||
const [current] = await db<Tables['document']>(tableNames).where({
|
||||
id,
|
||||
type: document.type,
|
||||
createdAt: now.toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
data: JSON.stringify(document.data),
|
||||
});
|
||||
return {
|
||||
data: document.data,
|
||||
id,
|
||||
type: document.type,
|
||||
source: document.source || null,
|
||||
createdAt: now.toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
deletedAt: null,
|
||||
action: 'inserted',
|
||||
};
|
||||
}
|
||||
const currentData = JSON.parse(current.data);
|
||||
if (equal(currentData, document.data)) {
|
||||
const now = new Date();
|
||||
|
||||
if (!current) {
|
||||
await db<Tables['document']>(tableNames.documents).insert({
|
||||
id,
|
||||
type: document.type,
|
||||
createdAt: now.toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
data: JSON.stringify(document.data),
|
||||
});
|
||||
return {
|
||||
data: document.data,
|
||||
id,
|
||||
type: document.type,
|
||||
source: document.source || null,
|
||||
createdAt: now.toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
deletedAt: null,
|
||||
action: 'inserted',
|
||||
};
|
||||
}
|
||||
const currentData = typeof current.data === 'string' ? JSON.parse(current.data) : current.data;
|
||||
if (equal(currentData, document.data)) {
|
||||
return {
|
||||
...current,
|
||||
data: currentData,
|
||||
id,
|
||||
createdAt: new Date(current.createdAt).toISOString(),
|
||||
updatedAt: new Date(current.updatedAt).toISOString(),
|
||||
deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
|
||||
action: 'skipped',
|
||||
};
|
||||
}
|
||||
await db<Tables['document']>(tableNames.documents)
|
||||
.update({
|
||||
source: document.source,
|
||||
data: JSON.stringify(document.data),
|
||||
updatedAt: now.toISOString(),
|
||||
})
|
||||
.where({ id, type: document.type });
|
||||
return {
|
||||
...current,
|
||||
data: currentData,
|
||||
id,
|
||||
createdAt: current.createdAt,
|
||||
updatedAt: current.updatedAt,
|
||||
deletedAt: current.deletedAt || null,
|
||||
action: 'skipped',
|
||||
};
|
||||
}
|
||||
await db<Tables['document']>(tableNames.documents)
|
||||
.update({
|
||||
source: document.source,
|
||||
data: JSON.stringify(document.data),
|
||||
data: document.data,
|
||||
createdAt: new Date(current.createdAt).toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
})
|
||||
.where({ id, type: document.type });
|
||||
return {
|
||||
...current,
|
||||
id,
|
||||
data: document.data,
|
||||
createdAt: current.createdAt,
|
||||
updatedAt: now.toISOString(),
|
||||
deletedAt: current.deletedAt || null,
|
||||
action: 'updated',
|
||||
};
|
||||
};
|
||||
deletedAt: current.deletedAt ? new Date(current.deletedAt).toISOString() : null,
|
||||
action: 'updated',
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
export { DocumentsService };
|
||||
|
||||
23
pnpm-lock.yaml
generated
23
pnpm-lock.yaml
generated
@@ -73,6 +73,9 @@ importers:
|
||||
knex:
|
||||
specifier: ^3.1.0
|
||||
version: 3.1.0(better-sqlite3@12.4.1)(pg@8.16.3)
|
||||
p-queue:
|
||||
specifier: ^9.0.0
|
||||
version: 9.0.0
|
||||
pg:
|
||||
specifier: ^8.16.3
|
||||
version: 8.16.3
|
||||
@@ -1310,6 +1313,9 @@ packages:
|
||||
resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==}
|
||||
engines: {node: '>=0.10.0'}
|
||||
|
||||
eventemitter3@5.0.1:
|
||||
resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==}
|
||||
|
||||
execa@5.1.1:
|
||||
resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==}
|
||||
engines: {node: '>=10'}
|
||||
@@ -2005,6 +2011,14 @@ packages:
|
||||
resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==}
|
||||
engines: {node: '>=6'}
|
||||
|
||||
p-queue@9.0.0:
|
||||
resolution: {integrity: sha512-KO1RyxstL9g1mK76530TExamZC/S2Glm080Nx8PE5sTd7nlduDQsAfEl4uXX+qZjLiwvDauvzXavufy3+rJ9zQ==}
|
||||
engines: {node: '>=20'}
|
||||
|
||||
p-timeout@7.0.1:
|
||||
resolution: {integrity: sha512-AxTM2wDGORHGEkPCt8yqxOTMgpfbEHqF51f/5fJCmwFC3C/zNcGT63SymH2ttOAaiIws2zVg4+izQCjrakcwHg==}
|
||||
engines: {node: '>=20'}
|
||||
|
||||
parent-module@1.0.1:
|
||||
resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==}
|
||||
engines: {node: '>=6'}
|
||||
@@ -4113,6 +4127,8 @@ snapshots:
|
||||
|
||||
esutils@2.0.3: {}
|
||||
|
||||
eventemitter3@5.0.1: {}
|
||||
|
||||
execa@5.1.1:
|
||||
dependencies:
|
||||
cross-spawn: 7.0.6
|
||||
@@ -4784,6 +4800,13 @@ snapshots:
|
||||
|
||||
p-map@2.1.0: {}
|
||||
|
||||
p-queue@9.0.0:
|
||||
dependencies:
|
||||
eventemitter3: 5.0.1
|
||||
p-timeout: 7.0.1
|
||||
|
||||
p-timeout@7.0.1: {}
|
||||
|
||||
parent-module@1.0.1:
|
||||
dependencies:
|
||||
callsites: 3.1.0
|
||||
|
||||
@@ -3,3 +3,4 @@ packages:
|
||||
- ./apps/*
|
||||
onlyBuiltDependencies:
|
||||
- better-sqlite3
|
||||
- esbuild
|
||||
|
||||
Reference in New Issue
Block a user