This commit is contained in:
Morten Olsen
2024-12-10 20:59:29 +01:00
commit ede2d56b7c
54 changed files with 6955 additions and 0 deletions

2
packages/plaindb/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/node_modules/
/dist/

View File

@@ -0,0 +1,21 @@
{
"name": "@plaindb/plaindb",
"version": "1.0.0",
"type": "module",
"main": "dist/exports.js",
"files": [
"dist"
],
"scripts": {
"build": "tsc --build"
},
"devDependencies": {
"@types/node": "^22.10.1",
"typescript": "^5.7.2"
},
"dependencies": {
"knex": "^3.1.0",
"nanoid": "^5.0.9",
"zod": "^3.24.0"
}
}

View File

@@ -0,0 +1,56 @@
import knex, { Knex } from 'knex';
type DatabaseMigration = {
name: string;
up: (knex: Knex) => Promise<void>;
down: (knex: Knex) => Promise<void>;
};
type DatabaseGetOptions = {
name: string;
migrations: DatabaseMigration[];
};
type DatabasesOptions = {
getConfig: (name: string) => Knex.Config;
};
class Databases {
#dbs = new Map<string, Knex>();
#options: DatabasesOptions;
constructor(options: DatabasesOptions) {
this.#options = options;
}
public get = async ({ name, migrations }: DatabaseGetOptions) => {
if (!this.#dbs.has(name)) {
const { getConfig } = this.#options;
const migrationSource: Knex.MigrationSource<DatabaseMigration> = {
getMigrations: async () => migrations,
getMigrationName: (migration) => migration.name,
getMigration: async (migration) => migration,
};
const config: Knex.Config = {
...getConfig(name),
migrations: {
migrationSource,
},
};
const db = knex(config);
await db.migrate.latest();
this.#dbs.set(name, db);
}
const value = this.#dbs.get(name);
if (!value) {
throw new Error('Database not found');
}
return value;
};
public close = async () => {
await Promise.all(Array.from(this.#dbs.values()).map((db) => db.destroy()));
};
}
export { Databases, type DatabasesOptions, type DatabaseMigration, type DatabaseGetOptions };

View File

@@ -0,0 +1,52 @@
import { EventEmitter } from '../utils/eventemitter.js';
type DocumentEvent = {
save: (document: Document) => void;
change: (document: Document) => void;
};
type DocumentOptions = {
id: string;
location: string;
data: Buffer;
};
class Document extends EventEmitter<DocumentEvent> {
#id: string;
#location: string;
#data: Buffer;
constructor({ location, data, id }: DocumentOptions) {
super();
this.#id = id;
this.#location = location;
this.#data = data;
}
public get id() {
return this.#id;
}
public get location() {
return this.#location;
}
public get data() {
return this.#data;
}
public set data(data: Buffer) {
this.#data = data;
this.emit('change', this);
}
public replace = (data: Buffer) => {
this.#data = data;
};
public save = async () => {
await this.emit('save', this);
};
}
export { Document };

View File

@@ -0,0 +1,108 @@
import { Knex } from 'knex';
import { nanoid } from 'nanoid';
import { createHash } from 'crypto';
import { Databases } from '../databases/databases.js';
import { FileSystem } from '../filesystem/filesystem.js';
import { EventEmitter } from '../utils/eventemitter.js';
import { Document } from './documents.document.js';
import { migrations } from './migrations/migrations.js';
type DocumentsEvents = {
change: (loctions: string[]) => void;
save: (document: Document) => void;
};
type DocumentsOptions = {
fs: FileSystem;
databases: Databases;
};
class Documents extends EventEmitter<DocumentsEvents> {
#options: DocumentsOptions;
#documents: Map<string, Document>;
#db?: Promise<Knex>;
constructor(options: DocumentsOptions) {
super();
this.#options = options;
this.#documents = new Map();
this.#options.fs.on('changed', this.#onChanges);
}
#onChanges = async (changes: string[]) => {
const db = await this.#getDB();
const actualChanges: string[] = [];
for (const location of changes) {
const current = await db('files').where({ location }).first();
const document = await this.get(location);
const hash = createHash('sha256').update(document.data).digest('hex');
if (!current) {
await db('files').insert({
id: nanoid(),
location,
hash,
updatedAt: new Date(),
});
} else if (current.hash !== hash) {
await db('files').where({ location }).update({
hash,
updatedAt: new Date(),
});
} else {
continue;
}
await this.emit('save', document);
actualChanges.push(location);
}
if (actualChanges.length) {
this.emit('change', actualChanges);
}
};
#getDB = async () => {
if (!this.#db) {
const { databases } = this.#options;
this.#db = databases.get({
name: 'documents',
migrations,
});
}
return this.#db;
};
#onSave = async (document: Document) => {
const { fs } = this.#options;
await this.emit('save', document);
await fs.set(document.location, document.data);
};
public all = async () => {
const db = await this.#getDB();
const files = await db('files').select('*');
return files.map((file: { location: string }) => file.location as string);
};
public get = async (location: string) => {
if (!this.#documents.has(location)) {
const db = await this.#getDB();
const { fs } = this.#options;
const data = await fs.get(location);
const current = await db('files').select('*').where({ location }).first();
const document = new Document({
id: current?.id || nanoid(),
location,
data: data || Buffer.from(''),
});
document.on('save', this.#onSave);
this.#documents.set(location, document);
}
const value = this.#documents.get(location);
if (!value) {
throw new Error('Document not found');
}
return value;
};
}
export { Documents };
export { Document } from './documents.document.js';

View File

@@ -0,0 +1,24 @@
import { DatabaseMigration } from '../../databases/databases.js';
const migrations: DatabaseMigration[] = [
{
name: 'init',
up: async (knex) => {
await knex.schema.createTable('files', (table) => {
table.string('id').primary();
table.string('location').notNullable();
table.string('hash').notNullable();
table.dateTime('updatedAt').notNullable();
table.dateTime('syncedAt').nullable();
table.unique('location');
table.index('updatedAt');
table.index('hash');
});
},
down: async (knex) => {
await knex.schema.dropTable('files');
},
},
];
export { migrations };

View File

@@ -0,0 +1,10 @@
export { PlainDB } from './plaindb/plaindb.js';
export { FileSystem } from './filesystem/filesystem.js';
export { Documents } from './documents/documents.js';
export { Document } from './documents/documents.document.js';
export { createActionApiRoute } from './plugins/plugin/plugin.api.js';
export { Plugins } from './plugins/plugins.js';
export { Plugin } from './plugins/plugin/plugin.js';
export { Databases, DatabaseMigration } from './databases/databases.js';
export * from 'zod';
export { type Knex as Database } from 'knex';

View File

@@ -0,0 +1,12 @@
import { EventEmitter } from '../utils/eventemitter.js';
type FileSystemEvents = {
changed: (locations: string[]) => void;
};
abstract class FileSystem extends EventEmitter<FileSystemEvents> {
abstract get: (location: string) => Promise<Buffer | undefined>;
abstract set: (location: string, data: Buffer) => Promise<void>;
}
export { FileSystem };

View File

@@ -0,0 +1,50 @@
import { Knex } from 'knex';
import { Documents } from '../documents/documents.js';
import { FileSystem } from '../filesystem/filesystem.js';
import { Plugins } from '../plugins/plugins.js';
import { Databases } from '../databases/databases.js';
type PlainDBOptions = {
fs: FileSystem;
getDBConfig: (name: string) => Knex.Config;
};
class PlainDB {
#options: PlainDBOptions;
#documents: Documents;
#plugins: Plugins;
#databases: Databases;
constructor(options: PlainDBOptions) {
this.#options = options;
this.#databases = new Databases({
getConfig: this.#options.getDBConfig,
});
this.#documents = new Documents({
fs: this.#options.fs,
databases: this.#databases,
});
this.#plugins = new Plugins({
documents: this.#documents,
databases: this.#databases,
});
}
public get documents() {
return this.#documents;
}
public get plugins() {
return this.#plugins;
}
public get databases() {
return this.#databases;
}
public close = async () => {
await this.#databases.close();
};
}
export { PlainDB };

View File

@@ -0,0 +1,18 @@
import { z, ZodSchema } from 'zod';
type PluginActionApi = Record<
string,
{
input?: ZodSchema;
output?: ZodSchema;
handle?: (input: any) => Promise<any>;
}
>;
const createActionApiRoute = <TInput extends ZodSchema = ZodSchema, TOutput extends ZodSchema = ZodSchema>(options: {
input?: TInput;
output?: TOutput;
handle?: (input: z.infer<TInput>) => Promise<z.infer<TOutput>>;
}) => options satisfies PluginActionApi[string];
export { type PluginActionApi, createActionApiRoute };

View File

@@ -0,0 +1,108 @@
import { Document, Documents } from '../../documents/documents.js';
import { DatabaseMigration, Databases } from '../../databases/databases.js';
import { EventEmitter } from '../../utils/eventemitter.js';
import { Plugins } from '../plugins.js';
import { z, ZodSchema } from 'zod';
import { PluginActionApi } from './plugin.api.js';
type PluginOptions<TLocalConfig extends ZodSchema = ZodSchema, TSharedConfig extends ZodSchema = ZodSchema> = {
plugins: Plugins;
documents: Documents;
databases: Databases;
configs: {
local?: TLocalConfig;
shared?: TSharedConfig;
};
};
type PluginEvents = {
configChange: (config: unknown) => void;
};
abstract class Plugin<
TLocalConfig extends ZodSchema = ZodSchema,
TSharedConfig extends ZodSchema = ZodSchema,
TActions extends PluginActionApi = PluginActionApi,
> extends EventEmitter<PluginEvents> {
#options: PluginOptions<TLocalConfig, TSharedConfig>;
constructor(options: PluginOptions<TLocalConfig, TSharedConfig>) {
super();
this.#options = options;
}
public get documents(): Documents {
return this.#options.documents;
}
public readonly configSchemas?: {
local?: TLocalConfig;
shared?: TSharedConfig;
};
public getDB = async (name: string, migrations: DatabaseMigration[]) => {
const { databases } = this.#options;
const scopedName = `plugins:${this.name}:${name}`;
return databases.get({ name: scopedName, migrations });
};
public get configs(): {
local?: z.infer<TLocalConfig>;
shared?: z.infer<TSharedConfig>;
} {
return this.#options.configs;
}
public setConfigs = async (configs: { local?: z.infer<TLocalConfig>; shared?: z.infer<TSharedConfig> }) => {
this.#options.configs = configs;
await this.emit('configChange', configs);
};
public abstract readonly name: string;
public actions?: TActions;
public onLoad?: () => Promise<void>;
public onUnload?: () => Promise<void>;
public onLoaded?: () => Promise<void>;
public process?: (document: Document) => Promise<void>;
/*public getPlugin = async <T extends Plugin>(plugin: new (...args: any) => T): Promise<
T['api'] extends (...args: any[]) => infer R ? R : never
> => {
const { plugins } = this.#options;
const instance = await plugins.get(plugin);
return instance.api?.() as any;
}*/
public action = async <TPlugin extends Plugin<any, any, any>, TAction extends keyof TPlugin['actions']>(
plugin: new (...args: any[]) => TPlugin,
action: TAction,
input: Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['input'], undefined> extends ZodSchema
? z.infer<Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['input'], undefined>>
: undefined,
): Promise<
Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['output'], undefined> extends ZodSchema
? z.infer<Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['output'], undefined>>
: undefined
> => {
const { plugins } = this.#options;
const instance = await plugins.get(plugin);
const { actions } = instance;
if (!actions) {
throw new Error(`Plugin ${plugin.name} does not have actions`);
}
const actionDef = actions[action];
if (!actionDef) {
throw new Error(`Plugin ${plugin.name} does not have action ${String(action)}`);
}
actionDef.input?.parse(input);
return (await actionDef.handle?.(input)) as any;
};
}
type PluginConstructor<
TLocalConfig extends ZodSchema = ZodSchema,
TSharedConfig extends ZodSchema = ZodSchema,
T extends Plugin = Plugin,
> = new (options: PluginOptions<TLocalConfig, TSharedConfig>) => T;
export { Plugin, type PluginOptions, type PluginConstructor };

View File

@@ -0,0 +1,108 @@
import { Document } from '../documents/documents.document.js';
import { Documents } from '../documents/documents.js';
import { Databases } from '../databases/databases.js';
import { Plugin, PluginConstructor } from './plugin/plugin.js';
import { z, ZodSchema } from 'zod';
type PluginsOptions = {
documents: Documents;
databases: Databases;
};
class Plugins {
#options: PluginsOptions;
#plugins: Map<PluginConstructor, Plugin>;
constructor(options: PluginsOptions) {
this.#options = options;
this.#plugins = new Map();
options.documents.on('save', this.#onSave);
}
#onSave = async (document: Document) => {
for (const plugin of this.#plugins.values()) {
await plugin.process?.(document);
}
};
#load = async (plugins: Plugin[]) => {
await Promise.all(plugins.map((plugin) => plugin.onLoad?.()));
plugins.forEach((plugin) => plugin.onLoaded?.());
};
#saveConfig = async (plugin: Plugin) => {
const document = await this.#options.documents.get(`.db/plugins/${plugin.name}/config.json`);
document.data = Buffer.from(JSON.stringify(plugin.configs));
await document.save();
};
public get = async <T extends Plugin>(plugin: PluginConstructor<any, any, T>): Promise<T> => {
if (!this.#plugins.has(plugin)) {
await this.add([plugin]);
}
return this.#plugins.get(plugin) as T;
};
public add = async (plugins: PluginConstructor[]) => {
const { documents, databases } = this.#options;
const configs = await Promise.all(
plugins.map(async (plugin) => {
const document = await documents.get(`.db/plugins/${plugin.name}/config.json`);
return JSON.parse(document.data.toString() || '{}');
}),
);
const instances = plugins.map(
(Plugin, i) =>
new Plugin({
plugins: this,
documents,
databases,
configs: configs[i],
}),
);
await this.#load(instances);
for (let i = 0; i < plugins.length; i++) {
const instance = instances[i];
const plugin = plugins[i];
instance.on('configChange', this.#saveConfig.bind(null, instance));
this.#plugins.set(plugin, instance);
}
};
public process = async (document: Document) => {
for (const plugin of this.#plugins.values()) {
await plugin.process?.(document);
}
};
public unload = async () => {
await Promise.all(this.#plugins.values().map((plugin) => plugin.onUnload?.()));
this.#plugins = new Map();
};
public action = async <TPlugin extends Plugin<any, any, any>, TAction extends keyof TPlugin['actions']>(
plugin: new (...args: any[]) => TPlugin,
action: TAction,
input: Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['input'], undefined> extends ZodSchema
? z.infer<Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['input'], undefined>>
: undefined,
): Promise<
Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['output'], undefined> extends ZodSchema
? z.infer<Exclude<Exclude<TPlugin['actions'], undefined>[TAction]['output'], undefined>>
: undefined
> => {
const instance = await this.get(plugin);
const { actions } = instance;
if (!actions) {
throw new Error(`Plugin ${plugin.name} does not have actions`);
}
const actionDef = actions[action];
if (!actionDef) {
throw new Error(`Plugin ${plugin.name} does not have action ${String(action)}`);
}
actionDef.input?.parse(input);
return actionDef.handle?.(input) as any;
};
}
export { Plugins };

View File

@@ -0,0 +1,28 @@
class EventEmitter<TEvents extends Record<string, (...args: any[]) => any>> {
#listeners: Record<keyof TEvents, TEvents[keyof TEvents][]> = {} as any;
public on = <K extends keyof TEvents>(event: K, listener: TEvents[K]) => {
if (!this.#listeners[event]) {
this.#listeners[event] = [];
}
this.#listeners[event].push(listener);
return () => this.off(event, listener);
};
public off = <K extends keyof TEvents>(event: K, listener: TEvents[K]) => {
if (!this.#listeners[event]) {
return;
}
this.#listeners[event] = this.#listeners[event].filter((l) => l !== listener);
};
public emit = async <K extends keyof TEvents>(event: K, ...args: Parameters<TEvents[K]>) => {
if (!this.#listeners[event]) {
return;
}
const actions = this.#listeners[event].map((listener) => listener(...args));
await Promise.all(actions);
};
}
export { EventEmitter };

View File

@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src"
]
}