diff --git a/lib/AppLoader.d.ts b/lib/AppLoader.d.ts index 5295c75..d7c9bbe 100644 --- a/lib/AppLoader.d.ts +++ b/lib/AppLoader.d.ts @@ -19,6 +19,7 @@ export declare class AppLoader; /** - * 后台启动的configuration,统一放在这里读取 + * 获取数据库配置 + * @returns 读取数据库配置 */ - private getConfiguration; + private getDbConfig; + /** + * 获取同步配置 + * @returns 读取同步配置 + */ + private getSyncConfig; constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace); protected registerTrigger(trigger: Trigger): void; protected initTriggers(): void; @@ -55,6 +62,30 @@ export declare class AppLoader][]; protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise>; protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise[]>; + /** + * 检查某个数据是否正在被watcher执行 + * @param name watcher名称 + * @param dataId 数据ID + * @returns 如果没有正在执行则返回true,否则返回false + */ + private checkDataExecuting; + /** + * 过滤出未在执行中的数据行,并标记为执行中 + * @returns [过滤后的行, 是否有行被跳过] + */ + private filterAndMarkExecutingRows; + /** + * 清理执行标记 + */ + private cleanupExecutingMarks; + /** + * 解析 filter 和 projection(支持函数或静态值) + */ + private resolveFilterAndProjection; + /** + * 执行 WB 类型 watcher 的查询操作 + */ + private selectForWBWatcher; protected execWatcher(watcher: Watcher): Promise | undefined>; protected getCheckpointTs(): number; protected checkpoint(): Promise; diff --git a/lib/AppLoader.js b/lib/AppLoader.js index c4dfa05..1bcd13d 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -30,6 +30,7 @@ class AppLoader extends types_1.AppLoader { watcherTimerId; scheduledJobs = {}; internalErrorHandlers = new Array(); + watcherExecutingData = new Map(); regAllExceptionHandler() { const handlers = this.requireSth('lib/configuration/exception'); if (Array.isArray(handlers)) { @@ -92,20 +93,24 @@ class AppLoader extends types_1.AppLoader { return context; } /** - * 后台启动的configuration,统一放在这里读取 + * 获取数据库配置 + * @returns 读取数据库配置 */ - getConfiguration() { - const dbConfig = (0, dbPriority_1.getDbConfig)(this.path); + getDbConfig() { + return (0, dbPriority_1.getDbConfig)(this.path); + } + /** + * 获取同步配置 + * @returns 读取同步配置 + */ + getSyncConfig() { const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js'); const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default; - return { - dbConfig: dbConfig, - syncConfig: syncConfigs, - }; + return syncConfigs; } constructor(path, nsSubscribe, nsSocket, nsServer) { super(path); - const { dbConfig } = this.getConfiguration(); + const dbConfig = this.getDbConfig(); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd()); this.externalDependencies = depGraph.ascOrder; @@ -173,7 +178,7 @@ class AppLoader extends types_1.AppLoader { async mount(initialize) { const { path } = this; if (!initialize) { - const { syncConfig: syncConfig } = this.getConfiguration(); + const syncConfig = this.getSyncConfig(); if (syncConfig) { this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore)); } @@ -368,58 +373,153 @@ class AppLoader extends types_1.AppLoader { forUpdate, }); } + /** + * 检查某个数据是否正在被watcher执行 + * @param name watcher名称 + * @param dataId 数据ID + * @returns 如果没有正在执行则返回true,否则返回false + */ + checkDataExecuting(name, dataId) { + let dataSet = this.watcherExecutingData.get(name); + if (!dataSet) { + dataSet = new Map(); + this.watcherExecutingData.set(name, dataSet); + } + if (dataSet.has(dataId)) { + return false; + } + dataSet.set(dataId, true); + return true; + } + /** + * 过滤出未在执行中的数据行,并标记为执行中 + * @returns [过滤后的行, 是否有行被跳过] + */ + filterAndMarkExecutingRows(watcher, rows) { + if (watcher.exclusive !== true) { + // 不需要排他执行,直接返回所有行 + return [rows, false]; + } + const rowsWithoutExecuting = []; + let hasSkipped = false; + const watcherName = watcher.name; + for (const row of rows) { + if (!row.id) { + console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row); + rowsWithoutExecuting.push(row); + continue; + } + if (this.checkDataExecuting(watcherName, row.id)) { + rowsWithoutExecuting.push(row); + } + else { + hasSkipped = true; + console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`); + } + } + return [rowsWithoutExecuting, hasSkipped]; + } + /** + * 清理执行标记 + */ + cleanupExecutingMarks(watcherName, rows) { + for (const row of rows) { + if (row.id) { + const dataSet = this.watcherExecutingData.get(watcherName); + if (dataSet) { + dataSet.delete(row.id); + } + } + } + } + /** + * 解析 filter 和 projection(支持函数或静态值) + */ + async resolveFilterAndProjection(filter, projection) { + const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); + const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection); + return [filter2, projection2]; + } + /** + * 执行 WB 类型 watcher 的查询操作 + */ + async selectForWBWatcher(watcher, context) { + const { entity, projection, filter, singleton, forUpdate } = watcher; + const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection); + return await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, context, forUpdate, singleton); + } async execWatcher(watcher) { let result; - if (watcher.hasOwnProperty('type') && watcher.type === 'free') { - const selectContext = await this.makeContext(); - const { entity, projection, fn, filter, singleton, forUpdate } = watcher; - const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); - const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection); - const rows = await this.selectInWatcher(entity, { - data: projection2, - filter: filter2, - }, selectContext, forUpdate, singleton); - if (rows.length > 0) { - result = await fn(() => this.makeContext(), rows); - } - return result; - } - const context = await this.makeContext(); - try { - if (watcher.hasOwnProperty('actionData')) { + // BBWatcher:直接操作,无需查询 + if (watcher.hasOwnProperty('actionData')) { + const context = await this.makeContext(); + try { const { entity, action, filter, actionData, singleton } = watcher; const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); - const data = typeof actionData === 'function' ? await (actionData)() : (0, lodash_1.cloneDeep)(actionData); + const data = typeof actionData === 'function' ? await actionData() : (0, lodash_1.cloneDeep)(actionData); result = await this.operateInWatcher(entity, { id: await (0, uuid_1.generateNewIdAsync)(), action: action, data, filter: filter2, }, context, singleton); + await context.commit(); + return result; + } + catch (err) { + if (err instanceof types_1.OakPartialSuccess) { + await context.commit(); + } + else { + await context.rollback(); + } + throw err; + } + } + // WBFreeWatcher 和 WBWatcher:查询后执行 + const isFreeType = watcher.hasOwnProperty('type') && + watcher.type === 'free'; + // 1. 执行查询(WBFreeWatcher 使用独立 context) + const selectContext = isFreeType ? await this.makeContext() : await this.makeContext(); + const rows = await this.selectForWBWatcher(watcher, selectContext); + if (isFreeType) { + await selectContext.commit(); + } + // 2. 并发检查:过滤出未在执行中的数据 + const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(watcher, rows); + if (rowsWithoutExecuting.length === 0) { + if (!isFreeType) { + await selectContext.commit(); + } + return result; + } + // 3. 执行业务逻辑 + try { + if (isFreeType) { + const { fn } = watcher; + result = await fn(() => this.makeContext(), rowsWithoutExecuting); } else { - const { entity, projection, fn, filter, singleton, forUpdate } = watcher; - const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); - const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection); - const rows = await this.selectInWatcher(entity, { - data: projection2, - filter: filter2, - }, context, forUpdate, singleton); - if (rows.length > 0) { - result = await fn(context, rows); - } + const { fn } = watcher; + result = await fn(selectContext, rowsWithoutExecuting); + await selectContext.commit(); } - await context.commit(); return result; } catch (err) { - if (err instanceof types_1.OakPartialSuccess) { - await context.commit(); + // 清理执行标记 + this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting); + if (!isFreeType) { + if (err instanceof types_1.OakPartialSuccess) { + await selectContext.commit(); + } + else { + await selectContext.rollback(); + } } - else { - await context.rollback(); - } - // 不能在这里publish,因为这个方法可能是在timer中调用,也可能是在routine中调用 throw err; } } diff --git a/lib/DbStore.d.ts b/lib/DbStore.d.ts index 17b11a0..f153cf5 100644 --- a/lib/DbStore.d.ts +++ b/lib/DbStore.d.ts @@ -2,6 +2,7 @@ import { DbConfiguration } from 'oak-db/src/types/configuration'; import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; +import { DbTypeSymbol } from './utils/dbPriority'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { DbStore } from 'oak-db/lib/types/dbStore'; export type TriggerStore> = { @@ -13,4 +14,6 @@ export type TriggerStore; }; export type AppDbStore> = DbStore & CascadeStore & TriggerStore; -export declare const createDbStore: >(storageSchema: StorageSchema, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) => AppDbStore; +export declare const createDbStore: >(storageSchema: StorageSchema, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration & { + [DbTypeSymbol]?: string; +}, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) => AppDbStore; diff --git a/lib/DbStore.js b/lib/DbStore.js index 71da45f..1719198 100644 --- a/lib/DbStore.js +++ b/lib/DbStore.js @@ -5,8 +5,10 @@ const TriggerExecutor_1 = require("oak-domain/lib/store/TriggerExecutor"); const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth"); const dbPriority_1 = require("./utils/dbPriority"); const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => { - const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(); + // TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。 + const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(dbConfiguration); // 动态创建继承类 + // @ts-ignore class DynamicDbStore extends BaseStoreClass { executor; relationAuth; diff --git a/lib/utils/dbPriority.d.ts b/lib/utils/dbPriority.d.ts index 9b818f7..61ec4e5 100644 --- a/lib/utils/dbPriority.d.ts +++ b/lib/utils/dbPriority.d.ts @@ -1,10 +1,10 @@ import { MysqlStore, PostgreSQLStore } from "oak-db"; import { DbConfiguration } from "oak-db/src/types/configuration"; -import { AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; import { EntityDict, StorageSchema } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; +import { DbStore } from "oak-db/lib/types/dbStore"; /** * 数据库优先级列表,按顺序尝试获取配置文件 */ @@ -12,5 +12,10 @@ export declare const dbList: { mysql: typeof MysqlStore; postgres: typeof PostgreSQLStore; }; -export declare const getDbConfig: (path: string) => DbConfiguration; -export declare const getDbStoreClass: >() => new (schema: StorageSchema, config: DbConfiguration) => AsyncRowStore & CascadeStore; +export declare const DbTypeSymbol: unique symbol; +export declare const getDbConfig: (path: string) => DbConfiguration & { + [DbTypeSymbol]: string; +}; +export declare const getDbStoreClass: >(config: { + [DbTypeSymbol]?: string; +}) => new (schema: StorageSchema, config: DbConfiguration) => DbStore & CascadeStore; diff --git a/lib/utils/dbPriority.js b/lib/utils/dbPriority.js index 4173234..ae0e0d9 100644 --- a/lib/utils/dbPriority.js +++ b/lib/utils/dbPriority.js @@ -1,6 +1,6 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.getDbStoreClass = exports.getDbConfig = exports.dbList = void 0; +exports.getDbStoreClass = exports.getDbConfig = exports.DbTypeSymbol = exports.dbList = void 0; const oak_db_1 = require("oak-db"); const path_1 = require("path"); const fs_1 = require("fs"); @@ -11,7 +11,7 @@ exports.dbList = { mysql: oak_db_1.MysqlStore, postgres: oak_db_1.PostgreSQLStore }; -let usedDbType = null; +exports.DbTypeSymbol = Symbol.for('oak:backend:db:type'); const getDbConfig = (path) => { for (const db of Object.keys(exports.dbList)) { try { @@ -25,8 +25,14 @@ const getDbConfig = (path) => { } const config = require(dbConfigFile); console.log(`使用${db}作为数据库`); - usedDbType = db; - return Object.assign({}, config); + // 定义不可枚举的属性,避免被序列化 + Object.defineProperty(config, exports.DbTypeSymbol, { + value: db, + enumerable: false, + writable: false, + configurable: false + }); + return config; } catch (err) { // do nothing @@ -35,10 +41,11 @@ const getDbConfig = (path) => { throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); }; exports.getDbConfig = getDbConfig; -const getDbStoreClass = () => { - const dbType = usedDbType || (() => { - throw new Error('无法确定数据库类型'); - })(); +const getDbStoreClass = (config) => { + const dbType = Object.getOwnPropertyDescriptor(config, exports.DbTypeSymbol)?.value; + if (!dbType) { + throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件'); + } const DbStoreClass = exports.dbList[dbType.toLowerCase()]; if (!DbStoreClass) { throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); diff --git a/src/AppLoader.ts b/src/AppLoader.ts index a1801d2..c68d146 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -36,6 +36,8 @@ export class AppLoader = {}; private internalErrorHandlers = new Array>(); + private watcherExecutingData: Map> = new Map(); + public regAllExceptionHandler() { const handlers = this.requireSth('lib/configuration/exception') as Array> | InternalErrorHandler; if (Array.isArray(handlers)) { @@ -105,22 +107,26 @@ export class AppLoader | undefined, - }; + return syncConfigs as SyncConfig | undefined; } constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) { super(path); - const { dbConfig } = this.getConfiguration(); + const dbConfig = this.getDbConfig(); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const depGraph = analyzeDepedency(process.cwd()); this.externalDependencies = depGraph.ascOrder; @@ -224,7 +230,7 @@ export class AppLoader this.contextBuilder(this.dbStore)); @@ -447,60 +453,186 @@ export class AppLoader(); + this.watcherExecutingData.set(name, dataSet); + } + if (dataSet.has(dataId)) { + return false; + } + dataSet.set(dataId, true); + return true; + } + + /** + * 过滤出未在执行中的数据行,并标记为执行中 + * @returns [过滤后的行, 是否有行被跳过] + */ + private filterAndMarkExecutingRows( + watcher: Watcher, + rows: Partial[] + ): [Partial[], boolean] { + if (watcher.exclusive !== true) { + // 不需要排他执行,直接返回所有行 + return [rows, false]; + } + + const rowsWithoutExecuting: Partial[] = []; + let hasSkipped = false; + const watcherName = watcher.name; + + for (const row of rows) { + if (!row.id) { + console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row); + rowsWithoutExecuting.push(row); + continue; + } + + if (this.checkDataExecuting(watcherName, row.id)) { + rowsWithoutExecuting.push(row); + } else { + hasSkipped = true; + console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`); + } + } + + return [rowsWithoutExecuting, hasSkipped]; + } + + /** + * 清理执行标记 + */ + private cleanupExecutingMarks( + watcherName: string, + rows: Partial[] + ) { + for (const row of rows) { + if (row.id) { + const dataSet = this.watcherExecutingData.get(watcherName); + if (dataSet) { + dataSet.delete(row.id); + } + } + } + } + + /** + * 解析 filter 和 projection(支持函数或静态值) + */ + private async resolveFilterAndProjection( + filter: T | (() => Promise), + projection: any | (() => Promise) + ): Promise<[T, any]> { + const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); + const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection); + return [filter2, projection2]; + } + + /** + * 执行 WB 类型 watcher 的查询操作 + */ + private async selectForWBWatcher( + watcher: WBWatcher | WBFreeWatcher, + context: Cxt + ) { + const { entity, projection, filter, singleton, forUpdate } = watcher; + const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection); + + return await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, context, forUpdate, singleton); + } + protected async execWatcher(watcher: Watcher) { let result: OperationResult | undefined; - if (watcher.hasOwnProperty('type') && (watcher as WBFreeWatcher).type === 'free') { - const selectContext = await this.makeContext(); - const { entity, projection, fn, filter, singleton, forUpdate } = >watcher; - const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); - const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection); - const rows = await this.selectInWatcher(entity, { - data: projection2, - filter: filter2, - }, selectContext, forUpdate, singleton); - if (rows.length > 0) { - result = await fn(() => this.makeContext(), rows); - } - return result; - } - const context = await this.makeContext(); - try { - if (watcher.hasOwnProperty('actionData')) { + // BBWatcher:直接操作,无需查询 + if (watcher.hasOwnProperty('actionData')) { + const context = await this.makeContext(); + try { const { entity, action, filter, actionData, singleton } = >watcher; const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); - const data = typeof actionData === 'function' ? await (actionData)() : cloneDeep(actionData); + const data = typeof actionData === 'function' ? await actionData() : cloneDeep(actionData); + result = await this.operateInWatcher(entity, { id: await generateNewIdAsync(), action: action as string, data, filter: filter2, }, context, singleton); - } - else { - const { entity, projection, fn, filter, singleton, forUpdate } = >watcher; - const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); - const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection); - const rows = await this.selectInWatcher(entity, { - data: projection2, - filter: filter2, - }, context, forUpdate, singleton); - if (rows.length > 0) { - result = await fn(context, rows); + await context.commit(); + return result; + } catch (err) { + if (err instanceof OakPartialSuccess) { + await context.commit(); + } else { + await context.rollback(); } + throw err; + } + } + + // WBFreeWatcher 和 WBWatcher:查询后执行 + const isFreeType = watcher.hasOwnProperty('type') && + (watcher as WBFreeWatcher).type === 'free'; + + // 1. 执行查询(WBFreeWatcher 使用独立 context) + const selectContext = isFreeType ? await this.makeContext() : await this.makeContext(); + const rows = await this.selectForWBWatcher( + watcher as WBWatcher | WBFreeWatcher, + selectContext + ); + + if (isFreeType) { + await selectContext.commit(); + } + + // 2. 并发检查:过滤出未在执行中的数据 + const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows( + watcher, + rows + ); + + if (rowsWithoutExecuting.length === 0) { + if (!isFreeType) { + await selectContext.commit(); } - await context.commit(); return result; } - catch (err) { - if (err instanceof OakPartialSuccess) { - await context.commit(); + + // 3. 执行业务逻辑 + try { + if (isFreeType) { + const { fn } = watcher as WBFreeWatcher; + result = await fn(() => this.makeContext(), rowsWithoutExecuting); + } else { + const { fn } = watcher as WBWatcher; + result = await fn(selectContext, rowsWithoutExecuting); + await selectContext.commit(); } - else { - await context.rollback(); + + return result; + } catch (err) { + // 清理执行标记 + this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting); + + if (!isFreeType) { + if (err instanceof OakPartialSuccess) { + await selectContext.commit(); + } else { + await selectContext.rollback(); + } } - // 不能在这里publish,因为这个方法可能是在timer中调用,也可能是在routine中调用 throw err; } } diff --git a/src/DbStore.ts b/src/DbStore.ts index 60febb2..e7b9356 100644 --- a/src/DbStore.ts +++ b/src/DbStore.ts @@ -6,7 +6,7 @@ import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { RelationAuth } from 'oak-domain/lib/store/RelationAuth'; -import { getDbStoreClass } from './utils/dbPriority'; +import { DbTypeSymbol, getDbStoreClass } from './utils/dbPriority'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { DbStore } from 'oak-db/lib/types/dbStore'; @@ -37,16 +37,20 @@ export type AppDbStore>( storageSchema: StorageSchema, contextBuilder: () => Cxt, - dbConfiguration: DbConfiguration, + dbConfiguration: DbConfiguration & { + [DbTypeSymbol]?: string; + }, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities: SelectFreeEntities = [], updateFreeDict: UpdateFreeDict = {}, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise ): AppDbStore => { - const BaseStoreClass = getDbStoreClass() as any - + // TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。 + const BaseStoreClass = getDbStoreClass(dbConfiguration) + // 动态创建继承类 + // @ts-ignore class DynamicDbStore extends BaseStoreClass implements TriggerStore { private executor: TriggerExecutor; private relationAuth: RelationAuth; @@ -208,5 +212,5 @@ export const createDbStore = ; + return new DynamicDbStore() as AppDbStore; }; diff --git a/src/utils/dbPriority.ts b/src/utils/dbPriority.ts index 1ff9914..070bbbe 100644 --- a/src/utils/dbPriority.ts +++ b/src/utils/dbPriority.ts @@ -2,11 +2,12 @@ import { MysqlStore, PostgreSQLStore } from "oak-db"; import { DbConfiguration } from "oak-db/src/types/configuration"; import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; import { join } from "path"; -import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; +import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption, RowStore } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; import { existsSync } from "fs"; +import { DbStore } from "oak-db/lib/types/dbStore"; /** * 数据库优先级列表,按顺序尝试获取配置文件 @@ -16,9 +17,11 @@ export const dbList = { postgres: PostgreSQLStore } -let usedDbType: string | null = null; +export const DbTypeSymbol = Symbol.for('oak:backend:db:type') -export const getDbConfig = (path: string): DbConfiguration => { +export const getDbConfig = (path: string): DbConfiguration & { + [DbTypeSymbol]: string; +} => { for (const db of Object.keys(dbList)) { try { // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -30,11 +33,17 @@ export const getDbConfig = (path: string): DbConfiguration => { if (existsSync(dbConfigFile) === false) { continue; } - + const config = require(dbConfigFile); console.log(`使用${db}作为数据库`); - usedDbType = db; - return Object.assign({}, config); + // 定义不可枚举的属性,避免被序列化 + Object.defineProperty(config, DbTypeSymbol, { + value: db, + enumerable: false, + writable: false, + configurable: false + }); + return config; } catch (err) { // do nothing @@ -45,13 +54,16 @@ export const getDbConfig = (path: string): DbConfiguration => { } export const getDbStoreClass = > - () => { - const dbType = usedDbType || (() => { - throw new Error('无法确定数据库类型'); - })(); + (config: { + [DbTypeSymbol]?: string; + }) => { + const dbType = Object.getOwnPropertyDescriptor(config, DbTypeSymbol)?.value; + if (!dbType) { + throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件'); + } const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList]; if (!DbStoreClass) { throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`); } - return DbStoreClass as new (schema: StorageSchema, config: DbConfiguration) => AsyncRowStore & CascadeStore; + return DbStoreClass as new (schema: StorageSchema, config: DbConfiguration) => DbStore & CascadeStore; } \ No newline at end of file