diff --git a/packages/cli/src/commands/schedules/schedules.add.ts b/packages/cli/src/commands/schedules/schedules.add.ts new file mode 100644 index 0000000..b34aa8f --- /dev/null +++ b/packages/cli/src/commands/schedules/schedules.add.ts @@ -0,0 +1,32 @@ +import { Command } from 'commander'; +import { createClient } from '../../client/client.js'; +import { step } from '../../utils/step.js'; +import { Context } from '../../context/context.js'; +import { Config } from '../../config/config.js'; + +const add = new Command('add'); + +add + .description('Add schedule') + .argument('', 'Load ID') + .argument('', 'Cron') + .option('-n, --name ', 'Name') + .action(async (loadId, cron) => { + const config = new Config(); + const context = new Context(config.context); + const { name } = add.opts(); + const client = await step('Connecting to server', async () => { + return createClient(context); + }); + const id = await step('Adding schedule', async () => { + return await client.schedules.add.mutate({ + name, + load: loadId, + cron, + }); + }); + + console.log(`Schedule added with ID ${id}`); + }); + +export { add }; diff --git a/packages/cli/src/commands/schedules/schedules.list.ts b/packages/cli/src/commands/schedules/schedules.list.ts new file mode 100644 index 0000000..011c8f6 --- /dev/null +++ b/packages/cli/src/commands/schedules/schedules.list.ts @@ -0,0 +1,39 @@ +import { Command } from 'commander'; +import { createClient } from '../../client/client.js'; +import { step } from '../../utils/step.js'; +import { Context } from '../../context/context.js'; +import { Config } from '../../config/config.js'; + +const list = new Command('list'); + +const toInt = (value?: string) => { + if (!value) { + return undefined; + } + return parseInt(value, 10); +}; + +list + .alias('ls') + .description('List schedules') + .option('-l, --load-ids ', 'Load ID') + .option('-o, --offset ', 'Offset') + .option('-a, --limit ', 'Limit', '1000') + .action(async () => { + const { loadIds, offset, limit } = list.opts(); + const config = new Config(); + const context = new Context(config.context); + const client = await step('Connecting to server', async () => { + return createClient(context); + }); + const schedules = await step('Getting schedules', async () => { + return await client.schedules.find.query({ + loadIds, + offset: toInt(offset), + limit: toInt(limit), + }); + }); + console.table(schedules); + }); + +export { list }; diff --git a/packages/cli/src/commands/schedules/schedules.remove.ts b/packages/cli/src/commands/schedules/schedules.remove.ts new file mode 100644 index 0000000..beb3d58 --- /dev/null +++ b/packages/cli/src/commands/schedules/schedules.remove.ts @@ -0,0 +1,61 @@ +import { Command } from 'commander'; +import { createClient } from '../../client/client.js'; +import { step } from '../../utils/step.js'; +import { Context } from '../../context/context.js'; +import inquirer from 'inquirer'; +import { Config } from '../../config/config.js'; + +const remove = new Command('remove'); + +const toInt = (value?: string) => { + if (!value) { + return undefined; + } + return parseInt(value, 10); +}; + +remove + .alias('ls') + .description('LRemove schedules') + .option('-i, --ids ', 'Load IDs') + .option('-l, --load-ids ', 'Load IDs') + .option('-o, --offset ', 'Offset') + .option('-a, --limit ', 'Limit', '1000') + .action(async () => { + const { ids, loadIds, offset, limit } = remove.opts(); + const config = new Config(); + const context = new Context(config.context); + const client = await step('Connecting to server', async () => { + return createClient(context); + }); + const response = await step('Preparing to delete', async () => { + return await client.schedules.prepareRemove.query({ + ids, + loadIds, + offset: toInt(offset), + limit: toInt(limit), + }); + }); + + if (!response.ids.length) { + console.log('No logs to delete'); + return; + } + const { confirm } = await inquirer.prompt([ + { + type: 'confirm', + name: 'confirm', + message: `Are you sure you want to delete ${response.ids.length} schedules?`, + }, + ]); + + if (!confirm) { + return; + } + + await step('Deleting artifacts', async () => { + await client.artifacts.remove.mutate(response); + }); + }); + +export { remove }; diff --git a/packages/cli/src/commands/schedules/schedules.ts b/packages/cli/src/commands/schedules/schedules.ts new file mode 100644 index 0000000..520e696 --- /dev/null +++ b/packages/cli/src/commands/schedules/schedules.ts @@ -0,0 +1,11 @@ +import { Command } from 'commander'; +import { list } from './schedules.list.js'; +import { remove } from './schedules.remove.js'; +import { add } from './schedules.add.js'; + +const schedules = new Command('schedules'); +schedules.addCommand(list); +schedules.addCommand(remove); +schedules.addCommand(add); + +export { schedules }; diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 01a82df..996b4ad 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -8,6 +8,7 @@ import { secrets } from './commands/secrets/secrets.js'; import { local } from './commands/local/local.js'; import { auth } from './commands/auth/auth.js'; import { contexts } from './commands/contexts/contexts.js'; +import { schedules } from './commands/schedules/schedules.js'; program.addCommand(loads); program.addCommand(runs); @@ -17,6 +18,7 @@ program.addCommand(secrets); program.addCommand(local); program.addCommand(auth); program.addCommand(contexts); +program.addCommand(schedules); program.version(pkg.version); diff --git a/packages/server/package.json b/packages/server/package.json index 0ba39a4..2e5ef29 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -31,6 +31,7 @@ "@trpc/client": "^10.45.0", "@trpc/server": "^10.45.0", "commander": "^11.1.0", + "cron": "^3.1.6", "eventemitter3": "^5.0.1", "fastify": "^4.25.2", "jsonwebtoken": "^9.0.2", diff --git a/packages/server/src/database/migrations/migration.schedule.ts b/packages/server/src/database/migrations/migration.schedule.ts new file mode 100644 index 0000000..71dc272 --- /dev/null +++ b/packages/server/src/database/migrations/migration.schedule.ts @@ -0,0 +1,22 @@ +import { Knex } from 'knex'; + +const name = 'schedule-support'; + +const up = async (knex: Knex) => { + await knex.schema.createTable('schedules', (table) => { + table.string('id').primary(); + table.string('name').nullable(); + table.string('description').nullable(); + table.string('load').notNullable(); + table.string('cron').notNullable(); + table.string('input').nullable(); + table.timestamp('createdAt').notNullable(); + table.timestamp('updatedAt').notNullable(); + }); +}; + +const down = async (knex: Knex) => { + await knex.schema.dropTable('schedule'); +}; + +export { name, up, down }; diff --git a/packages/server/src/database/migrations/migrations.source.ts b/packages/server/src/database/migrations/migrations.source.ts index b89464e..7ce8400 100644 --- a/packages/server/src/database/migrations/migrations.source.ts +++ b/packages/server/src/database/migrations/migrations.source.ts @@ -1,6 +1,7 @@ import { Knex } from 'knex'; import * as init from './migration.init.js'; +import * as scheduleSupport from './migration.schedule.js'; type Migration = { name: string; @@ -8,7 +9,7 @@ type Migration = { down: (knex: Knex) => Promise; }; -const migrations = [init] satisfies Migration[]; +const migrations = [init, scheduleSupport] satisfies Migration[]; const source: Knex.MigrationSource = { getMigrations: async () => migrations, diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 83d1564..9464004 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -6,6 +6,7 @@ const start = new Command('start'); start.action(async () => { const port = 4500; const runtime = await Runtime.create(); + await runtime.scheduler.start(); const server = await createServer(runtime); await server.listen({ port, diff --git a/packages/server/src/knex.d.ts b/packages/server/src/knex.d.ts index 48f189d..a104048 100644 --- a/packages/server/src/knex.d.ts +++ b/packages/server/src/knex.d.ts @@ -43,5 +43,15 @@ declare module 'knex/types/tables.js' { createdAt: Date; updatedAt: Date; }; + schedules: { + id: string; + name?: string; + description?: string; + load: string; + cron: string; + input?: string; + createdAt: Date; + updatedAt: Date; + }; } } diff --git a/packages/server/src/repos/repos.ts b/packages/server/src/repos/repos.ts index 49d1f0e..40c94c1 100644 --- a/packages/server/src/repos/repos.ts +++ b/packages/server/src/repos/repos.ts @@ -4,6 +4,7 @@ import { ArtifactRepo } from './artifacts/artifacts.js'; import { LoadRepo } from './loads/loads.js'; import { LogRepo } from './logs/logs.js'; import { RunRepo } from './runs/runs.js'; +import { ScheduleRepo } from './schedules/schedules.js'; import { SecretRepo } from './secrets/secrets.js'; type ReposOptions = { @@ -17,6 +18,7 @@ class Repos { #logs: LogRepo; #artifacts: ArtifactRepo; #secrets: SecretRepo; + #schedule: ScheduleRepo; constructor({ database, config }: ReposOptions) { this.#loads = new LoadRepo({ @@ -36,6 +38,9 @@ class Repos { this.#secrets = new SecretRepo({ database, }); + this.#schedule = new ScheduleRepo({ + database, + }); } public get loads() { @@ -57,8 +62,13 @@ class Repos { public get secrets() { return this.#secrets; } + + public get schedules() { + return this.#schedule; + } } +export { findSchedulesSchema, addScheduleSchema } from './schedules/schedules.js'; export { findLogsSchema, addLogSchema } from './logs/logs.js'; export { setLoadSchema, findLoadsSchema } from './loads/loads.js'; export { createRunSchema, findRunsSchema } from './runs/runs.js'; diff --git a/packages/server/src/repos/schedules/schedules.schemas.ts b/packages/server/src/repos/schedules/schedules.schemas.ts new file mode 100644 index 0000000..149c1ca --- /dev/null +++ b/packages/server/src/repos/schedules/schedules.schemas.ts @@ -0,0 +1,22 @@ +import { z } from 'zod'; + +const addScheduleSchema = z.object({ + name: z.string().optional(), + description: z.string().optional(), + load: z.string(), + cron: z.string(), + input: z.string().optional(), +}); + +const findSchedulesSchema = z.object({ + ids: z.array(z.string()).optional(), + loadIds: z.array(z.string()).optional(), + offset: z.number().optional(), + limit: z.number().optional(), +}); + +type AddScheduleOptions = z.infer; +type FindSchedulesOptions = z.infer; + +export type { AddScheduleOptions, FindSchedulesOptions }; +export { addScheduleSchema, findSchedulesSchema }; diff --git a/packages/server/src/repos/schedules/schedules.ts b/packages/server/src/repos/schedules/schedules.ts new file mode 100644 index 0000000..9c25f50 --- /dev/null +++ b/packages/server/src/repos/schedules/schedules.ts @@ -0,0 +1,118 @@ +import { EventEmitter } from 'eventemitter3'; +import { Database } from '../../database/database.js'; +import { nanoid } from 'nanoid'; +import { AddScheduleOptions, FindSchedulesOptions } from './schedules.schemas.js'; +import { createHash } from 'crypto'; + +type ScheduleRepoEvents = { + added: (id: string) => void; + removed: (id: string) => void; +}; + +type ScheduleRepoOptions = { + database: Database; +}; + +class ScheduleRepo extends EventEmitter { + #options: ScheduleRepoOptions; + + constructor(options: ScheduleRepoOptions) { + super(); + this.#options = options; + } + + public get = async (id: string) => { + const { database } = this.#options; + const db = await database.instance; + const result = await db('schedules').where('id', id).first(); + return result; + }; + + public add = async (options: AddScheduleOptions) => { + const { database } = this.#options; + const db = await database.instance; + const id = nanoid(); + + await db('schedules').insert({ + id, + name: options.name, + description: options.description, + cron: options.cron, + createdAt: new Date(), + updatedAt: new Date(), + }); + + this.emit('added', id); + + return id; + }; + + public prepareRemove = async (options: FindSchedulesOptions) => { + const { database } = this.#options; + const db = await database.instance; + + const query = db('schedules').select('id'); + + if (options.ids) { + query.whereIn('id', options.ids); + } + + if (options.loadIds) { + query.whereIn('loadId', options.loadIds); + } + + const result = await query; + const ids = result.map((row) => row.id); + const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|'); + const hash = createHash('sha256').update(token).digest('hex'); + return { + ids, + hash, + }; + }; + + public remove = async (hash: string, ids: string[]) => { + const { database } = this.#options; + const db = await database.instance; + const token = ids.map((id) => Buffer.from(id).toString('base64')).join('|'); + const actualHash = createHash('sha256').update(token).digest('hex'); + + if (hash !== actualHash) { + throw new Error('Invalid hash'); + } + + await db('schedules').whereIn('id', ids).delete(); + ids.forEach((id) => { + this.emit('removed', id); + }); + }; + + public find = async (options: FindSchedulesOptions) => { + const { database } = this.#options; + const db = await database.instance; + + const query = db('schedules'); + + if (options.ids) { + query.whereIn('id', options.ids); + } + + if (options.loadIds) { + query.whereIn('loadId', options.loadIds); + } + + if (options.offset) { + query.offset(options.offset); + } + + if (options.limit) { + query.limit(options.limit); + } + + const results = await query; + return results; + }; +} + +export { addScheduleSchema, findSchedulesSchema } from './schedules.schemas.js'; +export { ScheduleRepo }; diff --git a/packages/server/src/router/router.schedules.ts b/packages/server/src/router/router.schedules.ts new file mode 100644 index 0000000..3f311b5 --- /dev/null +++ b/packages/server/src/router/router.schedules.ts @@ -0,0 +1,53 @@ +import { z } from 'zod'; +import { addScheduleSchema, findSchedulesSchema } from '../repos/repos.js'; +import { publicProcedure, router } from './router.utils.js'; + +const add = publicProcedure.input(addScheduleSchema).mutation(async ({ input, ctx }) => { + const { runtime } = ctx; + const { repos } = runtime; + const { schedules } = repos; + + const result = await schedules.add(input); + return result; +}); + +const find = publicProcedure.input(findSchedulesSchema).query(async ({ input, ctx }) => { + const { runtime } = ctx; + const { repos } = runtime; + const { schedules } = repos; + + const result = await schedules.find(input); + return result; +}); + +const prepareRemove = publicProcedure.input(findSchedulesSchema).query(async ({ input, ctx }) => { + const { runtime } = ctx; + const { repos } = runtime; + const { schedules } = repos; + + return await schedules.prepareRemove(input); +}); + +const remove = publicProcedure + .input( + z.object({ + hash: z.string(), + ids: z.array(z.string()), + }), + ) + .mutation(async ({ input, ctx }) => { + const { runtime } = ctx; + const { repos } = runtime; + const { artifacts } = repos; + + await artifacts.remove(input.hash, input.ids); + }); + +const schedulesRouter = router({ + add, + find, + remove, + prepareRemove, +}); + +export { schedulesRouter }; diff --git a/packages/server/src/router/router.ts b/packages/server/src/router/router.ts index de748cc..35c0fdc 100644 --- a/packages/server/src/router/router.ts +++ b/packages/server/src/router/router.ts @@ -2,6 +2,7 @@ import { artifactsRouter } from './router.artifacts.js'; import { loadsRouter } from './router.loads.js'; import { logsRouter } from './router.logs.js'; import { runsRouter } from './router.runs.js'; +import { schedulesRouter } from './router.schedules.js'; import { secretsRouter } from './router.secrets.js'; import { router } from './router.utils.js'; @@ -11,6 +12,7 @@ const rootRouter = router({ logs: logsRouter, artifacts: artifactsRouter, secrets: secretsRouter, + schedules: schedulesRouter, }); type RootRouter = typeof rootRouter; diff --git a/packages/server/src/runtime/runtime.ts b/packages/server/src/runtime/runtime.ts index 14a3254..0656f0c 100644 --- a/packages/server/src/runtime/runtime.ts +++ b/packages/server/src/runtime/runtime.ts @@ -4,17 +4,20 @@ import { Runner } from '../runner/runner.js'; import { Config } from '../config/config.js'; import { Auth } from '../auth/auth.js'; import { resolve } from 'path'; +import { Scheduler } from '../scheduler/scheduler.js'; class Runtime { #repos: Repos; #runner: Runner; #auth: Auth; + #scheduler: Scheduler; constructor(options: Config) { const database = new Database(options.database); this.#repos = new Repos({ database, config: options }); this.#runner = new Runner({ repos: this.#repos, config: options }); this.#auth = new Auth({ config: options }); + this.#scheduler = new Scheduler({ runs: this.#repos.runs, schedules: this.#repos.schedules }); } public get repos() { @@ -29,6 +32,10 @@ class Runtime { return this.#auth; } + public get scheduler() { + return this.#scheduler; + } + public static create = async () => { const runtime = new Runtime({ database: { diff --git a/packages/server/src/scheduler/scheduler.ts b/packages/server/src/scheduler/scheduler.ts new file mode 100644 index 0000000..fb53f02 --- /dev/null +++ b/packages/server/src/scheduler/scheduler.ts @@ -0,0 +1,73 @@ +import { CronJob } from 'cron'; +import { ScheduleRepo } from '../repos/schedules/schedules.js'; +import { RunRepo } from '../repos/runs/runs.js'; + +type SchedulerOptions = { + runs: RunRepo; + schedules: ScheduleRepo; +}; + +type RunningSchedule = { + id: string; + job: CronJob; + stop: () => Promise; +}; + +class Scheduler { + #running: RunningSchedule[] = []; + #options: SchedulerOptions; + + constructor(options: SchedulerOptions) { + this.#options = options; + const { schedules } = this.#options; + schedules.on('added', this.#add); + schedules.on('removed', this.#remove); + } + + #remove = async (id: string) => { + const current = this.#running.filter((r) => r.id === id); + await Promise.all(current.map((r) => r.stop())); + this.#running = this.#running.filter((r) => r.id !== id); + }; + + #add = async (id: string) => { + const { schedules, runs } = this.#options; + const current = this.#running.filter((r) => r.id === id); + await Promise.all(current.map((r) => r.stop())); + const schedule = await schedules.get(id); + if (!schedule) { + return; + } + const job = new CronJob(schedule.cron, async () => { + await runs.create({ + loadId: schedule.load, + }); + }); + const stop = async () => { + job.stop(); + }; + this.#running.push({ + id: schedule.id, + job, + stop, + }); + }; + + public stop = async () => { + for (const running of this.#running) { + await running.stop(); + this.#running = this.#running.filter((r) => r !== running); + } + }; + + public start = async () => { + const { schedules } = this.#options; + await this.stop(); + const all = await schedules.find({}); + for (const schedule of all) { + await this.#add(schedule.id); + } + }; +} + +export { Scheduler }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 893d72d..2e9642e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -173,6 +173,9 @@ importers: commander: specifier: ^11.1.0 version: 11.1.0 + cron: + specifier: ^3.1.6 + version: 3.1.6 eventemitter3: specifier: ^5.0.1 version: 5.0.1 @@ -1288,6 +1291,10 @@ packages: '@types/node': 20.10.8 dev: true + /@types/luxon@3.3.8: + resolution: {integrity: sha512-jYvz8UMLDgy3a5SkGJne8H7VA7zPV2Lwohjx0V8V31+SqAjNmurWMkk9cQhfvlcnXWudBpK9xPM1n4rljOcHYQ==} + dev: false + /@types/node@20.10.8: resolution: {integrity: sha512-f8nQs3cLxbAFc00vEU59yf9UyGUftkPaLGfvbVOIDdx2i1b8epBqj2aNGyP19fiyXWvlmZ7qC1XLjAzw/OKIeA==} dependencies: @@ -2083,6 +2090,13 @@ packages: resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} dev: false + /cron@3.1.6: + resolution: {integrity: sha512-cvFiQCeVzsA+QPM6fhjBtlKGij7tLLISnTSvFxVdnFGLdz+ZdXN37kNe0i2gefmdD17XuZA6n2uPVwzl4FxW/w==} + dependencies: + '@types/luxon': 3.3.8 + luxon: 3.4.4 + dev: false + /cross-spawn@7.0.3: resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==} engines: {node: '>= 8'} @@ -3752,6 +3766,11 @@ packages: dependencies: yallist: 4.0.0 + /luxon@3.4.4: + resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==} + engines: {node: '>=12'} + dev: false + /magic-string@0.25.9: resolution: {integrity: sha512-RmF0AsMzgt25qzqqLc1+MbHmhdx0ojF2Fvs4XnOqz2ZOBXzzkEwc/dJQZCYHAn7v1jbVOjAZfK8msRn4BxO4VQ==} dependencies: