feat: add http gateway (#3)

This commit is contained in:
Morten Olsen
2024-01-12 21:10:48 +01:00
committed by GitHub
parent 9c5249956e
commit 1115ce2fb3
22 changed files with 397 additions and 74 deletions

View File

@@ -0,0 +1,34 @@
import { FastifyPluginAsync } from 'fastify';
import FastifyReplyFrom from '@fastify/reply-from';
import { escape } from 'querystring';
import { Runtime } from '../runtime/runtime.js';
type Options = {
runtime: Runtime;
};
const gateway: FastifyPluginAsync<Options> = async (fastify, { runtime }) => {
await fastify.register(FastifyReplyFrom, {
http: {},
});
fastify.all('/gateway/*', (req, res) => {
const [runId, ...pathSegments] = (req.params as any)['*'].split('/').filter(Boolean);
const run = runtime.runner.getInstance(runId);
if (!run) {
res.statusCode = 404;
res.send({ error: 'Run not found' });
return;
}
const socketPath = run.run?.httpGatewaySocket;
if (!socketPath) {
res.statusCode = 404;
res.send({ error: 'No socket path to run' });
return;
}
const path = pathSegments.join('/');
res.from(`unix+http://${escape(socketPath)}/${path}`);
});
};
export { gateway };

View File

@@ -3,6 +3,7 @@ import { EventEmitter } from 'eventemitter3';
import { Database } from '../../database/database.js';
import { CreateRunOptions, FindRunsOptions, UpdateRunOptions } from './runs.schemas.js';
import { LoadRepo } from '../loads/loads.js';
import { createHash } from 'crypto';
type RunRepoEvents = {
created: (args: { id: string; loadId: string }) => void;
@@ -18,13 +19,22 @@ type RunRepoOptions = {
class RunRepo extends EventEmitter<RunRepoEvents> {
#options: RunRepoOptions;
#isReady: Promise<void>;
constructor(options: RunRepoOptions) {
super();
this.#options = options;
this.#isReady = this.#setup();
}
#setup = async () => {
const { database } = this.#options;
const db = await database.instance;
await db('runs').update({ status: 'failed', error: 'server was shut down' }).where({ status: 'running' });
};
public getById = async (id: string) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
@@ -36,6 +46,7 @@ class RunRepo extends EventEmitter<RunRepoEvents> {
};
public getByLoadId = async (loadId: string) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
@@ -44,6 +55,7 @@ class RunRepo extends EventEmitter<RunRepoEvents> {
};
public find = async (options: FindRunsOptions) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
const query = db('runs').select(['id', 'status', 'startedAt', 'status', 'error', 'endedAt']);
@@ -62,19 +74,41 @@ class RunRepo extends EventEmitter<RunRepoEvents> {
return runs;
};
public remove = async (options: FindRunsOptions) => {
public prepareRemove = async (options: FindRunsOptions) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
const query = db('runs');
const query = db('runs').select('id');
if (options.loadId) {
query.where({ loadId: options.loadId });
}
await query.del();
const result = await query;
const ids = result.map((row) => row.id);
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const hash = createHash('sha256').update(token).digest('hex');
return {
ids,
hash,
};
};
public remove = async (hash: string, ids: string[]) => {
const { database } = this.#options;
const db = await database.instance;
const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|');
const actualHash = createHash('sha256').update(token).digest('hex');
if (hash !== actualHash) {
throw new Error('Invalid hash');
}
await db('runs').whereIn('id', ids).delete();
};
public started = async (id: string) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
const current = await this.getById(id);
@@ -92,6 +126,7 @@ class RunRepo extends EventEmitter<RunRepoEvents> {
};
public finished = async (id: string, options: UpdateRunOptions) => {
await this.#isReady;
const { database } = this.#options;
const db = await database.instance;
const { loadId } = await this.getById(id);
@@ -114,6 +149,7 @@ class RunRepo extends EventEmitter<RunRepoEvents> {
};
public create = async (options: CreateRunOptions) => {
await this.#isReady;
const { database, loads } = this.#options;
const id = nanoid();
const db = await database.instance;

View File

@@ -1,3 +1,4 @@
import { z } from 'zod';
import { createRunSchema, findRunsSchema } from '../repos/repos.js';
import { publicProcedure, router } from './router.utils.js';
@@ -17,17 +18,50 @@ const find = publicProcedure.input(findRunsSchema).query(async ({ input, ctx })
return results;
});
const remove = publicProcedure.input(findRunsSchema).mutation(async ({ input, ctx }) => {
const prepareRemove = publicProcedure.input(findRunsSchema).query(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { runs } = repos;
await runs.remove(input);
return await runs.prepareRemove(input);
});
const remove = publicProcedure
.input(
z.object({
hash: z.string(),
ids: z.array(z.string()),
}),
)
.mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { repos } = runtime;
const { runs } = repos;
for (const id of input.ids) {
const instance = runtime.runner.getInstance(id);
if (instance) {
await instance.run?.teardown();
}
}
await runs.remove(input.hash, input.ids);
});
const terminate = publicProcedure.input(z.string()).mutation(async ({ input, ctx }) => {
const { runtime } = ctx;
const { runner } = runtime;
const instance = runner.getInstance(input);
if (!instance || !instance.run) {
return;
}
await instance.run.teardown();
});
const runsRouter = router({
create,
find,
remove,
prepareRemove,
terminate,
});
export { runsRouter };

View File

@@ -1,5 +1,5 @@
import { EventEmitter } from 'eventemitter3';
import { run } from '@morten-olsen/mini-loader-runner';
import { RunInfo, run } from '@morten-olsen/mini-loader-runner';
import { Repos } from '../repos/repos.js';
import { LoggerEvent } from '../../../mini-loader/dist/esm/logger/logger.js';
import { ArtifactCreateEvent } from '../../../mini-loader/dist/esm/artifacts/artifacts.js';
@@ -20,12 +20,17 @@ type RunnerInstanceOptions = {
class RunnerInstance extends EventEmitter<RunnerInstanceEvents> {
#options: RunnerInstanceOptions;
#run?: RunInfo;
constructor(options: RunnerInstanceOptions) {
super();
this.#options = options;
}
public get run() {
return this.#run;
}
#addLog = async (event: LoggerEvent['payload']) => {
const { repos, id, loadId } = this.#options;
const { logs } = repos;
@@ -58,11 +63,13 @@ class RunnerInstance extends EventEmitter<RunnerInstanceEvents> {
const script = await readFile(scriptLocation, 'utf-8');
const allSecrets = await secrets.getAll();
await runs.started(id);
const { promise, emitter } = await run({
const current = await run({
script,
secrets: allSecrets,
input,
});
this.#run = current;
const { promise, emitter } = current;
emitter.on('message', (message) => {
switch (message.type) {
case 'log': {
@@ -84,9 +91,11 @@ class RunnerInstance extends EventEmitter<RunnerInstanceEvents> {
}
await runs.finished(id, { status: 'failed', error: errorMessage });
} finally {
this.#run = undefined;
this.emit('completed', { id });
}
};
}
export type { RunInfo };
export { RunnerInstance };

View File

@@ -36,6 +36,10 @@ class Runner {
this.#instances.set(args.id, instance);
await instance.start();
};
public getInstance = (id: string) => {
return this.#instances.get(id);
};
}
export { Runner };

View File

@@ -3,9 +3,16 @@ import fastify from 'fastify';
import { RootRouter, rootRouter } from '../router/router.js';
import { createContext } from '../router/router.utils.js';
import { Runtime } from '../runtime/runtime.js';
import { gateway } from '../gateway/gateway.js';
const createServer = async (runtime: Runtime) => {
const server = fastify({});
const server = fastify({
maxParamLength: 10000,
bodyLimit: 30 * 1024 * 1024,
logger: {
level: 'warn',
},
});
server.get('/', async () => {
return { hello: 'world' };
});
@@ -33,6 +40,14 @@ const createServer = async (runtime: Runtime) => {
},
} satisfies FastifyTRPCPluginOptions<RootRouter>['trpcOptions'],
});
server.register(gateway, {
runtime,
});
server.addHook('onError', async (request, reply, error) => {
console.error(error);
});
await server.ready();
return server;