This commit is contained in:
Morten Olsen
2024-01-12 12:43:51 +01:00
commit 6d8e5bf955
109 changed files with 9246 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
import { z } from 'zod';
const addArtifactSchema = z.object({
name: z.string(),
runId: z.string(),
loadId: z.string(),
data: z.string(),
});
const findArtifactsSchema = z.object({
ids: z.array(z.string()).optional(),
runId: z.string().optional(),
loadId: z.string().optional(),
offset: z.number().optional(),
limit: z.number().optional(),
});
type AddArtifactOptions = z.infer<typeof addArtifactSchema>;
type FindArtifactsOptions = z.infer<typeof findArtifactsSchema>;
export type { AddArtifactOptions, FindArtifactsOptions };
export { addArtifactSchema, findArtifactsSchema };

View File

@@ -0,0 +1,116 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { nanoid } from 'nanoid';
import { AddArtifactOptions, FindArtifactsOptions } from './artifacts.schemas.js';
import { createHash } from 'crypto';
type ArtifactRepoEvents = {};
type ArtifactRepoOptions = {
database: Database;
};
class ArtifactRepo extends EventEmitter<ArtifactRepoEvents> {
#options: ArtifactRepoOptions;
constructor(options: ArtifactRepoOptions) {
super();
this.#options = options;
}
public add = async (options: AddArtifactOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = nanoid();
await db('artifacts').insert({
id,
name: options.name,
runId: options.runId,
loadId: options.loadId,
data: Buffer.from(options.data).toString('base64'),
createdAt: new Date(),
});
};
public prepareRemove = async (options: FindArtifactsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('artifacts').select('id');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const ids = await query;
const token = ids.map((id) => Buffer.from(id.id).toString('base64')).join('|');
const hash = createHash('sha256').update(token).digest('hex');
return {
ids,
hash,
};
};
public remove = async (hash: string, ids: string[]) => {
const { database } = this.#options;
const db = await database.instance;
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const actualHash = createHash('sha256').update(token).digest('hex');
if (hash !== actualHash) {
throw new Error('Invalid hash');
}
await db('artifacts').whereIn('id', ids).delete();
};
public find = async (options: FindArtifactsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('artifacts').select(['id', 'name', 'runId', 'loadId']);
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const results = await query;
return results;
};
}
export { addArtifactSchema, findArtifactsSchema } from './artifacts.schemas.js';
export { ArtifactRepo };

View File

@@ -0,0 +1,18 @@
import { z } from 'zod';
const setLoadSchema = z.object({
id: z.string(),
name: z.string().optional(),
script: z.string(),
});
const findLoadsSchema = z.object({
limit: z.number().optional(),
offset: z.number().optional(),
});
type SetLoadOptions = z.infer<typeof setLoadSchema>;
type FindLoadsOptions = z.infer<typeof findLoadsSchema>;
export type { SetLoadOptions, FindLoadsOptions };
export { setLoadSchema, findLoadsSchema };

View File

@@ -0,0 +1,94 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { FindLoadsOptions, SetLoadOptions } from './loads.schemas.js';
import { nanoid } from 'nanoid';
import { createHash } from 'crypto';
import { Config } from '../../config/config.js';
import { mkdir, writeFile } from 'fs/promises';
import { resolve } from 'path';
type LoadRepoEvents = {
created: (id: string) => void;
updated: (id: string) => void;
deleted: (id: string) => void;
};
type LoadRepoOptions = {
database: Database;
config: Config;
};
class LoadRepo extends EventEmitter<LoadRepoEvents> {
#options: LoadRepoOptions;
constructor(options: LoadRepoOptions) {
super();
this.#options = options;
}
public getById = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const loads = await db('loads').where({ id }).first();
return loads;
};
public getScript = async (id: string) => {
const load = await this.getById(id);
return load?.script;
};
public find = async (options: FindLoadsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('loads').select(['id', 'name']);
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const loads = await query;
return loads;
};
public set = async (options: SetLoadOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = options.id || nanoid();
const script = createHash('sha256').update(options.script).digest('hex');
const scriptDir = resolve(this.#options.config.files.location, 'scripts');
await mkdir(scriptDir, { recursive: true });
await writeFile(resolve(scriptDir, `${script}.js`), options.script);
const current = await this.getById(id);
if (current) {
await db('loads').where({ id }).update({
name: options.name,
script,
updatedAt: new Date(),
});
this.emit('updated', id);
return id;
} else {
await db('loads').insert({
id,
name: options.name,
updatedAt: new Date(),
createdAt: new Date(),
script,
});
}
this.emit('updated', id);
return id;
};
}
export { setLoadSchema, findLoadsSchema } from './loads.schemas.js';
export { LoadRepo };

View File

@@ -0,0 +1,25 @@
import { z } from 'zod';
const addLogSchema = z.object({
runId: z.string(),
loadId: z.string(),
severity: z.enum(['info', 'warning', 'error']),
message: z.string(),
data: z.any().optional(),
});
const findLogsSchema = z.object({
ids: z.array(z.string()).optional(),
runId: z.string().optional(),
loadId: z.string().optional(),
severities: z.array(z.enum(['debug', 'info', 'warn', 'error'])).optional(),
offset: z.number().optional(),
limit: z.number().optional(),
order: z.enum(['asc', 'desc']).optional(),
});
type AddLogOptions = z.infer<typeof addLogSchema>;
type FindLogsOptions = z.infer<typeof findLogsSchema>;
export type { AddLogOptions, FindLogsOptions };
export { addLogSchema, findLogsSchema };

View File

@@ -0,0 +1,122 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { AddLogOptions, FindLogsOptions } from './logs.schemas.js';
import { nanoid } from 'nanoid';
import { createHash } from 'crypto';
type LogRepoEvents = {};
type LogRepoOptions = {
database: Database;
};
class LogRepo extends EventEmitter<LogRepoEvents> {
#options: LogRepoOptions;
constructor(options: LogRepoOptions) {
super();
this.#options = options;
}
public add = async (options: AddLogOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = nanoid();
await db('logs').insert({
id,
runId: options.runId,
loadId: options.loadId,
severity: options.severity,
message: options.message,
data: options.data,
timestamp: new Date(),
});
};
public prepareRemove = async (options: FindLogsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('logs').select('id');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.severities) {
query.whereIn('severity', options.severities);
}
const ids = await query;
const token = ids.map((id) => Buffer.from(id.id).toString('base64')).join('|');
const hash = createHash('sha256').update(token).digest('hex');
return {
ids,
hash,
};
};
public remove = async (hash: string, ids: string[]) => {
const { database } = this.#options;
const db = await database.instance;
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const actualHash = createHash('sha256').update(token).digest('hex');
if (hash !== actualHash) {
throw new Error('Invalid hash');
}
await db('logs').whereIn('id', ids).delete();
};
public find = async (options: FindLogsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('logs');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.severities) {
query.whereIn('severity', options.severities);
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
if (options.order) {
query.orderBy('timestamp', options.order);
}
const logs = await query;
return logs;
};
}
export { addLogSchema, findLogsSchema } from './logs.schemas.js';
export { LogRepo };

View File

@@ -0,0 +1,66 @@
import { Config } from '../config/config.js';
import { Database } from '../database/database.js';
import { ArtifactRepo } from './artifacts/artifacts.js';
import { LoadRepo } from './loads/loads.js';
import { LogRepo } from './logs/logs.js';
import { RunRepo } from './runs/runs.js';
import { SecretRepo } from './secrets/secrets.js';
type ReposOptions = {
database: Database;
config: Config;
};
class Repos {
#loads: LoadRepo;
#runs: RunRepo;
#logs: LogRepo;
#artifacts: ArtifactRepo;
#secrets: SecretRepo;
constructor({ database, config }: ReposOptions) {
this.#loads = new LoadRepo({
database,
config,
});
this.#runs = new RunRepo({
database,
loads: this.#loads,
});
this.#logs = new LogRepo({
database,
});
this.#artifacts = new ArtifactRepo({
database,
});
this.#secrets = new SecretRepo({
database,
});
}
public get loads() {
return this.#loads;
}
public get runs() {
return this.#runs;
}
public get logs() {
return this.#logs;
}
public get artifacts() {
return this.#artifacts;
}
public get secrets() {
return this.#secrets;
}
}
export { findLogsSchema, addLogSchema } from './logs/logs.js';
export { setLoadSchema, findLoadsSchema } from './loads/loads.js';
export { createRunSchema, findRunsSchema } from './runs/runs.js';
export { addArtifactSchema, findArtifactsSchema } from './artifacts/artifacts.js';
export { Repos };

View File

@@ -0,0 +1,28 @@
import { z } from 'zod';
const runStatusSchema = z.enum(['running', 'succeeded', 'failed']);
const createRunSchema = z.object({
loadId: z.string(),
config: z.any().optional(),
data: z.any().optional(),
});
const updateRunSchema = z.object({
status: runStatusSchema,
error: z.string().optional(),
});
const findRunsSchema = z.object({
loadId: z.string().optional(),
offset: z.number().optional(),
limit: z.number().optional(),
});
type RunStatus = z.infer<typeof runStatusSchema>;
type CreateRunOptions = z.infer<typeof createRunSchema>;
type UpdateRunOptions = z.infer<typeof updateRunSchema>;
type FindRunsOptions = z.infer<typeof findRunsSchema>;
export type { RunStatus, CreateRunOptions, UpdateRunOptions, FindRunsOptions };
export { runStatusSchema, createRunSchema, updateRunSchema, findRunsSchema };

View File

@@ -0,0 +1,140 @@
import { nanoid } from 'nanoid';
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { CreateRunOptions, FindRunsOptions, UpdateRunOptions } from './runs.schemas.js';
import { LoadRepo } from '../loads/loads.js';
type RunRepoEvents = {
created: (args: { id: string; loadId: string }) => void;
updated: (args: { id: string; loadId: string }) => void;
failed: (args: { id: string; loadId: string }) => void;
succeeded: (args: { id: string; loadId: string }) => void;
};
type RunRepoOptions = {
database: Database;
loads: LoadRepo;
};
class RunRepo extends EventEmitter<RunRepoEvents> {
#options: RunRepoOptions;
constructor(options: RunRepoOptions) {
super();
this.#options = options;
}
public getById = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const run = await db('runs').where({ id }).first();
if (!run) {
throw new Error('Run not found');
}
return run;
};
public getByLoadId = async (loadId: string) => {
const { database } = this.#options;
const db = await database.instance;
const runs = await db('runs').where({ loadId });
return runs;
};
public find = async (options: FindRunsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('runs').select(['id', 'status', 'startedAt', 'status', 'error', 'endedAt']);
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const runs = await query;
return runs;
};
public remove = async (options: FindRunsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('runs');
if (options.loadId) {
query.where({ loadId: options.loadId });
}
await query.del();
};
public started = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const current = await this.getById(id);
if (!current) {
throw new Error('Run not found');
}
const { loadId } = current;
const runs = await db('runs').where({ id }).update({
status: 'running',
startedAt: new Date(),
});
this.emit('updated', { id, loadId });
return runs;
};
public finished = async (id: string, options: UpdateRunOptions) => {
const { database } = this.#options;
const db = await database.instance;
const { loadId } = await this.getById(id);
const runs = await db('runs').where({ id }).update({
status: options.status,
error: options.error,
endedAt: new Date(),
});
this.emit('updated', { id, loadId });
switch (options.status) {
case 'failed':
this.emit('failed', { id, loadId });
break;
case 'succeeded':
this.emit('succeeded', { id, loadId });
break;
}
return runs;
};
public create = async (options: CreateRunOptions) => {
const { database, loads } = this.#options;
const id = nanoid();
const db = await database.instance;
const script = await loads.getScript(options.loadId);
await db('runs').insert({
id,
script,
loadId: options.loadId,
status: 'created',
createdAt: new Date(),
});
this.emit('created', {
id,
loadId: options.loadId,
});
return id;
};
}
export { createRunSchema, findRunsSchema } from './runs.schemas.js';
export { RunRepo };

View File

@@ -0,0 +1,17 @@
import { z } from 'zod';
const setSecretSchema = z.object({
id: z.string(),
value: z.string(),
});
const findSecretsSchema = z.object({
offset: z.number().optional(),
limit: z.number().optional(),
});
type SetSecretOptions = z.infer<typeof setSecretSchema>;
type FindSecretOptions = z.infer<typeof findSecretsSchema>;
export type { SetSecretOptions, FindSecretOptions };
export { setSecretSchema, findSecretsSchema };

View File

@@ -0,0 +1,76 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { FindSecretOptions, SetSecretOptions } from './secrets.schemas.js';
type LogRepoEvents = {};
type LogRepoOptions = {
database: Database;
};
class SecretRepo extends EventEmitter<LogRepoEvents> {
#options: LogRepoOptions;
constructor(options: LogRepoOptions) {
super();
this.#options = options;
}
public set = async (options: SetSecretOptions) => {
const { database } = this.#options;
const db = await database.instance;
const current = await db('secrets').where('id', options.id).first();
if (current) {
await db('secrets').where('id', options.id).update({
value: options.value,
updatedAt: new Date(),
});
} else {
await db('secrets').insert({
id: options.id,
value: options.value,
createdAt: new Date(),
updatedAt: new Date(),
});
}
};
public remove = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
await db('logs').where('id', id).delete();
};
public getAll = async () => {
const { database } = this.#options;
const db = await database.instance;
const secrets = await db('secrets').select('id', 'value');
return secrets.reduce((acc, secret) => {
acc[secret.id] = secret.value;
return acc;
}, {} as Record<string, string>);
};
public find = async (options: FindSecretOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('secrets').select('id');
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const secrets = await query;
return secrets;
};
}
export { findSecretsSchema, setSecretSchema } from './secrets.schemas.js';
export { SecretRepo };