feat: 支持了execWatcher的exclusive模式
This commit is contained in:
parent
19f68cb7ec
commit
4f253bab2c
|
|
@ -19,6 +19,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
|
|||
private watcherTimerId?;
|
||||
private scheduledJobs;
|
||||
private internalErrorHandlers;
|
||||
private watcherExecutingData;
|
||||
regAllExceptionHandler(): void;
|
||||
/**
|
||||
* 注册一个内部错误处理器
|
||||
|
|
@ -32,9 +33,15 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
|
|||
private requireSth;
|
||||
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
private getConfiguration;
|
||||
private getDbConfig;
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
private getSyncConfig;
|
||||
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace);
|
||||
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
|
||||
protected initTriggers(): void;
|
||||
|
|
@ -55,6 +62,30 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
|
|||
}>][];
|
||||
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
|
||||
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>;
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
private checkDataExecuting;
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
private filterAndMarkExecutingRows;
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
private cleanupExecutingMarks;
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
private resolveFilterAndProjection;
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
private selectForWBWatcher;
|
||||
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
|
||||
protected getCheckpointTs(): number;
|
||||
protected checkpoint(): Promise<number>;
|
||||
|
|
|
|||
188
lib/AppLoader.js
188
lib/AppLoader.js
|
|
@ -30,6 +30,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
watcherTimerId;
|
||||
scheduledJobs = {};
|
||||
internalErrorHandlers = new Array();
|
||||
watcherExecutingData = new Map();
|
||||
regAllExceptionHandler() {
|
||||
const handlers = this.requireSth('lib/configuration/exception');
|
||||
if (Array.isArray(handlers)) {
|
||||
|
|
@ -92,20 +93,24 @@ class AppLoader extends types_1.AppLoader {
|
|||
return context;
|
||||
}
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
getConfiguration() {
|
||||
const dbConfig = (0, dbPriority_1.getDbConfig)(this.path);
|
||||
getDbConfig() {
|
||||
return (0, dbPriority_1.getDbConfig)(this.path);
|
||||
}
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
getSyncConfig() {
|
||||
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
|
||||
const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
|
||||
return {
|
||||
dbConfig: dbConfig,
|
||||
syncConfig: syncConfigs,
|
||||
};
|
||||
return syncConfigs;
|
||||
}
|
||||
constructor(path, nsSubscribe, nsSocket, nsServer) {
|
||||
super(path);
|
||||
const { dbConfig } = this.getConfiguration();
|
||||
const dbConfig = this.getDbConfig();
|
||||
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
|
||||
const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd());
|
||||
this.externalDependencies = depGraph.ascOrder;
|
||||
|
|
@ -173,7 +178,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
async mount(initialize) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
const { syncConfig: syncConfig } = this.getConfiguration();
|
||||
const syncConfig = this.getSyncConfig();
|
||||
if (syncConfig) {
|
||||
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
|
||||
}
|
||||
|
|
@ -368,58 +373,153 @@ class AppLoader extends types_1.AppLoader {
|
|||
forUpdate,
|
||||
});
|
||||
}
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
checkDataExecuting(name, dataId) {
|
||||
let dataSet = this.watcherExecutingData.get(name);
|
||||
if (!dataSet) {
|
||||
dataSet = new Map();
|
||||
this.watcherExecutingData.set(name, dataSet);
|
||||
}
|
||||
if (dataSet.has(dataId)) {
|
||||
return false;
|
||||
}
|
||||
dataSet.set(dataId, true);
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
filterAndMarkExecutingRows(watcher, rows) {
|
||||
if (watcher.exclusive !== true) {
|
||||
// 不需要排他执行,直接返回所有行
|
||||
return [rows, false];
|
||||
}
|
||||
const rowsWithoutExecuting = [];
|
||||
let hasSkipped = false;
|
||||
const watcherName = watcher.name;
|
||||
for (const row of rows) {
|
||||
if (!row.id) {
|
||||
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row);
|
||||
rowsWithoutExecuting.push(row);
|
||||
continue;
|
||||
}
|
||||
if (this.checkDataExecuting(watcherName, row.id)) {
|
||||
rowsWithoutExecuting.push(row);
|
||||
}
|
||||
else {
|
||||
hasSkipped = true;
|
||||
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`);
|
||||
}
|
||||
}
|
||||
return [rowsWithoutExecuting, hasSkipped];
|
||||
}
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
cleanupExecutingMarks(watcherName, rows) {
|
||||
for (const row of rows) {
|
||||
if (row.id) {
|
||||
const dataSet = this.watcherExecutingData.get(watcherName);
|
||||
if (dataSet) {
|
||||
dataSet.delete(row.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
async resolveFilterAndProjection(filter, projection) {
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
|
||||
return [filter2, projection2];
|
||||
}
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
async selectForWBWatcher(watcher, context) {
|
||||
const { entity, projection, filter, singleton, forUpdate } = watcher;
|
||||
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
|
||||
return await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
}
|
||||
async execWatcher(watcher) {
|
||||
let result;
|
||||
if (watcher.hasOwnProperty('type') && watcher.type === 'free') {
|
||||
const selectContext = await this.makeContext();
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, selectContext, forUpdate, singleton);
|
||||
if (rows.length > 0) {
|
||||
result = await fn(() => this.makeContext(), rows);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
// BBWatcher:直接操作,无需查询
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const { entity, action, filter, actionData, singleton } = watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const data = typeof actionData === 'function' ? await (actionData)() : (0, lodash_1.cloneDeep)(actionData);
|
||||
const data = typeof actionData === 'function' ? await actionData() : (0, lodash_1.cloneDeep)(actionData);
|
||||
result = await this.operateInWatcher(entity, {
|
||||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||||
action: action,
|
||||
data,
|
||||
filter: filter2,
|
||||
}, context, singleton);
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await context.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
// WBFreeWatcher 和 WBWatcher:查询后执行
|
||||
const isFreeType = watcher.hasOwnProperty('type') &&
|
||||
watcher.type === 'free';
|
||||
// 1. 执行查询(WBFreeWatcher 使用独立 context)
|
||||
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
|
||||
const rows = await this.selectForWBWatcher(watcher, selectContext);
|
||||
if (isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
// 2. 并发检查:过滤出未在执行中的数据
|
||||
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(watcher, rows);
|
||||
if (rowsWithoutExecuting.length === 0) {
|
||||
if (!isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
// 3. 执行业务逻辑
|
||||
try {
|
||||
if (isFreeType) {
|
||||
const { fn } = watcher;
|
||||
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
|
||||
}
|
||||
else {
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
if (rows.length > 0) {
|
||||
result = await fn(context, rows);
|
||||
}
|
||||
const { fn } = watcher;
|
||||
result = await fn(selectContext, rowsWithoutExecuting);
|
||||
await selectContext.commit();
|
||||
}
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await context.commit();
|
||||
// 清理执行标记
|
||||
this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting);
|
||||
if (!isFreeType) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
else {
|
||||
await selectContext.rollback();
|
||||
}
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
}
|
||||
// 不能在这里publish,因为这个方法可能是在timer中调用,也可能是在routine中调用
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { DbConfiguration } from 'oak-db/src/types/configuration';
|
|||
import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { DbTypeSymbol } from './utils/dbPriority';
|
||||
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
|
||||
import { DbStore } from 'oak-db/lib/types/dbStore';
|
||||
export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = {
|
||||
|
|
@ -13,4 +14,6 @@ export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends Bac
|
|||
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise<number>;
|
||||
};
|
||||
export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = DbStore<ED, Cxt> & CascadeStore<ED> & TriggerStore<ED, Cxt>;
|
||||
export declare const createDbStore: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) => AppDbStore<ED, Cxt>;
|
||||
export declare const createDbStore: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration & {
|
||||
[DbTypeSymbol]?: string;
|
||||
}, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) => AppDbStore<ED, Cxt>;
|
||||
|
|
|
|||
|
|
@ -5,8 +5,10 @@ const TriggerExecutor_1 = require("oak-domain/lib/store/TriggerExecutor");
|
|||
const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth");
|
||||
const dbPriority_1 = require("./utils/dbPriority");
|
||||
const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => {
|
||||
const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)();
|
||||
// TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。
|
||||
const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(dbConfiguration);
|
||||
// 动态创建继承类
|
||||
// @ts-ignore
|
||||
class DynamicDbStore extends BaseStoreClass {
|
||||
executor;
|
||||
relationAuth;
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
import { MysqlStore, PostgreSQLStore } from "oak-db";
|
||||
import { DbConfiguration } from "oak-db/src/types/configuration";
|
||||
import { AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
|
||||
import { EntityDict, StorageSchema } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
|
||||
import { DbStore } from "oak-db/lib/types/dbStore";
|
||||
/**
|
||||
* 数据库优先级列表,按顺序尝试获取配置文件
|
||||
*/
|
||||
|
|
@ -12,5 +12,10 @@ export declare const dbList: {
|
|||
mysql: typeof MysqlStore;
|
||||
postgres: typeof PostgreSQLStore;
|
||||
};
|
||||
export declare const getDbConfig: (path: string) => DbConfiguration;
|
||||
export declare const getDbStoreClass: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>() => new (schema: StorageSchema<ED>, config: DbConfiguration) => AsyncRowStore<ED, Cxt> & CascadeStore<ED>;
|
||||
export declare const DbTypeSymbol: unique symbol;
|
||||
export declare const getDbConfig: (path: string) => DbConfiguration & {
|
||||
[DbTypeSymbol]: string;
|
||||
};
|
||||
export declare const getDbStoreClass: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(config: {
|
||||
[DbTypeSymbol]?: string;
|
||||
}) => new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.getDbStoreClass = exports.getDbConfig = exports.dbList = void 0;
|
||||
exports.getDbStoreClass = exports.getDbConfig = exports.DbTypeSymbol = exports.dbList = void 0;
|
||||
const oak_db_1 = require("oak-db");
|
||||
const path_1 = require("path");
|
||||
const fs_1 = require("fs");
|
||||
|
|
@ -11,7 +11,7 @@ exports.dbList = {
|
|||
mysql: oak_db_1.MysqlStore,
|
||||
postgres: oak_db_1.PostgreSQLStore
|
||||
};
|
||||
let usedDbType = null;
|
||||
exports.DbTypeSymbol = Symbol.for('oak:backend:db:type');
|
||||
const getDbConfig = (path) => {
|
||||
for (const db of Object.keys(exports.dbList)) {
|
||||
try {
|
||||
|
|
@ -25,8 +25,14 @@ const getDbConfig = (path) => {
|
|||
}
|
||||
const config = require(dbConfigFile);
|
||||
console.log(`使用${db}作为数据库`);
|
||||
usedDbType = db;
|
||||
return Object.assign({}, config);
|
||||
// 定义不可枚举的属性,避免被序列化
|
||||
Object.defineProperty(config, exports.DbTypeSymbol, {
|
||||
value: db,
|
||||
enumerable: false,
|
||||
writable: false,
|
||||
configurable: false
|
||||
});
|
||||
return config;
|
||||
}
|
||||
catch (err) {
|
||||
// do nothing
|
||||
|
|
@ -35,10 +41,11 @@ const getDbConfig = (path) => {
|
|||
throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
};
|
||||
exports.getDbConfig = getDbConfig;
|
||||
const getDbStoreClass = () => {
|
||||
const dbType = usedDbType || (() => {
|
||||
throw new Error('无法确定数据库类型');
|
||||
})();
|
||||
const getDbStoreClass = (config) => {
|
||||
const dbType = Object.getOwnPropertyDescriptor(config, exports.DbTypeSymbol)?.value;
|
||||
if (!dbType) {
|
||||
throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
|
||||
}
|
||||
const DbStoreClass = exports.dbList[dbType.toLowerCase()];
|
||||
if (!DbStoreClass) {
|
||||
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
|
|
|
|||
224
src/AppLoader.ts
224
src/AppLoader.ts
|
|
@ -36,6 +36,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
private scheduledJobs: Record<string, Job> = {};
|
||||
private internalErrorHandlers = new Array<InternalErrorHandler<ED, Cxt>>();
|
||||
|
||||
private watcherExecutingData: Map<string, Map<string, true>> = new Map();
|
||||
|
||||
public regAllExceptionHandler() {
|
||||
const handlers = this.requireSth('lib/configuration/exception') as Array<InternalErrorHandler<ED, Cxt>> | InternalErrorHandler<ED, Cxt>;
|
||||
if (Array.isArray(handlers)) {
|
||||
|
|
@ -105,22 +107,26 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
private getConfiguration() {
|
||||
const dbConfig = getDbConfig(this.path);
|
||||
private getDbConfig() {
|
||||
return getDbConfig(this.path);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
private getSyncConfig() {
|
||||
const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js');
|
||||
const syncConfigs = existsSync(syncConfigFile) && require(syncConfigFile).default;
|
||||
|
||||
return {
|
||||
dbConfig: dbConfig,
|
||||
syncConfig: syncConfigs as SyncConfig<ED, Cxt> | undefined,
|
||||
};
|
||||
return syncConfigs as SyncConfig<ED, Cxt> | undefined;
|
||||
}
|
||||
|
||||
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) {
|
||||
super(path);
|
||||
const { dbConfig } = this.getConfiguration();
|
||||
const dbConfig = this.getDbConfig();
|
||||
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
|
||||
const depGraph = analyzeDepedency(process.cwd());
|
||||
this.externalDependencies = depGraph.ascOrder;
|
||||
|
|
@ -224,7 +230,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
async mount(initialize?: true) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
const { syncConfig: syncConfig } = this.getConfiguration();
|
||||
const syncConfig = this.getSyncConfig();
|
||||
|
||||
if (syncConfig) {
|
||||
this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
|
||||
|
|
@ -447,60 +453,186 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
private checkDataExecuting(name: string, dataId: string): boolean {
|
||||
let dataSet = this.watcherExecutingData.get(name);
|
||||
if (!dataSet) {
|
||||
dataSet = new Map<string, true>();
|
||||
this.watcherExecutingData.set(name, dataSet);
|
||||
}
|
||||
if (dataSet.has(dataId)) {
|
||||
return false;
|
||||
}
|
||||
dataSet.set(dataId, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
private filterAndMarkExecutingRows<T extends keyof ED>(
|
||||
watcher: Watcher<ED, T, Cxt>,
|
||||
rows: Partial<ED[T]['Schema']>[]
|
||||
): [Partial<ED[T]['Schema']>[], boolean] {
|
||||
if (watcher.exclusive !== true) {
|
||||
// 不需要排他执行,直接返回所有行
|
||||
return [rows, false];
|
||||
}
|
||||
|
||||
const rowsWithoutExecuting: Partial<ED[T]['Schema']>[] = [];
|
||||
let hasSkipped = false;
|
||||
const watcherName = watcher.name;
|
||||
|
||||
for (const row of rows) {
|
||||
if (!row.id) {
|
||||
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row);
|
||||
rowsWithoutExecuting.push(row);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (this.checkDataExecuting(watcherName, row.id)) {
|
||||
rowsWithoutExecuting.push(row);
|
||||
} else {
|
||||
hasSkipped = true;
|
||||
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`);
|
||||
}
|
||||
}
|
||||
|
||||
return [rowsWithoutExecuting, hasSkipped];
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
private cleanupExecutingMarks<T extends keyof ED>(
|
||||
watcherName: string,
|
||||
rows: Partial<ED[T]['Schema']>[]
|
||||
) {
|
||||
for (const row of rows) {
|
||||
if (row.id) {
|
||||
const dataSet = this.watcherExecutingData.get(watcherName);
|
||||
if (dataSet) {
|
||||
dataSet.delete(row.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
private async resolveFilterAndProjection<T extends ED[keyof ED]['Filter']>(
|
||||
filter: T | (() => Promise<T>),
|
||||
projection: any | (() => Promise<any>)
|
||||
): Promise<[T, any]> {
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
|
||||
return [filter2, projection2];
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
private async selectForWBWatcher<T extends keyof ED>(
|
||||
watcher: WBWatcher<ED, T, Cxt> | WBFreeWatcher<ED, T, Cxt>,
|
||||
context: Cxt
|
||||
) {
|
||||
const { entity, projection, filter, singleton, forUpdate } = watcher;
|
||||
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
|
||||
|
||||
return await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
}
|
||||
|
||||
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
|
||||
let result: OperationResult<ED> | undefined;
|
||||
if (watcher.hasOwnProperty('type') && (watcher as WBFreeWatcher<ED, keyof ED, Cxt>).type === 'free') {
|
||||
const selectContext = await this.makeContext();
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = <WBFreeWatcher<ED, keyof ED, Cxt>>watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, selectContext, forUpdate, singleton);
|
||||
|
||||
if (rows.length > 0) {
|
||||
result = await fn(() => this.makeContext(), rows);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
// BBWatcher:直接操作,无需查询
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const { entity, action, filter, actionData, singleton } = <BBWatcher<ED, keyof ED>>watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const data = typeof actionData === 'function' ? await (actionData)() : cloneDeep(actionData);
|
||||
const data = typeof actionData === 'function' ? await actionData() : cloneDeep(actionData);
|
||||
|
||||
result = await this.operateInWatcher(entity, {
|
||||
id: await generateNewIdAsync(),
|
||||
action: action as string,
|
||||
data,
|
||||
filter: filter2,
|
||||
}, context, singleton);
|
||||
}
|
||||
else {
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
|
||||
if (rows.length > 0) {
|
||||
result = await fn(context, rows);
|
||||
await context.commit();
|
||||
return result;
|
||||
} catch (err) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await context.commit();
|
||||
} else {
|
||||
await context.rollback();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
// WBFreeWatcher 和 WBWatcher:查询后执行
|
||||
const isFreeType = watcher.hasOwnProperty('type') &&
|
||||
(watcher as WBFreeWatcher<ED, keyof ED, Cxt>).type === 'free';
|
||||
|
||||
// 1. 执行查询(WBFreeWatcher 使用独立 context)
|
||||
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
|
||||
const rows = await this.selectForWBWatcher(
|
||||
watcher as WBWatcher<ED, keyof ED, Cxt> | WBFreeWatcher<ED, keyof ED, Cxt>,
|
||||
selectContext
|
||||
);
|
||||
|
||||
if (isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
|
||||
// 2. 并发检查:过滤出未在执行中的数据
|
||||
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(
|
||||
watcher,
|
||||
rows
|
||||
);
|
||||
|
||||
if (rowsWithoutExecuting.length === 0) {
|
||||
if (!isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await context.commit();
|
||||
|
||||
// 3. 执行业务逻辑
|
||||
try {
|
||||
if (isFreeType) {
|
||||
const { fn } = watcher as WBFreeWatcher<ED, keyof ED, Cxt>;
|
||||
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
|
||||
} else {
|
||||
const { fn } = watcher as WBWatcher<ED, keyof ED, Cxt>;
|
||||
result = await fn(selectContext, rowsWithoutExecuting);
|
||||
await selectContext.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
|
||||
return result;
|
||||
} catch (err) {
|
||||
// 清理执行标记
|
||||
this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting);
|
||||
|
||||
if (!isFreeType) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await selectContext.commit();
|
||||
} else {
|
||||
await selectContext.rollback();
|
||||
}
|
||||
}
|
||||
// 不能在这里publish,因为这个方法可能是在timer中调用,也可能是在routine中调用
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
|
|||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import { RelationAuth } from 'oak-domain/lib/store/RelationAuth';
|
||||
import { getDbStoreClass } from './utils/dbPriority';
|
||||
import { DbTypeSymbol, getDbStoreClass } from './utils/dbPriority';
|
||||
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
|
||||
import { DbStore } from 'oak-db/lib/types/dbStore';
|
||||
|
||||
|
|
@ -37,16 +37,20 @@ export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(
|
||||
storageSchema: StorageSchema<ED>,
|
||||
contextBuilder: () => Cxt,
|
||||
dbConfiguration: DbConfiguration,
|
||||
dbConfiguration: DbConfiguration & {
|
||||
[DbTypeSymbol]?: string;
|
||||
},
|
||||
authDeduceRelationMap: AuthDeduceRelationMap<ED>,
|
||||
selectFreeEntities: SelectFreeEntities<ED> = [],
|
||||
updateFreeDict: UpdateFreeDict<ED> = {},
|
||||
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>
|
||||
): AppDbStore<ED, Cxt> => {
|
||||
|
||||
const BaseStoreClass = getDbStoreClass<ED, Cxt>() as any
|
||||
// TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。
|
||||
const BaseStoreClass = getDbStoreClass<ED, Cxt>(dbConfiguration)
|
||||
|
||||
// 动态创建继承类
|
||||
// @ts-ignore
|
||||
class DynamicDbStore extends BaseStoreClass implements TriggerStore<ED, Cxt> {
|
||||
private executor: TriggerExecutor<ED, Cxt>;
|
||||
private relationAuth: RelationAuth<ED>;
|
||||
|
|
@ -208,5 +212,5 @@ export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extend
|
|||
}
|
||||
}
|
||||
|
||||
return new DynamicDbStore() as any as AppDbStore<ED, Cxt>;
|
||||
return new DynamicDbStore() as AppDbStore<ED, Cxt>;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -2,11 +2,12 @@ import { MysqlStore, PostgreSQLStore } from "oak-db";
|
|||
import { DbConfiguration } from "oak-db/src/types/configuration";
|
||||
import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
|
||||
import { join } from "path";
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption, RowStore } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
|
||||
import { existsSync } from "fs";
|
||||
import { DbStore } from "oak-db/lib/types/dbStore";
|
||||
|
||||
/**
|
||||
* 数据库优先级列表,按顺序尝试获取配置文件
|
||||
|
|
@ -16,9 +17,11 @@ export const dbList = {
|
|||
postgres: PostgreSQLStore
|
||||
}
|
||||
|
||||
let usedDbType: string | null = null;
|
||||
export const DbTypeSymbol = Symbol.for('oak:backend:db:type')
|
||||
|
||||
export const getDbConfig = (path: string): DbConfiguration => {
|
||||
export const getDbConfig = (path: string): DbConfiguration & {
|
||||
[DbTypeSymbol]: string;
|
||||
} => {
|
||||
for (const db of Object.keys(dbList)) {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-var-requires
|
||||
|
|
@ -33,8 +36,14 @@ export const getDbConfig = (path: string): DbConfiguration => {
|
|||
|
||||
const config = require(dbConfigFile);
|
||||
console.log(`使用${db}作为数据库`);
|
||||
usedDbType = db;
|
||||
return Object.assign({}, config);
|
||||
// 定义不可枚举的属性,避免被序列化
|
||||
Object.defineProperty(config, DbTypeSymbol, {
|
||||
value: db,
|
||||
enumerable: false,
|
||||
writable: false,
|
||||
configurable: false
|
||||
});
|
||||
return config;
|
||||
}
|
||||
catch (err) {
|
||||
// do nothing
|
||||
|
|
@ -45,13 +54,16 @@ export const getDbConfig = (path: string): DbConfiguration => {
|
|||
}
|
||||
|
||||
export const getDbStoreClass = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>
|
||||
() => {
|
||||
const dbType = usedDbType || (() => {
|
||||
throw new Error('无法确定数据库类型');
|
||||
})();
|
||||
(config: {
|
||||
[DbTypeSymbol]?: string;
|
||||
}) => {
|
||||
const dbType = Object.getOwnPropertyDescriptor(config, DbTypeSymbol)?.value;
|
||||
if (!dbType) {
|
||||
throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
|
||||
}
|
||||
const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList];
|
||||
if (!DbStoreClass) {
|
||||
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
}
|
||||
return DbStoreClass as new (schema: StorageSchema<ED>, config: DbConfiguration) => AsyncRowStore<ED, Cxt> & CascadeStore<ED>;
|
||||
return DbStoreClass as new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;
|
||||
}
|
||||
Loading…
Reference in New Issue