From 5f2af054fc84617d2fa19ef9cd120379dff2c248 Mon Sep 17 00:00:00 2001 From: qcqcqc <1220204124@zust.edu.cn> Date: Tue, 30 Dec 2025 15:46:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E6=A0=B9=E6=8D=AE?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E8=BF=9B=E8=A1=8CDbStore?= =?UTF-8?q?=E7=9A=84=E8=87=AA=E5=8A=A8=E9=80=89=E6=8B=A9=EF=BC=8C=E7=8E=B0?= =?UTF-8?q?=E6=94=AF=E6=8C=81mysql=E4=B8=8Epostgres?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/AppLoader.d.ts | 13 +- lib/AppLoader.js | 10 +- lib/DbStore.d.ts | 22 +-- lib/DbStore.js | 235 ++++++++++++------------- lib/Synchronizer.d.ts | 2 +- lib/Synchronizer.js | 2 +- lib/cluster/env.js | 3 +- lib/routines/i18n.js | 5 +- lib/utils/dbPriority.d.ts | 16 ++ lib/utils/dbPriority.js | 37 ++++ lib/utils/requirePrj.js | 2 +- src/AppLoader.ts | 21 ++- src/DbStore.ts | 349 +++++++++++++++++++++----------------- src/utils/dbPriority.ts | 43 +++++ 14 files changed, 443 insertions(+), 317 deletions(-) create mode 100644 lib/utils/dbPriority.d.ts create mode 100644 lib/utils/dbPriority.js create mode 100644 src/utils/dbPriority.ts diff --git a/lib/AppLoader.d.ts b/lib/AppLoader.d.ts index 9432c53..384f8cd 100644 --- a/lib/AppLoader.d.ts +++ b/lib/AppLoader.d.ts @@ -1,7 +1,5 @@ -/// import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult } from "oak-domain/lib/types"; -import { DbStore } from "./DbStore"; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { Namespace } from 'socket.io'; @@ -9,13 +7,14 @@ import DataSubscriber from './cluster/DataSubscriber'; import Synchronizer from './Synchronizer'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { InternalErrorHandler } from './types'; +import { AppDbStore } from './DbStore'; export declare class AppLoader> extends GeneralAppLoader { - protected dbStore: DbStore; + protected dbStore: AppDbStore; private aspectDict; private externalDependencies; protected dataSubscriber?: DataSubscriber; protected synchronizer?: Synchronizer; - protected contextBuilder: (store: DbStore) => Cxt; + protected contextBuilder: (store: AppDbStore) => Cxt; private nsSocket?; private watcherTimerId?; private scheduledJobs; @@ -48,11 +47,11 @@ export declare class AppLoader; initialize(ifExists?: 'drop' | 'omit' | 'dropIfNotStatic'): Promise; - getStore(): DbStore; + getStore(): AppDbStore; getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<{ - headers?: Record | undefined; + headers?: Record; data: any; - statusCode?: number | undefined; + statusCode?: number; }>][]; protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise>; protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise[]>; diff --git a/lib/AppLoader.js b/lib/AppLoader.js index 0630f76..0973fe0 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -9,7 +9,6 @@ const IntrinsicLogics_1 = require("oak-domain/lib/store/IntrinsicLogics"); const lodash_1 = require("oak-domain/lib/utils/lodash"); const uuid_1 = require("oak-domain/lib/utils/uuid"); const types_1 = require("oak-domain/lib/types"); -const DbStore_1 = require("./DbStore"); const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index")); const assert_1 = tslib_1.__importDefault(require("assert")); const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder"); @@ -18,6 +17,8 @@ const env_1 = require("./cluster/env"); const Synchronizer_1 = tslib_1.__importDefault(require("./Synchronizer")); const i18n_1 = tslib_1.__importDefault(require("oak-domain/lib/data/i18n")); const requirePrj_1 = tslib_1.__importDefault(require("./utils/requirePrj")); +const dbPriority_1 = require("./utils/dbPriority"); +const DbStore_1 = require("./DbStore"); class AppLoader extends types_1.AppLoader { dbStore; aspectDict; @@ -95,8 +96,7 @@ class AppLoader extends types_1.AppLoader { * 后台启动的configuration,统一放在这里读取 */ getConfiguration() { - const dbConfigFile = (0, path_1.join)(this.path, 'configuration', 'mysql.json'); - const dbConfig = require(dbConfigFile); + const dbConfig = (0, dbPriority_1.getDbConfig)(this.path); const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js'); const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default; return { @@ -112,7 +112,7 @@ class AppLoader extends types_1.AppLoader { this.externalDependencies = depGraph.ascOrder; const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation'); this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index')); - this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); + this.dbStore = (0, DbStore_1.createDbStore)(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (nsSubscribe) { this.dataSubscriber = new DataSubscriber_1.default(nsSubscribe, nsServer); } @@ -290,7 +290,7 @@ class AppLoader extends types_1.AppLoader { } } } - await this.dbStore.disconnect(); + // await this.dbStore.disconnect(); // 不需要马上断开连接,在initialize后可能还会有操作,unmount时会断开 } getStore() { return this.dbStore; diff --git a/lib/DbStore.d.ts b/lib/DbStore.d.ts index 67e3a0e..17b11a0 100644 --- a/lib/DbStore.d.ts +++ b/lib/DbStore.d.ts @@ -1,22 +1,16 @@ -import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db'; -import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; +import { DbConfiguration } from 'oak-db/src/types/configuration'; +import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; -import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; -export declare class DbStore> extends MysqlStore implements AsyncRowStore { - private executor; - private relationAuth; - constructor(storageSchema: StorageSchema, contextBuilder: () => Cxt, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise); - checkRelationAsync>(entity: T, operation: Omit, context: Cxt): Promise; - protected cascadeUpdateAsync(entity: T, operation: ED[T]['Operation'], context: AsyncContext, option: MysqlOperateOption): Promise>; - operate(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise>; - select(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise[]>; - count(entity: T, selection: Pick, context: Cxt, option: SelectOption): Promise; +import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; +import { DbStore } from 'oak-db/lib/types/dbStore'; +export type TriggerStore> = { registerTrigger(trigger: Trigger): void; registerChecker(checker: Checker): void; setOnVolatileTrigger(onVolatileTrigger: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise): void; execVolatileTrigger(entity: T, name: string, ids: string[], context: Cxt, option: OperateOption): Promise; checkpoint(ts: number): Promise; independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise; -} +}; +export type AppDbStore> = DbStore & CascadeStore & TriggerStore; +export declare const createDbStore: >(storageSchema: StorageSchema, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) => AppDbStore; diff --git a/lib/DbStore.js b/lib/DbStore.js index 058172f..f17353d 100644 --- a/lib/DbStore.js +++ b/lib/DbStore.js @@ -1,128 +1,133 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.DbStore = void 0; -const oak_db_1 = require("oak-db"); +exports.createDbStore = void 0; const TriggerExecutor_1 = require("oak-domain/lib/store/TriggerExecutor"); const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth"); -class DbStore extends oak_db_1.MysqlStore { - executor; - relationAuth; - constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) { - super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); - this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); - } - checkRelationAsync(entity, operation, context) { - return this.relationAuth.checkRelationAsync(entity, operation, context); - } - async cascadeUpdateAsync(entity, operation, context, option) { - // 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题) - if (!option.blockTrigger) { - await this.executor.preOperation(entity, operation, context, option); +const dbPriority_1 = require("./utils/dbPriority"); +const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => { + const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(dbConfiguration); + // 动态创建继承类 + class DynamicDbStore extends BaseStoreClass { + executor; + relationAuth; + constructor() { + super(storageSchema, dbConfiguration); + this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); + this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); } - const result = await super.cascadeUpdateAsync(entity, operation, context, option); - if (!option.blockTrigger) { - await this.executor.postOperation(entity, operation, context, option); + checkRelationAsync(entity, operation, context) { + return this.relationAuth.checkRelationAsync(entity, operation, context); } - return result; - } - async operate(entity, operation, context, option) { - const autoCommit = !context.getCurrentTxnId(); - let result; - if (autoCommit) { - await context.begin(); - } - try { - await this.relationAuth.checkRelationAsync(entity, operation, context); - result = await super.operate(entity, operation, context, option); - } - catch (err) { - await context.rollback(); - throw err; - } - if (autoCommit) { - await context.commit(); - } - return result; - } - async select(entity, selection, context, option) { - const autoCommit = !context.getCurrentTxnId(); - if (autoCommit) { - await context.begin(); - } - let result; - // select的trigger应加在根select之前,cascade的select不加处理 - Object.assign(selection, { - action: 'select', - }); - if (!option.blockTrigger) { - await this.executor.preOperation(entity, selection, context, option); - } - if (!option.dontCollect) { - await this.relationAuth.checkRelationAsync(entity, selection, context); - } - try { - result = await super.select(entity, selection, context, option); + async cascadeUpdateAsync(entity, operation, context, option) { + // 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题) if (!option.blockTrigger) { - await this.executor.postOperation(entity, selection, context, option, result); + await this.executor.preOperation(entity, operation, context, option); } - } - catch (err) { - await context.rollback(); - throw err; - } - if (autoCommit) { - await context.commit(); - } - return result; - } - async count(entity, selection, context, option) { - const autoCommit = !context.getCurrentTxnId(); - let result; - if (autoCommit) { - await context.begin(); - } - try { - // count不用检查权限,因为检查权限中本身要用到count - // const selection2 = Object.assign({ - // action: 'select', - // }, selection) as ED[T]['Operation']; - // await this.relationAuth.checkRelationAsync(entity, selection2, context); - // if (!option.blockTrigger) { - // await this.executor.preOperation(entity, selection2, context, option); - // } - result = await super.count(entity, selection, context, option); - /* count应该不存在后trigger吧 + const result = await super.cascadeUpdateAsync(entity, operation, context, option); if (!option.blockTrigger) { - await this.executor.postOperation(entity, selection2, context, option); - } */ + await this.executor.postOperation(entity, operation, context, option); + } + return result; } - catch (err) { - await context.rollback(); - throw err; + async operate(entity, operation, context, option) { + const autoCommit = !context.getCurrentTxnId(); + let result; + if (autoCommit) { + await context.begin(); + } + try { + await this.relationAuth.checkRelationAsync(entity, operation, context); + result = await super.operate(entity, operation, context, option); + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; } - if (autoCommit) { - await context.commit(); + async select(entity, selection, context, option) { + const autoCommit = !context.getCurrentTxnId(); + if (autoCommit) { + await context.begin(); + } + let result; + // select的trigger应加在根select之前,cascade的select不加处理 + Object.assign(selection, { + action: 'select', + }); + if (!option.blockTrigger) { + await this.executor.preOperation(entity, selection, context, option); + } + if (!option.dontCollect) { + await this.relationAuth.checkRelationAsync(entity, selection, context); + } + try { + result = await super.select(entity, selection, context, option); + if (!option.blockTrigger) { + await this.executor.postOperation(entity, selection, context, option, result); + } + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; + } + async count(entity, selection, context, option) { + const autoCommit = !context.getCurrentTxnId(); + let result; + if (autoCommit) { + await context.begin(); + } + try { + // count不用检查权限,因为检查权限中本身要用到count + // const selection2 = Object.assign({ + // action: 'select', + // }, selection) as ED[T]['Operation']; + // await this.relationAuth.checkRelationAsync(entity, selection2, context); + // if (!option.blockTrigger) { + // await this.executor.preOperation(entity, selection2, context, option); + // } + result = await super.count(entity, selection, context, option); + /* count应该不存在后trigger吧 + if (!option.blockTrigger) { + await this.executor.postOperation(entity, selection2, context, option); + } */ + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; + } + registerTrigger(trigger) { + this.executor.registerTrigger(trigger); + } + registerChecker(checker) { + this.executor.registerChecker(checker, this.getSchema()); + } + setOnVolatileTrigger(onVolatileTrigger) { + this.executor.setOnVolatileTrigger(onVolatileTrigger); + } + async execVolatileTrigger(entity, name, ids, context, option) { + return this.executor.execVolatileTrigger(entity, name, ids, context, option); + } + checkpoint(ts) { + return this.executor.checkpoint(ts); + } + independentCheckPoint(name, ts, instanceCount, instanceId) { + return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId); } - return result; } - registerTrigger(trigger) { - this.executor.registerTrigger(trigger); - } - registerChecker(checker) { - this.executor.registerChecker(checker, this.getSchema()); - } - setOnVolatileTrigger(onVolatileTrigger) { - this.executor.setOnVolatileTrigger(onVolatileTrigger); - } - async execVolatileTrigger(entity, name, ids, context, option) { - return this.executor.execVolatileTrigger(entity, name, ids, context, option); - } - checkpoint(ts) { - return this.executor.checkpoint(ts); - } - independentCheckPoint(name, ts, instanceCount, instanceId) { - return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId); - } -} -exports.DbStore = DbStore; + return new DynamicDbStore(); +}; +exports.createDbStore = createDbStore; diff --git a/lib/Synchronizer.d.ts b/lib/Synchronizer.d.ts index 2283d42..eca40d7 100644 --- a/lib/Synchronizer.d.ts +++ b/lib/Synchronizer.d.ts @@ -27,7 +27,7 @@ export default class Synchronizer[]; + getSyncTriggers(): Array>; getSelfEndpoint(): EndpointItem; tryCreateSyncProcess(): void; } diff --git a/lib/Synchronizer.js b/lib/Synchronizer.js index 99d48fe..ed4fecf 100644 --- a/lib/Synchronizer.js +++ b/lib/Synchronizer.js @@ -114,7 +114,7 @@ class Synchronizer { remoteEntityId: entityId, data: queue.map((ele) => ({ entity: ele.oper.targetEntity, - rowIds: ele.oper.filter.id.$in, + rowIds: ele.oper.filter.id.$in, // 暂时应该没什么用 action: ele.oper.action, data: ele.oper.data, })), diff --git a/lib/cluster/env.js b/lib/cluster/env.js index 63fef08..7503829 100644 --- a/lib/cluster/env.js +++ b/lib/cluster/env.js @@ -1,6 +1,6 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.getClusterInfo = void 0; +exports.getClusterInfo = getClusterInfo; function getProcessEnvOption(option) { if (process.env.hasOwnProperty(option)) { return process.env[option]; @@ -54,4 +54,3 @@ const MyClusterInfo = initialize(); function getClusterInfo() { return MyClusterInfo; } -exports.getClusterInfo = getClusterInfo; diff --git a/lib/routines/i18n.js b/lib/routines/i18n.js index e396e0c..6356d35 100644 --- a/lib/routines/i18n.js +++ b/lib/routines/i18n.js @@ -1,6 +1,7 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.checkAndUpdateI18n = exports.checkI18n = void 0; +exports.checkI18n = checkI18n; +exports.checkAndUpdateI18n = checkAndUpdateI18n; const tslib_1 = require("tslib"); const node_path_1 = require("node:path"); const requirePrj_1 = tslib_1.__importDefault(require("../utils/requirePrj")); @@ -66,7 +67,6 @@ async function checkAndUpdateI18nInner(context, onlyCheck) { function checkI18n(context) { return checkAndUpdateI18nInner(context, true); } -exports.checkI18n = checkI18n; /** * 检查项目目录下的i18n数据和数据库中的差异,并更新 * @param context @@ -75,4 +75,3 @@ exports.checkI18n = checkI18n; function checkAndUpdateI18n(context) { return checkAndUpdateI18nInner(context); } -exports.checkAndUpdateI18n = checkAndUpdateI18n; diff --git a/lib/utils/dbPriority.d.ts b/lib/utils/dbPriority.d.ts new file mode 100644 index 0000000..e417dad --- /dev/null +++ b/lib/utils/dbPriority.d.ts @@ -0,0 +1,16 @@ +import { MysqlStore, PostgreSQLStore } from "oak-db"; +import { DbConfiguration } from "oak-db/src/types/configuration"; +import { AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; +import { EntityDict, StorageSchema } from 'oak-domain/lib/types'; +import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; +import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; +import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; +/** + * 数据库优先级列表,按顺序尝试获取配置文件 + */ +export declare const dbList: { + mysql: typeof MysqlStore; + postgres: typeof PostgreSQLStore; +}; +export declare const getDbConfig: (path: string) => DbConfiguration; +export declare const getDbStoreClass: >(dbConfig: DbConfiguration) => new (schema: StorageSchema, config: DbConfiguration) => AsyncRowStore & CascadeStore; diff --git a/lib/utils/dbPriority.js b/lib/utils/dbPriority.js new file mode 100644 index 0000000..bcf2bad --- /dev/null +++ b/lib/utils/dbPriority.js @@ -0,0 +1,37 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.getDbStoreClass = exports.getDbConfig = exports.dbList = void 0; +const oak_db_1 = require("oak-db"); +const path_1 = require("path"); +/** + * 数据库优先级列表,按顺序尝试获取配置文件 + */ +exports.dbList = { + mysql: oak_db_1.MysqlStore, + postgres: oak_db_1.PostgreSQLStore +}; +const getDbConfig = (path) => { + for (const db of Object.keys(exports.dbList)) { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const dbConfigFile = (0, path_1.join)(path, 'configuration', `${db}.json`); + const config = require(dbConfigFile); + console.log(`使用${db}作为数据库`); + return Object.assign({}, { type: db }, config); + } + catch (err) { + // do nothing + } + } + throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); +}; +exports.getDbConfig = getDbConfig; +const getDbStoreClass = (dbConfig) => { + const dbType = dbConfig.type || 'mysql'; + const DbStoreClass = exports.dbList[dbType.toLowerCase()]; + if (!DbStoreClass) { + throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); + } + return DbStoreClass; +}; +exports.getDbStoreClass = getDbStoreClass; diff --git a/lib/utils/requirePrj.js b/lib/utils/requirePrj.js index df6a041..fb4a96a 100644 --- a/lib/utils/requirePrj.js +++ b/lib/utils/requirePrj.js @@ -1,5 +1,6 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); +exports.default = requireSth; const fs_1 = require("fs"); const lodash_1 = require("oak-domain/lib/utils/lodash"); const path_1 = require("path"); @@ -20,4 +21,3 @@ function requireSth(prjPath, filePath, dependencies) { } return (0, lodash_1.mergeConcatMany)(sthExternal); } -exports.default = requireSth; diff --git a/src/AppLoader.ts b/src/AppLoader.ts index 34ff51d..67b06d0 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -6,9 +6,7 @@ import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types"; -import { DbStore } from "./DbStore"; import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index'; -import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint'; import assert from 'assert'; @@ -23,14 +21,16 @@ import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import domainI18nData from 'oak-domain/lib/data/i18n'; import requireSth from './utils/requirePrj'; import { InternalErrorHandler, InternalErrorType } from './types'; +import { getDbConfig } from './utils/dbPriority'; +import { createDbStore, AppDbStore } from './DbStore'; export class AppLoader> extends GeneralAppLoader { - protected dbStore: DbStore; + protected dbStore: AppDbStore; private aspectDict: Record>; private externalDependencies: string[]; protected dataSubscriber?: DataSubscriber; protected synchronizer?: Synchronizer; - protected contextBuilder: (store: DbStore) => Cxt; + protected contextBuilder: (store: AppDbStore) => Cxt; private nsSocket?: Namespace; private watcherTimerId?: NodeJS.Timeout; private scheduledJobs: Record = {}; @@ -109,13 +109,12 @@ export class AppLoader | undefined, }; } @@ -128,7 +127,7 @@ export class AppLoader( + this.dbStore = createDbStore( storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, @@ -146,7 +145,7 @@ export class AppLoader) { + constructor(store: AppDbStore) { super(store); this.clusterInfo = getClusterInfo(); } @@ -354,10 +353,10 @@ export class AppLoader { + getStore(): AppDbStore { return this.dbStore; } diff --git a/src/DbStore.ts b/src/DbStore.ts index acd2eda..64bdc88 100644 --- a/src/DbStore.ts +++ b/src/DbStore.ts @@ -1,151 +1,18 @@ -import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db'; +import { DbConfiguration } from 'oak-db/src/types/configuration'; +import { MySqlSelectOption, MysqlOperateOption } from 'oak-db'; import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor'; -import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; -import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; +import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { RelationAuth } from 'oak-domain/lib/store/RelationAuth'; +import { getDbStoreClass } from './utils/dbPriority'; +import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; +import { DbStore } from 'oak-db/lib/types/dbStore'; - -export class DbStore> extends MysqlStore implements AsyncRowStore { - private executor: TriggerExecutor; - private relationAuth: RelationAuth; - - constructor( - storageSchema: StorageSchema, - contextBuilder: () => Cxt, - mysqlConfiguration: MySQLConfiguration, - authDeduceRelationMap: AuthDeduceRelationMap, - selectFreeEntities: SelectFreeEntities = [], - updateFreeDict: UpdateFreeDict = {}, - onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) { - super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); - this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); - } - - checkRelationAsync>(entity: T, operation: Omit, context: Cxt): Promise { - return this.relationAuth.checkRelationAsync(entity, operation, context); - } - - protected async cascadeUpdateAsync(entity: T, operation: ED[T]['Operation'], context: AsyncContext, option: MysqlOperateOption) { - // 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题) - if (!option.blockTrigger) { - await this.executor.preOperation(entity, operation, context as Cxt, option); - } - const result = await super.cascadeUpdateAsync(entity, operation, context, option); - if (!option.blockTrigger) { - await this.executor.postOperation(entity, operation, context as Cxt, option); - } - return result; - } - - async operate( - entity: T, - operation: ED[T]['Operation'], - context: Cxt, - option: MysqlOperateOption - ) { - const autoCommit = !context.getCurrentTxnId(); - let result; - if (autoCommit) { - await context.begin(); - } - try { - await this.relationAuth.checkRelationAsync(entity, operation, context); - result = await super.operate(entity, operation, context, option); - } - catch (err) { - await context.rollback(); - throw err; - } - if (autoCommit) { - await context.commit(); - } - return result; - } - - async select( - entity: T, - selection: ED[T]['Selection'], - context: Cxt, - option: MySqlSelectOption - ) { - const autoCommit = !context.getCurrentTxnId(); - if (autoCommit) { - await context.begin(); - } - let result: Partial[]; - - // select的trigger应加在根select之前,cascade的select不加处理 - Object.assign(selection, { - action: 'select', - }); - if (!option.blockTrigger) { - await this.executor.preOperation(entity, selection as ED[T]['Operation'], context, option); - } - if (!option.dontCollect) { - await this.relationAuth.checkRelationAsync(entity, selection, context); - } - try { - result = await super.select(entity, selection, context, option); - - if (!option.blockTrigger) { - await this.executor.postOperation(entity, selection as ED[T]['Operation'] - , context, option, result); - } - } - catch (err) { - await context.rollback(); - throw err; - } - if (autoCommit) { - await context.commit(); - } - return result; - } - - async count(entity: T, selection: Pick, context: Cxt, option: SelectOption): Promise { - const autoCommit = !context.getCurrentTxnId(); - let result; - if (autoCommit) { - await context.begin(); - } - try { - // count不用检查权限,因为检查权限中本身要用到count - // const selection2 = Object.assign({ - // action: 'select', - // }, selection) as ED[T]['Operation']; - - // await this.relationAuth.checkRelationAsync(entity, selection2, context); - // if (!option.blockTrigger) { - // await this.executor.preOperation(entity, selection2, context, option); - // } - result = await super.count(entity, selection, context, option); - /* count应该不存在后trigger吧 - if (!option.blockTrigger) { - await this.executor.postOperation(entity, selection2, context, option); - } */ - } - catch (err) { - await context.rollback(); - throw err; - } - if (autoCommit) { - await context.commit(); - } - return result; - } - - registerTrigger(trigger: Trigger) { - this.executor.registerTrigger(trigger); - } - - registerChecker(checker: Checker) { - this.executor.registerChecker(checker, this.getSchema()); - } - +export type TriggerStore> = { + registerTrigger(trigger: Trigger): void; + registerChecker(checker: Checker): void; setOnVolatileTrigger( onVolatileTrigger: ( entity: T, @@ -153,25 +20,193 @@ export class DbStore Promise - ) { - this.executor.setOnVolatileTrigger(onVolatileTrigger); - } - - async execVolatileTrigger( + ): void; + execVolatileTrigger( entity: T, name: string, ids: string[], context: Cxt, option: OperateOption - ) { - return this.executor.execVolatileTrigger(entity, name, ids, context, option); + ): Promise; + checkpoint(ts: number): Promise; + independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise; +} + +export type AppDbStore> = DbStore & CascadeStore & TriggerStore; + +export const createDbStore = >( + storageSchema: StorageSchema, + contextBuilder: () => Cxt, + dbConfiguration: DbConfiguration, + authDeduceRelationMap: AuthDeduceRelationMap, + selectFreeEntities: SelectFreeEntities = [], + updateFreeDict: UpdateFreeDict = {}, + onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise +): AppDbStore => { + + const BaseStoreClass = getDbStoreClass(dbConfiguration) as any + + // 动态创建继承类 + class DynamicDbStore extends BaseStoreClass implements TriggerStore { + private executor: TriggerExecutor; + private relationAuth: RelationAuth; + + constructor() { + super(storageSchema, dbConfiguration); + this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); + this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); + } + + checkRelationAsync>(entity: T, operation: Omit, context: Cxt): Promise { + return this.relationAuth.checkRelationAsync(entity, operation, context); + } + + protected async cascadeUpdateAsync(entity: T, operation: ED[T]['Operation'], context: AsyncContext, option: MysqlOperateOption) { + // 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题) + if (!option.blockTrigger) { + await this.executor.preOperation(entity, operation, context as Cxt, option); + } + const result = await super.cascadeUpdateAsync(entity, operation, context, option); + if (!option.blockTrigger) { + await this.executor.postOperation(entity, operation, context as Cxt, option); + } + return result; + } + + async operate( + entity: T, + operation: ED[T]['Operation'], + context: Cxt, + option: MysqlOperateOption + ) { + const autoCommit = !context.getCurrentTxnId(); + let result; + if (autoCommit) { + await context.begin(); + } + try { + await this.relationAuth.checkRelationAsync(entity, operation, context); + result = await super.operate(entity, operation, context, option); + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; + } + + async select( + entity: T, + selection: ED[T]['Selection'], + context: Cxt, + option: MySqlSelectOption + ) { + const autoCommit = !context.getCurrentTxnId(); + if (autoCommit) { + await context.begin(); + } + let result: Partial[]; + + // select的trigger应加在根select之前,cascade的select不加处理 + Object.assign(selection, { + action: 'select', + }); + if (!option.blockTrigger) { + await this.executor.preOperation(entity, selection as ED[T]['Operation'], context, option); + } + if (!option.dontCollect) { + await this.relationAuth.checkRelationAsync(entity, selection, context); + } + try { + result = await super.select(entity, selection, context, option); + + if (!option.blockTrigger) { + await this.executor.postOperation(entity, selection as ED[T]['Operation'] + , context, option, result); + } + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; + } + + async count(entity: T, selection: Pick, context: Cxt, option: SelectOption): Promise { + const autoCommit = !context.getCurrentTxnId(); + let result; + if (autoCommit) { + await context.begin(); + } + try { + // count不用检查权限,因为检查权限中本身要用到count + // const selection2 = Object.assign({ + // action: 'select', + // }, selection) as ED[T]['Operation']; + + // await this.relationAuth.checkRelationAsync(entity, selection2, context); + // if (!option.blockTrigger) { + // await this.executor.preOperation(entity, selection2, context, option); + // } + result = await super.count(entity, selection, context, option); + /* count应该不存在后trigger吧 + if (!option.blockTrigger) { + await this.executor.postOperation(entity, selection2, context, option); + } */ + } + catch (err) { + await context.rollback(); + throw err; + } + if (autoCommit) { + await context.commit(); + } + return result; + } + + registerTrigger(trigger: Trigger) { + this.executor.registerTrigger(trigger); + } + + registerChecker(checker: Checker) { + this.executor.registerChecker(checker, this.getSchema()); + } + + setOnVolatileTrigger( + onVolatileTrigger: ( + entity: T, + trigger: VolatileTrigger, + ids: string[], + cxtStr: string, + option: OperateOption) => Promise + ) { + this.executor.setOnVolatileTrigger(onVolatileTrigger); + } + + async execVolatileTrigger( + entity: T, + name: string, + ids: string[], + context: Cxt, + option: OperateOption + ) { + return this.executor.execVolatileTrigger(entity, name, ids, context, option); + } + + checkpoint(ts: number) { + return this.executor.checkpoint(ts); + } + + independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number) { + return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId); + } } - checkpoint(ts: number) { - return this.executor.checkpoint(ts); - } - - independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number) { - return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId); - } -} \ No newline at end of file + return new DynamicDbStore() as any as AppDbStore; +}; diff --git a/src/utils/dbPriority.ts b/src/utils/dbPriority.ts new file mode 100644 index 0000000..e9a1a14 --- /dev/null +++ b/src/utils/dbPriority.ts @@ -0,0 +1,43 @@ +import { MysqlStore, PostgreSQLStore } from "oak-db"; +import { DbConfiguration } from "oak-db/src/types/configuration"; +import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; +import { join } from "path"; +import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; +import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; +import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; +import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; + +/** + * 数据库优先级列表,按顺序尝试获取配置文件 + */ +export const dbList = { + mysql: MysqlStore, + postgres: PostgreSQLStore +} + +export const getDbConfig = (path: string): DbConfiguration => { + for (const db of Object.keys(dbList)) { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const dbConfigFile = join(path, 'configuration', `${db}.json`); + const config = require(dbConfigFile); + console.log(`使用${db}作为数据库`); + return Object.assign({}, { type: db }, config); + } + catch (err) { + // do nothing + } + } + + throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`); +} + +export const getDbStoreClass = > + (dbConfig: DbConfiguration) => { + const dbType = dbConfig.type || 'mysql'; + const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList]; + if (!DbStoreClass) { + throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`); + } + return DbStoreClass as new (schema: StorageSchema, config: DbConfiguration) => AsyncRowStore & CascadeStore; +} \ No newline at end of file