This commit is contained in:
Morten Olsen
2024-01-11 09:03:14 +01:00
commit 676a7e09b5
97 changed files with 8291 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
import { nanoid } from 'nanoid';
import { resolve } from 'path';
import { existsSync } from 'fs';
import { mkdir, readFile, writeFile } from 'fs/promises';
import jwt from 'jsonwebtoken';
import { Config } from '../config/config.js';
type AuthOptions = {
config: Config;
};
class Auth {
#options: AuthOptions;
#data: Promise<{ secret: string }>;
constructor(options: AuthOptions) {
this.#options = options;
this.#data = this.#setup();
}
#setup = async () => {
const { config } = this.#options;
const secretLocation = resolve(config.files.location, 'secret');
let secret = '';
if (!existsSync(secretLocation)) {
await mkdir(config.files.location, { recursive: true });
secret = nanoid();
await writeFile(secretLocation, secret);
} else {
secret = await readFile(secretLocation, 'utf-8');
}
return {
secret,
};
};
public createToken = async (data: object) => {
const { secret } = await this.#data;
const token = jwt.sign(data, secret);
return token;
};
public validateToken = async (token: string) => {
const { secret } = await this.#data;
const result = jwt.verify(token, secret);
return result as object;
};
}
export { Auth };

View File

@@ -0,0 +1,19 @@
import { Knex } from 'knex';
type Config = {
database: Omit<Knex.Config, 'migrations'>;
files: {
location: string;
};
auth?: {
oidc?: {
issuer: string;
login?: {
clientId: string;
clientSecret: string;
};
};
};
};
export type { Config };

View File

@@ -0,0 +1,36 @@
import knex, { Knex } from 'knex';
import { source } from './migrations/migrations.source.js';
const tableNames = {
loads: 'loads',
};
class Database {
#instance?: Promise<Knex>;
#config: Knex.Config;
constructor(config: Knex.Config) {
this.#config = {
...config,
migrations: {
migrationSource: source,
},
};
}
#setup = async (config: Knex.Config) => {
const db = knex(config);
await db.migrate.latest();
return db;
};
public get instance() {
if (!this.#instance) {
this.#instance = this.#setup(this.#config);
}
return this.#instance;
}
}
export { Database, tableNames };

View File

@@ -0,0 +1,68 @@
import { Knex } from 'knex';
const name = 'init';
const up = async (knex: Knex) => {
await knex.schema.createTable('loads', (table) => {
table.string('id').primary();
table.string('name').nullable();
table.string('script').notNullable();
table.timestamp('createdAt').notNullable();
table.timestamp('updatedAt').notNullable();
});
await knex.schema.createTable('runs', (table) => {
table.string('id').primary();
table.string('loadId').notNullable();
table.string('status').notNullable();
table.string('script').notNullable();
table.string('input').nullable();
table.string('error').nullable();
table.timestamp('createdAt').notNullable();
table.timestamp('startedAt').nullable();
table.timestamp('endedAt').nullable();
table.index('loadId');
table.index('status');
});
await knex.schema.createTable('logs', (table) => {
table.string('id').primary();
table.string('runId').notNullable();
table.string('loadId').notNullable();
table.string('severity').notNullable();
table.string('message').notNullable();
table.jsonb('data').nullable();
table.timestamp('timestamp').notNullable();
table.index('runId');
});
await knex.schema.createTable('artifacts', (table) => {
table.string('id').primary();
table.string('name').notNullable();
table.string('runId').notNullable();
table.string('loadId').notNullable();
table.text('data').notNullable();
table.timestamp('createdAt').notNullable();
table.index('runId');
});
await knex.schema.createTable('secrets', (table) => {
table.string('id').primary();
table.string('value').notNullable();
table.timestamp('createdAt').notNullable();
table.timestamp('updatedAt').notNullable();
});
};
const down = async (knex: Knex) => {
await knex.schema.dropTable('loads');
await knex.schema.dropTable('runs');
await knex.schema.dropTable('logs');
await knex.schema.dropTable('artifacts');
await knex.schema.dropTable('secrets');
};
export { name, up, down };

View File

@@ -0,0 +1,19 @@
import { Knex } from 'knex';
import * as init from './migration.init.js';
type Migration = {
name: string;
up: (knex: Knex) => Promise<void>;
down: (knex: Knex) => Promise<void>;
};
const migrations = [init] satisfies Migration[];
const source: Knex.MigrationSource<Migration> = {
getMigrations: async () => migrations,
getMigration: async (migration) => migration,
getMigrationName: (migration) => migration.name,
};
export { source };

View File

@@ -0,0 +1,31 @@
import { program, Command } from 'commander';
import { Runtime } from './runtime/runtime.js';
import { createServer } from './server/server.js';
const start = new Command('start');
start.action(async () => {
const port = 4500;
const runtime = await Runtime.create();
const server = await createServer(runtime);
await server.listen({
port,
host: '0.0.0.0',
});
console.log(`Server listening on port ${port}`);
});
const createToken = new Command('create-token');
createToken.action(async () => {
const runtime = await Runtime.create();
const token = await runtime.auth.createToken({});
console.log(token);
});
program.addCommand(start);
program.addCommand(createToken);
await program.parseAsync(process.argv);
export type { Runtime } from './runtime/runtime.js';
export type { RootRouter } from './router/router.js';

47
packages/server/src/knex.d.ts vendored Normal file
View File

@@ -0,0 +1,47 @@
import 'knex';
declare module 'knex/types/tables.js' {
interface Tables {
loads: {
id: string;
name?: string;
script: string;
createdAt: Date;
updatedAt: Date;
};
runs: {
id: string;
loadId: string;
script: string;
input?: string;
error?: string;
createdAt: Date;
startedAt?: Date;
endedAt?: Date;
status: 'created' | 'running' | 'succeeded' | 'failed';
};
logs: {
id: string;
runId: string;
loadId: string;
severity: 'info' | 'warning' | 'error';
message: string;
data?: any;
timestamp: Date;
};
artifacts: {
id: string;
name: string;
createdAt: Date;
runId: string;
loadId: string;
data: string;
};
secrets: {
id: string;
value: string;
createdAt: Date;
updatedAt: Date;
};
}
}

View File

@@ -0,0 +1,22 @@
import { z } from 'zod';
const addArtifactSchema = z.object({
name: z.string(),
runId: z.string(),
loadId: z.string(),
data: z.string(),
});
const findArtifactsSchema = z.object({
ids: z.array(z.string()).optional(),
runId: z.string().optional(),
loadId: z.string().optional(),
offset: z.number().optional(),
limit: z.number().optional(),
});
type AddArtifactOptions = z.infer<typeof addArtifactSchema>;
type FindArtifactsOptions = z.infer<typeof findArtifactsSchema>;
export type { AddArtifactOptions, FindArtifactsOptions };
export { addArtifactSchema, findArtifactsSchema };

View File

@@ -0,0 +1,116 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { nanoid } from 'nanoid';
import { AddArtifactOptions, FindArtifactsOptions } from './artifacts.schemas.js';
import { createHash } from 'crypto';
type ArtifactRepoEvents = {};
type ArtifactRepoOptions = {
database: Database;
};
class ArtifactRepo extends EventEmitter<ArtifactRepoEvents> {
#options: ArtifactRepoOptions;
constructor(options: ArtifactRepoOptions) {
super();
this.#options = options;
}
public add = async (options: AddArtifactOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = nanoid();
await db('artifacts').insert({
id,
name: options.name,
runId: options.runId,
loadId: options.loadId,
data: Buffer.from(options.data).toString('base64'),
createdAt: new Date(),
});
};
public prepareRemove = async (options: FindArtifactsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('artifacts').select('id');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const ids = await query;
const token = ids.map((id) => Buffer.from(id.id).toString('base64')).join('|');
const hash = createHash('sha256').update(token).digest('hex');
return {
ids,
hash,
};
};
public remove = async (hash: string, ids: string[]) => {
const { database } = this.#options;
const db = await database.instance;
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const actualHash = createHash('sha256').update(token).digest('hex');
if (hash !== actualHash) {
throw new Error('Invalid hash');
}
await db('artifacts').whereIn('id', ids).delete();
};
public find = async (options: FindArtifactsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('artifacts').select(['id', 'name', 'runId', 'loadId']);
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const results = await query;
return results;
};
}
export { addArtifactSchema, findArtifactsSchema } from './artifacts.schemas.js';
export { ArtifactRepo };

View File

@@ -0,0 +1,18 @@
import { z } from 'zod';
const setLoadSchema = z.object({
id: z.string(),
name: z.string().optional(),
script: z.string(),
});
const findLoadsSchema = z.object({
limit: z.number().optional(),
offset: z.number().optional(),
});
type SetLoadOptions = z.infer<typeof setLoadSchema>;
type FindLoadsOptions = z.infer<typeof findLoadsSchema>;
export type { SetLoadOptions, FindLoadsOptions };
export { setLoadSchema, findLoadsSchema };

View File

@@ -0,0 +1,94 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { FindLoadsOptions, SetLoadOptions } from './loads.schemas.js';
import { nanoid } from 'nanoid';
import { createHash } from 'crypto';
import { Config } from '../../config/config.js';
import { mkdir, writeFile } from 'fs/promises';
import { resolve } from 'path';
type LoadRepoEvents = {
created: (id: string) => void;
updated: (id: string) => void;
deleted: (id: string) => void;
};
type LoadRepoOptions = {
database: Database;
config: Config;
};
class LoadRepo extends EventEmitter<LoadRepoEvents> {
#options: LoadRepoOptions;
constructor(options: LoadRepoOptions) {
super();
this.#options = options;
}
public getById = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const loads = await db('loads').where({ id }).first();
return loads;
};
public getScript = async (id: string) => {
const load = await this.getById(id);
return load?.script;
};
public find = async (options: FindLoadsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('loads').select(['id', 'name']);
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const loads = await query;
return loads;
};
public set = async (options: SetLoadOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = options.id || nanoid();
const script = createHash('sha256').update(options.script).digest('hex');
const scriptDir = resolve(this.#options.config.files.location, 'scripts');
await mkdir(scriptDir, { recursive: true });
await writeFile(resolve(scriptDir, `${script}.js`), options.script);
const current = await this.getById(id);
if (current) {
await db('loads').where({ id }).update({
name: options.name,
script,
updatedAt: new Date(),
});
this.emit('updated', id);
return id;
} else {
await db('loads').insert({
id,
name: options.name,
updatedAt: new Date(),
createdAt: new Date(),
script,
});
}
this.emit('updated', id);
return id;
};
}
export { setLoadSchema, findLoadsSchema } from './loads.schemas.js';
export { LoadRepo };

View File

@@ -0,0 +1,25 @@
import { z } from 'zod';
const addLogSchema = z.object({
runId: z.string(),
loadId: z.string(),
severity: z.enum(['info', 'warning', 'error']),
message: z.string(),
data: z.any().optional(),
});
const findLogsSchema = z.object({
ids: z.array(z.string()).optional(),
runId: z.string().optional(),
loadId: z.string().optional(),
severities: z.array(z.enum(['debug', 'info', 'warn', 'error'])).optional(),
offset: z.number().optional(),
limit: z.number().optional(),
order: z.enum(['asc', 'desc']).optional(),
});
type AddLogOptions = z.infer<typeof addLogSchema>;
type FindLogsOptions = z.infer<typeof findLogsSchema>;
export type { AddLogOptions, FindLogsOptions };
export { addLogSchema, findLogsSchema };

View File

@@ -0,0 +1,122 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { AddLogOptions, FindLogsOptions } from './logs.schemas.js';
import { nanoid } from 'nanoid';
import { createHash } from 'crypto';
type LogRepoEvents = {};
type LogRepoOptions = {
database: Database;
};
class LogRepo extends EventEmitter<LogRepoEvents> {
#options: LogRepoOptions;
constructor(options: LogRepoOptions) {
super();
this.#options = options;
}
public add = async (options: AddLogOptions) => {
const { database } = this.#options;
const db = await database.instance;
const id = nanoid();
await db('logs').insert({
id,
runId: options.runId,
loadId: options.loadId,
severity: options.severity,
message: options.message,
data: options.data,
timestamp: new Date(),
});
};
public prepareRemove = async (options: FindLogsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('logs').select('id');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.severities) {
query.whereIn('severity', options.severities);
}
const ids = await query;
const token = ids.map((id) => Buffer.from(id.id).toString('base64')).join('|');
const hash = createHash('sha256').update(token).digest('hex');
return {
ids,
hash,
};
};
public remove = async (hash: string, ids: string[]) => {
const { database } = this.#options;
const db = await database.instance;
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const actualHash = createHash('sha256').update(token).digest('hex');
if (hash !== actualHash) {
throw new Error('Invalid hash');
}
await db('logs').whereIn('id', ids).delete();
};
public find = async (options: FindLogsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('logs');
if (options.ids) {
query.whereIn('id', options.ids);
}
if (options.runId) {
query.where({ runId: options.runId });
}
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.severities) {
query.whereIn('severity', options.severities);
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
if (options.order) {
query.orderBy('timestamp', options.order);
}
const logs = await query;
return logs;
};
}
export { addLogSchema, findLogsSchema } from './logs.schemas.js';
export { LogRepo };

View File

@@ -0,0 +1,66 @@
import { Config } from '../config/config.js';
import { Database } from '../database/database.js';
import { ArtifactRepo } from './artifacts/artifacts.js';
import { LoadRepo } from './loads/loads.js';
import { LogRepo } from './logs/logs.js';
import { RunRepo } from './runs/runs.js';
import { SecretRepo } from './secrets/secrets.js';
type ReposOptions = {
database: Database;
config: Config;
};
class Repos {
#loads: LoadRepo;
#runs: RunRepo;
#logs: LogRepo;
#artifacts: ArtifactRepo;
#secrets: SecretRepo;
constructor({ database, config }: ReposOptions) {
this.#loads = new LoadRepo({
database,
config,
});
this.#runs = new RunRepo({
database,
loads: this.#loads,
});
this.#logs = new LogRepo({
database,
});
this.#artifacts = new ArtifactRepo({
database,
});
this.#secrets = new SecretRepo({
database,
});
}
public get loads() {
return this.#loads;
}
public get runs() {
return this.#runs;
}
public get logs() {
return this.#logs;
}
public get artifacts() {
return this.#artifacts;
}
public get secrets() {
return this.#secrets;
}
}
export { findLogsSchema, addLogSchema } from './logs/logs.js';
export { setLoadSchema, findLoadsSchema } from './loads/loads.js';
export { createRunSchema, findRunsSchema } from './runs/runs.js';
export { addArtifactSchema, findArtifactsSchema } from './artifacts/artifacts.js';
export { Repos };

View File

@@ -0,0 +1,28 @@
import { z } from 'zod';
const runStatusSchema = z.enum(['running', 'succeeded', 'failed']);
const createRunSchema = z.object({
loadId: z.string(),
config: z.any().optional(),
data: z.any().optional(),
});
const updateRunSchema = z.object({
status: runStatusSchema,
error: z.string().optional(),
});
const findRunsSchema = z.object({
loadId: z.string().optional(),
offset: z.number().optional(),
limit: z.number().optional(),
});
type RunStatus = z.infer<typeof runStatusSchema>;
type CreateRunOptions = z.infer<typeof createRunSchema>;
type UpdateRunOptions = z.infer<typeof updateRunSchema>;
type FindRunsOptions = z.infer<typeof findRunsSchema>;
export type { RunStatus, CreateRunOptions, UpdateRunOptions, FindRunsOptions };
export { runStatusSchema, createRunSchema, updateRunSchema, findRunsSchema };

View File

@@ -0,0 +1,140 @@
import { nanoid } from 'nanoid';
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { CreateRunOptions, FindRunsOptions, UpdateRunOptions } from './runs.schemas.js';
import { LoadRepo } from '../loads/loads.js';
type RunRepoEvents = {
created: (args: { id: string; loadId: string }) => void;
updated: (args: { id: string; loadId: string }) => void;
failed: (args: { id: string; loadId: string }) => void;
succeeded: (args: { id: string; loadId: string }) => void;
};
type RunRepoOptions = {
database: Database;
loads: LoadRepo;
};
class RunRepo extends EventEmitter<RunRepoEvents> {
#options: RunRepoOptions;
constructor(options: RunRepoOptions) {
super();
this.#options = options;
}
public getById = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const run = await db('runs').where({ id }).first();
if (!run) {
throw new Error('Run not found');
}
return run;
};
public getByLoadId = async (loadId: string) => {
const { database } = this.#options;
const db = await database.instance;
const runs = await db('runs').where({ loadId });
return runs;
};
public find = async (options: FindRunsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('runs').select(['id', 'status', 'startedAt', 'status', 'error', 'endedAt']);
if (options.loadId) {
query.where({ loadId: options.loadId });
}
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const runs = await query;
return runs;
};
public remove = async (options: FindRunsOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('runs');
if (options.loadId) {
query.where({ loadId: options.loadId });
}
await query.del();
};
public started = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
const current = await this.getById(id);
if (!current) {
throw new Error('Run not found');
}
const { loadId } = current;
const runs = await db('runs').where({ id }).update({
status: 'running',
startedAt: new Date(),
});
this.emit('updated', { id, loadId });
return runs;
};
public finished = async (id: string, options: UpdateRunOptions) => {
const { database } = this.#options;
const db = await database.instance;
const { loadId } = await this.getById(id);
const runs = await db('runs').where({ id }).update({
status: options.status,
error: options.error,
endedAt: new Date(),
});
this.emit('updated', { id, loadId });
switch (options.status) {
case 'failed':
this.emit('failed', { id, loadId });
break;
case 'succeeded':
this.emit('succeeded', { id, loadId });
break;
}
return runs;
};
public create = async (options: CreateRunOptions) => {
const { database, loads } = this.#options;
const id = nanoid();
const db = await database.instance;
const script = await loads.getScript(options.loadId);
await db('runs').insert({
id,
script,
loadId: options.loadId,
status: 'created',
createdAt: new Date(),
});
this.emit('created', {
id,
loadId: options.loadId,
});
return id;
};
}
export { createRunSchema, findRunsSchema } from './runs.schemas.js';
export { RunRepo };

View File

@@ -0,0 +1,17 @@
import { z } from 'zod';
const setSecretSchema = z.object({
id: z.string(),
value: z.string(),
});
const findSecretsSchema = z.object({
offset: z.number().optional(),
limit: z.number().optional(),
});
type SetSecretOptions = z.infer<typeof setSecretSchema>;
type FindSecretOptions = z.infer<typeof findSecretsSchema>;
export type { SetSecretOptions, FindSecretOptions };
export { setSecretSchema, findSecretsSchema };

View File

@@ -0,0 +1,76 @@
import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { FindSecretOptions, SetSecretOptions } from './secrets.schemas.js';
type LogRepoEvents = {};
type LogRepoOptions = {
database: Database;
};
class SecretRepo extends EventEmitter<LogRepoEvents> {
#options: LogRepoOptions;
constructor(options: LogRepoOptions) {
super();
this.#options = options;
}
public set = async (options: SetSecretOptions) => {
const { database } = this.#options;
const db = await database.instance;
const current = await db('secrets').where('id', options.id).first();
if (current) {
await db('secrets').where('id', options.id).update({
value: options.value,
updatedAt: new Date(),
});
} else {
await db('secrets').insert({
id: options.id,
value: options.value,
createdAt: new Date(),
updatedAt: new Date(),
});
}
};
public remove = async (id: string) => {
const { database } = this.#options;
const db = await database.instance;
await db('logs').where('id', id).delete();
};
public getAll = async () => {
const { database } = this.#options;
const db = await database.instance;
const secrets = await db('secrets').select('id', 'value');
return secrets.reduce((acc, secret) => {
acc[secret.id] = secret.value;
return acc;
}, {} as Record<string, string>);
};
public find = async (options: FindSecretOptions) => {
const { database } = this.#options;
const db = await database.instance;
const query = db('secrets').select('id');
if (options.offset) {
query.offset(options.offset);
}
if (options.limit) {
query.limit(options.limit);
}
const secrets = await query;
return secrets;
};
}
export { findSecretsSchema, setSecretSchema } from './secrets.schemas.js';
export { SecretRepo };

View File

@@ -0,0 +1,43 @@
import { z } from 'zod';
import { findArtifactsSchema } from '../repos/repos.js';
import { publicProcedure, router } from './router.utils.js';
const find = publicProcedure.input(findArtifactsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { artifacts } = repos;
const result = await artifacts.find(input);
return result;
});
const prepareRemove = publicProcedure.input(findArtifactsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { artifacts } = repos;
await artifacts.prepareRemove(input);
});
const remove = publicProcedure
.input(
z.object({
hash: z.string(),
ids: z.array(z.string()),
}),
)
.mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { artifacts } = repos;
await artifacts.remove(input.hash, input.ids);
});
const artifactsRouter = router({
find,
remove,
prepareRemove,
});
export { artifactsRouter };

View File

@@ -0,0 +1,27 @@
import { findLoadsSchema, setLoadSchema } from '../repos/repos.js';
import { publicProcedure, router } from './router.utils.js';
const set = publicProcedure.input(setLoadSchema).mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { loads } = repos;
const load = await loads.set(input);
return load;
});
const find = publicProcedure.input(findLoadsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { loads } = repos;
const load = await loads.find(input);
return load;
});
const loadsRouter = router({
set,
find,
});
export { loadsRouter };

View File

@@ -0,0 +1,43 @@
import { z } from 'zod';
import { findLogsSchema } from '../repos/repos.js';
import { publicProcedure, router } from './router.utils.js';
const find = publicProcedure.input(findLogsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { logs } = repos;
const result = await logs.find(input);
return result;
});
const prepareRemove = publicProcedure.input(findLogsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { logs } = repos;
await logs.prepareRemove(input);
});
const remove = publicProcedure
.input(
z.object({
hash: z.string(),
ids: z.array(z.string()),
}),
)
.mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { logs } = repos;
await logs.remove(input.hash, input.ids);
});
const logsRouter = router({
find,
remove,
prepareRemove,
});
export { logsRouter };

View File

@@ -0,0 +1,33 @@
import { createRunSchema, findRunsSchema } from '../repos/repos.js';
import { publicProcedure, router } from './router.utils.js';
const create = publicProcedure.input(createRunSchema).mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { runs } = repos;
const id = await runs.create(input);
return id;
});
const find = publicProcedure.input(findRunsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { runs } = repos;
const results = await runs.find(input);
return results;
});
const remove = publicProcedure.input(findRunsSchema).mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { runs } = repos;
await runs.remove(input);
});
const runsRouter = router({
create,
find,
remove,
});
export { runsRouter };

View File

@@ -0,0 +1,42 @@
import { z } from 'zod';
import { publicProcedure, router } from './router.utils.js';
import { findSecretsSchema, setSecretSchema } from '../repos/secrets/secrets.schemas.js';
const find = publicProcedure.input(findSecretsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { secrets } = repos;
const result = await secrets.find(input);
return result;
});
const set = publicProcedure.input(setSecretSchema).mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { secrets } = repos;
await secrets.set(input);
});
const remove = publicProcedure
.input(
z.object({
id: z.string(),
}),
)
.mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { secrets } = repos;
await secrets.remove(input.id);
});
const secretsRouter = router({
find,
set,
remove,
});
export { secretsRouter };

View File

@@ -0,0 +1,19 @@
import { artifactsRouter } from './router.artifacts.js';
import { loadsRouter } from './router.loads.js';
import { logsRouter } from './router.logs.js';
import { runsRouter } from './router.runs.js';
import { secretsRouter } from './router.secrets.js';
import { router } from './router.utils.js';
const rootRouter = router({
loads: loadsRouter,
runs: runsRouter,
logs: logsRouter,
artifacts: artifactsRouter,
secrets: secretsRouter,
});
type RootRouter = typeof rootRouter;
export type { RootRouter };
export { rootRouter };

View File

@@ -0,0 +1,30 @@
import { initTRPC } from '@trpc/server';
import { CreateFastifyContextOptions } from '@trpc/server/adapters/fastify';
import superjson from 'superjson';
import { Runtime } from '../runtime/runtime.js';
type ContextOptions = {
runtime: Runtime;
};
const createContext = async ({ runtime }: ContextOptions) => {
return async ({ req }: CreateFastifyContextOptions) => {
const { authorization } = req.headers;
const { auth } = runtime;
if (!authorization) {
throw new Error('No authorization header');
}
await auth.validateToken(authorization);
return {
runtime,
};
};
};
type Context = Awaited<ReturnType<typeof createContext>>;
const { router, procedure: publicProcedure } = initTRPC.context<Context>().create({
transformer: superjson,
});
export { createContext, router, publicProcedure };

View File

@@ -0,0 +1,92 @@
import { EventEmitter } from 'eventemitter3';
import { run } from '@morten-olsen/mini-loader-runner';
import { Repos } from '../repos/repos.js';
import { LoggerEvent } from '../../../mini-loader/dist/esm/logger/logger.js';
import { ArtifactCreateEvent } from '../../../mini-loader/dist/esm/artifacts/artifacts.js';
import { Config } from '../config/config.js';
import { resolve } from 'path';
import { readFile } from 'fs/promises';
type RunnerInstanceEvents = {
completed: (args: { id: string }) => void;
};
type RunnerInstanceOptions = {
id: string;
loadId: string;
repos: Repos;
config: Config;
};
class RunnerInstance extends EventEmitter<RunnerInstanceEvents> {
#options: RunnerInstanceOptions;
constructor(options: RunnerInstanceOptions) {
super();
this.#options = options;
}
#addLog = async (event: LoggerEvent['payload']) => {
const { repos, id, loadId } = this.#options;
const { logs } = repos;
await logs.add({
runId: id,
loadId,
severity: event.severity,
message: event.message,
data: event.data,
});
};
#addArtifact = async (event: ArtifactCreateEvent['payload']) => {
const { repos, id, loadId } = this.#options;
const { artifacts } = repos;
await artifacts.add({
name: event.name,
runId: id,
loadId,
data: event.data,
});
};
public start = async () => {
const { repos, id, config } = this.#options;
const { runs, secrets } = repos;
try {
const { script: scriptHash, input } = await runs.getById(id);
const scriptLocation = resolve(config.files.location, 'script', `${scriptHash}.js`);
const script = await readFile(scriptLocation, 'utf-8');
const allSecrets = await secrets.getAll();
await runs.started(id);
const { promise, emitter } = await run({
script,
secrets: allSecrets,
input,
});
emitter.on('message', (message) => {
switch (message.type) {
case 'log': {
this.#addLog(message.payload);
break;
}
case 'artifact:create': {
this.#addArtifact(message.payload);
break;
}
}
});
await promise;
await runs.finished(id, { status: 'succeeded' });
} catch (error) {
let errorMessage = 'Unknown error';
if (error instanceof Error) {
errorMessage = error.message;
}
await runs.finished(id, { status: 'failed', error: errorMessage });
} finally {
this.emit('completed', { id });
}
};
}
export { RunnerInstance };

View File

@@ -0,0 +1,41 @@
import { Config } from '../config/config.js';
import { Repos } from '../repos/repos.js';
import { RunnerInstance } from './runner.instance.js';
type RunnerOptions = {
repos: Repos;
config: Config;
};
class Runner {
#options: RunnerOptions;
#instances: Map<string, RunnerInstance> = new Map();
constructor(options: RunnerOptions) {
this.#options = options;
const { repos } = options;
repos.runs.on('created', this.#start);
}
#start = async (args: { id: string; loadId: string }) => {
const { repos, config } = this.#options;
if (this.#instances.has(args.id)) {
return;
}
const instance = new RunnerInstance({
id: args.id,
loadId: args.loadId,
repos,
config,
});
instance.on('completed', () => {
this.#instances.delete(args.id);
});
this.#instances.set(args.id, instance);
await instance.start();
};
}
export { Runner };

View File

@@ -0,0 +1,50 @@
import { Database } from '../database/database.js';
import { Repos } from '../repos/repos.js';
import { Runner } from '../runner/runner.js';
import { Config } from '../config/config.js';
import { Auth } from '../auth/auth.js';
import { resolve } from 'path';
class Runtime {
#repos: Repos;
#runner: Runner;
#auth: Auth;
constructor(options: Config) {
const database = new Database(options.database);
this.#repos = new Repos({ database, config: options });
this.#runner = new Runner({ repos: this.#repos, config: options });
this.#auth = new Auth({ config: options });
}
public get repos() {
return this.#repos;
}
public get runner() {
return this.#runner;
}
public get auth() {
return this.#auth;
}
public static create = async () => {
const runtime = new Runtime({
database: {
client: 'sqlite3',
connection: {
filename: resolve(process.cwd(), 'data', 'database.sqlite'),
},
useNullAsDefault: true,
},
files: {
location: resolve(process.cwd(), 'data', 'files'),
},
});
return runtime;
};
}
export { Runtime };

View File

@@ -0,0 +1,28 @@
import { fastifyTRPCPlugin, FastifyTRPCPluginOptions } from '@trpc/server/adapters/fastify';
import fastify from 'fastify';
import { RootRouter, rootRouter } from '../router/router.js';
import { createContext } from '../router/router.utils.js';
import { Runtime } from '../runtime/runtime.js';
const createServer = async (runtime: Runtime) => {
const server = fastify({});
server.get('/', async () => {
return { hello: 'world' };
});
server.register(fastifyTRPCPlugin, {
prefix: '/trpc',
trpcOptions: {
router: rootRouter,
createContext: await createContext({ runtime }),
onError({ error }) {
console.error(error);
},
} satisfies FastifyTRPCPluginOptions<RootRouter>['trpcOptions'],
});
await server.ready();
return server;
};
export { createServer };