Compare commits

...

8 Commits
4.1.26 ... dev

13 changed files with 515 additions and 118 deletions

40
lib/AppLoader.d.ts vendored
View File

@ -1,5 +1,5 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult } from "oak-domain/lib/types"; import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult, BaseTimer } from "oak-domain/lib/types";
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io'; import { Namespace } from 'socket.io';
@ -19,6 +19,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
private watcherTimerId?; private watcherTimerId?;
private scheduledJobs; private scheduledJobs;
private internalErrorHandlers; private internalErrorHandlers;
private watcherExecutingData;
regAllExceptionHandler(): void; regAllExceptionHandler(): void;
/** /**
* *
@ -32,9 +33,15 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
private requireSth; private requireSth;
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>; protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
/** /**
* configuration *
* @returns
*/ */
private getConfiguration; private getDbConfig;
/**
*
* @returns
*/
private getSyncConfig;
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace); constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void; protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
protected initTriggers(): void; protected initTriggers(): void;
@ -55,11 +62,36 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
}>][]; }>][];
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>; protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>; protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>;
/**
* watcher执行
* @param name watcher名称
* @param dataId ID
* @returns truefalse
*/
private checkDataExecuting;
/**
*
* @returns [, ]
*/
private filterAndMarkExecutingRows;
/**
*
*/
private cleanupExecutingMarks;
/**
* filter projection
*/
private resolveFilterAndProjection;
/**
* WB watcher
*/
private selectForWBWatcher;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>; protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
protected getCheckpointTs(): number; protected getCheckpointTs(): number;
protected checkpoint(): Promise<number>; protected checkpoint(): Promise<number>;
startWatchers(): void; startWatchers(): void;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined; protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
startTimers(): void; startTimers(): void;
execStartRoutines(): Promise<void>; execStartRoutines(): Promise<void>;
execStopRoutines(): Promise<void>; execStopRoutines(): Promise<void>;

View File

@ -30,6 +30,7 @@ class AppLoader extends types_1.AppLoader {
watcherTimerId; watcherTimerId;
scheduledJobs = {}; scheduledJobs = {};
internalErrorHandlers = new Array(); internalErrorHandlers = new Array();
watcherExecutingData = new Map();
regAllExceptionHandler() { regAllExceptionHandler() {
const handlers = this.requireSth('lib/configuration/exception'); const handlers = this.requireSth('lib/configuration/exception');
if (Array.isArray(handlers)) { if (Array.isArray(handlers)) {
@ -57,13 +58,12 @@ class AppLoader extends types_1.AppLoader {
* 发布内部错误事件给注册的处理器 * 发布内部错误事件给注册的处理器
*/ */
async publishInternalError(type, message, err) { async publishInternalError(type, message, err) {
const errorToPublish = (0, lodash_1.cloneDeep)(err);
await Promise.all(this.internalErrorHandlers.map((handler) => { await Promise.all(this.internalErrorHandlers.map((handler) => {
return new Promise(async (resolve) => { return new Promise(async (resolve) => {
const ctx = await this.makeContext(); const ctx = await this.makeContext();
try { try {
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`); console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
await handler.handle(ctx, type, message, errorToPublish); await handler.handle(ctx, type, message, err);
await ctx.commit(); await ctx.commit();
} }
catch (e) { catch (e) {
@ -93,20 +93,24 @@ class AppLoader extends types_1.AppLoader {
return context; return context;
} }
/** /**
* 后台启动的configuration统一放在这里读取 * 获取数据库配置
* @returns 读取数据库配置
*/ */
getConfiguration() { getDbConfig() {
const dbConfig = (0, dbPriority_1.getDbConfig)(this.path); return (0, dbPriority_1.getDbConfig)(this.path);
}
/**
* 获取同步配置
* @returns 读取同步配置
*/
getSyncConfig() {
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js'); const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default; const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
return { return syncConfigs;
dbConfig: dbConfig,
syncConfig: syncConfigs,
};
} }
constructor(path, nsSubscribe, nsSocket, nsServer) { constructor(path, nsSubscribe, nsSocket, nsServer) {
super(path); super(path);
const { dbConfig } = this.getConfiguration(); const dbConfig = this.getDbConfig();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd()); const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd());
this.externalDependencies = depGraph.ascOrder; this.externalDependencies = depGraph.ascOrder;
@ -174,7 +178,7 @@ class AppLoader extends types_1.AppLoader {
async mount(initialize) { async mount(initialize) {
const { path } = this; const { path } = this;
if (!initialize) { if (!initialize) {
const { syncConfig: syncConfig } = this.getConfiguration(); const syncConfig = this.getSyncConfig();
if (syncConfig) { if (syncConfig) {
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore)); this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
} }
@ -369,44 +373,154 @@ class AppLoader extends types_1.AppLoader {
forUpdate, forUpdate,
}); });
} }
/**
* 检查某个数据是否正在被watcher执行
* @param name watcher名称
* @param dataId 数据ID
* @returns 如果没有正在执行则返回true否则返回false
*/
checkDataExecuting(name, dataId) {
let dataSet = this.watcherExecutingData.get(name);
if (!dataSet) {
dataSet = new Map();
this.watcherExecutingData.set(name, dataSet);
}
if (dataSet.has(dataId)) {
return false;
}
dataSet.set(dataId, true);
return true;
}
/**
* 过滤出未在执行中的数据行并标记为执行中
* @returns [过滤后的行, 是否有行被跳过]
*/
filterAndMarkExecutingRows(watcher, rows) {
if (watcher.exclusive !== true) {
// 不需要排他执行,直接返回所有行
return [rows, []];
}
const rowsWithoutExecuting = [];
const skipedRows = [];
const watcherName = watcher.name;
for (const row of rows) {
if (!row.id) {
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID跳过此数据的并发检查处理`, row);
rowsWithoutExecuting.push(row);
continue;
}
if (this.checkDataExecuting(watcherName, row.id)) {
rowsWithoutExecuting.push(row);
}
else {
skipedRows.push(row);
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`);
}
}
return [rowsWithoutExecuting, skipedRows];
}
/**
* 清理执行标记
*/
cleanupExecutingMarks(watcherName, rows) {
for (const row of rows) {
if (row.id) {
const dataSet = this.watcherExecutingData.get(watcherName);
if (dataSet) {
dataSet.delete(row.id);
}
}
}
}
/**
* 解析 filter projection支持函数或静态值
*/
async resolveFilterAndProjection(filter, projection) {
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
return [filter2, projection2];
}
/**
* 执行 WB 类型 watcher 的查询操作
*/
async selectForWBWatcher(watcher, context) {
const { entity, projection, filter, singleton, forUpdate } = watcher;
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
return await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
}
async execWatcher(watcher) { async execWatcher(watcher) {
const context = await this.makeContext();
let result; let result;
try { // BBWatcher直接操作无需查询
if (watcher.hasOwnProperty('actionData')) { if (watcher.hasOwnProperty('actionData')) {
const context = await this.makeContext();
try {
const { entity, action, filter, actionData, singleton } = watcher; const { entity, action, filter, actionData, singleton } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const data = typeof actionData === 'function' ? await (actionData)() : (0, lodash_1.cloneDeep)(actionData); const data = typeof actionData === 'function' ? await actionData() : (0, lodash_1.cloneDeep)(actionData);
result = await this.operateInWatcher(entity, { result = await this.operateInWatcher(entity, {
id: await (0, uuid_1.generateNewIdAsync)(), id: await (0, uuid_1.generateNewIdAsync)(),
action: action, action: action,
data, data,
filter: filter2, filter: filter2,
}, context, singleton); }, context, singleton);
await context.commit();
return result;
}
catch (err) {
if (err instanceof types_1.OakPartialSuccess) {
await context.commit();
}
else {
await context.rollback();
}
throw err;
}
}
// WBFreeWatcher 和 WBWatcher查询后执行
const isFreeType = watcher.hasOwnProperty('type') &&
watcher.type === 'free';
// 1. 执行查询WBFreeWatcher 使用独立 context
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
const rows = await this.selectForWBWatcher(watcher, selectContext);
if (isFreeType) {
await selectContext.commit();
}
// 2. 并发检查:过滤出未在执行中的数据
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(watcher, rows);
if (rowsWithoutExecuting.length === 0) {
if (!isFreeType) {
await selectContext.commit();
}
this.cleanupExecutingMarks(watcher.name, hasSkipped);
return result;
}
// 3. 执行业务逻辑
try {
if (isFreeType) {
const { fn } = watcher;
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
} }
else { else {
const { entity, projection, fn, filter, singleton, forUpdate } = watcher; const { fn } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); result = await fn(selectContext, rowsWithoutExecuting);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection); await selectContext.commit();
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(context, rows);
}
} }
await context.commit();
return result; return result;
} }
catch (err) { catch (err) {
if (err instanceof types_1.OakPartialSuccess) { // 清理执行标记
await context.commit(); this.cleanupExecutingMarks(watcher.name, rows);
if (!isFreeType) {
if (err instanceof types_1.OakPartialSuccess) {
await selectContext.commit();
}
else {
await selectContext.rollback();
}
} }
else {
await context.rollback();
}
// 不能在这里publish因为这个方法可能是在timer中调用也可能是在routine中调用
throw err; throw err;
} }
} }
@ -423,7 +537,13 @@ class AppLoader extends types_1.AppLoader {
const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict); const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (watchers || []).concat(adWatchers); const totalWatchers = (watchers || []).concat(adWatchers);
let count = 0; let count = 0;
const skipOnceSet = new Set();
const execOne = async (watcher, start) => { const execOne = async (watcher, start) => {
if (skipOnceSet.has(watcher.name)) {
skipOnceSet.delete(watcher.name);
console.log(`跳过本次执行watcher【${watcher.name}`);
return;
}
try { try {
const result = await this.execWatcher(watcher); const result = await this.execWatcher(watcher);
if (result) { if (result) {
@ -455,12 +575,22 @@ class AppLoader extends types_1.AppLoader {
} }
this.watcherTimerId = setTimeout(() => doWatchers(), 120000); this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
}; };
// 首次执行时跳过所有lazy的watcher
for (const w of totalWatchers) {
if (w.lazy) {
skipOnceSet.add(w.name);
}
}
doWatchers(); doWatchers();
} }
execFreeTimer(timer, context) { execBaseTimer(timer, context) {
const { timer: timerFn } = timer; const { timer: timerFn } = timer;
return timerFn(context); return timerFn(context);
} }
execFreeTimer(timer, contextBuilder) {
const { timer: timerFn } = timer;
return timerFn(contextBuilder);
}
startTimers() { startTimers() {
const timers = this.requireSth('lib/timers/index'); const timers = this.requireSth('lib/timers/index');
if (timers) { if (timers) {
@ -480,9 +610,20 @@ class AppLoader extends types_1.AppLoader {
} }
} }
else { else {
if (timer.hasOwnProperty('type') && timer.type === 'free') {
try {
const result = await this.execFreeTimer(timer, () => this.makeContext());
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
return;
}
const context = await this.makeContext(); const context = await this.makeContext();
try { try {
const result = await this.execFreeTimer(timer, context); const result = await this.execBaseTimer(timer, context);
if (result) { if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
} }

View File

@ -1,5 +1,5 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer } from 'oak-domain/lib/types'; import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AppLoader } from './AppLoader'; import { AppLoader } from './AppLoader';
import { Namespace } from 'socket.io'; import { Namespace } from 'socket.io';
@ -15,6 +15,7 @@ export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cx
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>; protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>; protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>; protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined; protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
protected checkpoint(): Promise<number>; protected checkpoint(): Promise<number>;
} }

View File

@ -168,11 +168,17 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
} }
return super.execWatcher(watcher); return super.execWatcher(watcher);
} }
execFreeTimer(timer, context) { execBaseTimer(timer, context) {
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
return; return;
} }
return super.execFreeTimer(timer, context); return super.execBaseTimer(timer, context);
}
execFreeTimer(timer, contextBuilder) {
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, contextBuilder);
} }
async checkpoint() { async checkpoint() {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();

5
lib/DbStore.d.ts vendored
View File

@ -2,6 +2,7 @@ import { DbConfiguration } from 'oak-db/src/types/configuration';
import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { DbTypeSymbol } from './utils/dbPriority';
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
import { DbStore } from 'oak-db/lib/types/dbStore'; import { DbStore } from 'oak-db/lib/types/dbStore';
export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = { export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = {
@ -13,4 +14,6 @@ export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends Bac
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise<number>; independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise<number>;
}; };
export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = DbStore<ED, Cxt> & CascadeStore<ED> & TriggerStore<ED, Cxt>; export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = DbStore<ED, Cxt> & CascadeStore<ED> & TriggerStore<ED, Cxt>;
export declare const createDbStore: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) => AppDbStore<ED, Cxt>; export declare const createDbStore: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration & {
[DbTypeSymbol]?: string;
}, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) => AppDbStore<ED, Cxt>;

View File

@ -5,8 +5,10 @@ const TriggerExecutor_1 = require("oak-domain/lib/store/TriggerExecutor");
const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth"); const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth");
const dbPriority_1 = require("./utils/dbPriority"); const dbPriority_1 = require("./utils/dbPriority");
const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => { const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => {
const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(); // TODO: 这里的类型检查会过不去因为ts不知道上层已经实现这个抽象类。
const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(dbConfiguration);
// 动态创建继承类 // 动态创建继承类
// @ts-ignore
class DynamicDbStore extends BaseStoreClass { class DynamicDbStore extends BaseStoreClass {
executor; executor;
relationAuth; relationAuth;

View File

@ -1,10 +1,10 @@
import { MysqlStore, PostgreSQLStore } from "oak-db"; import { MysqlStore, PostgreSQLStore } from "oak-db";
import { DbConfiguration } from "oak-db/src/types/configuration"; import { DbConfiguration } from "oak-db/src/types/configuration";
import { AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
import { EntityDict, StorageSchema } from 'oak-domain/lib/types'; import { EntityDict, StorageSchema } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
import { DbStore } from "oak-db/lib/types/dbStore";
/** /**
* *
*/ */
@ -12,5 +12,10 @@ export declare const dbList: {
mysql: typeof MysqlStore; mysql: typeof MysqlStore;
postgres: typeof PostgreSQLStore; postgres: typeof PostgreSQLStore;
}; };
export declare const getDbConfig: (path: string) => DbConfiguration; export declare const DbTypeSymbol: unique symbol;
export declare const getDbStoreClass: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>() => new (schema: StorageSchema<ED>, config: DbConfiguration) => AsyncRowStore<ED, Cxt> & CascadeStore<ED>; export declare const getDbConfig: (path: string) => DbConfiguration & {
[DbTypeSymbol]: string;
};
export declare const getDbStoreClass: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(config: {
[DbTypeSymbol]?: string;
}) => new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;

View File

@ -1,6 +1,6 @@
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
exports.getDbStoreClass = exports.getDbConfig = exports.dbList = void 0; exports.getDbStoreClass = exports.getDbConfig = exports.DbTypeSymbol = exports.dbList = void 0;
const oak_db_1 = require("oak-db"); const oak_db_1 = require("oak-db");
const path_1 = require("path"); const path_1 = require("path");
const fs_1 = require("fs"); const fs_1 = require("fs");
@ -11,7 +11,7 @@ exports.dbList = {
mysql: oak_db_1.MysqlStore, mysql: oak_db_1.MysqlStore,
postgres: oak_db_1.PostgreSQLStore postgres: oak_db_1.PostgreSQLStore
}; };
let usedDbType = null; exports.DbTypeSymbol = Symbol.for('oak:backend:db:type');
const getDbConfig = (path) => { const getDbConfig = (path) => {
for (const db of Object.keys(exports.dbList)) { for (const db of Object.keys(exports.dbList)) {
try { try {
@ -25,8 +25,14 @@ const getDbConfig = (path) => {
} }
const config = require(dbConfigFile); const config = require(dbConfigFile);
console.log(`使用${db}作为数据库`); console.log(`使用${db}作为数据库`);
usedDbType = db; // 定义不可枚举的属性,避免被序列化
return Object.assign({}, config); Object.defineProperty(config, exports.DbTypeSymbol, {
value: db,
enumerable: false,
writable: false,
configurable: false
});
return config;
} }
catch (err) { catch (err) {
// do nothing // do nothing
@ -35,10 +41,11 @@ const getDbConfig = (path) => {
throw new Error(`没有找到数据库配置文件请在configuration目录下添加任一配置文件${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); throw new Error(`没有找到数据库配置文件请在configuration目录下添加任一配置文件${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);
}; };
exports.getDbConfig = getDbConfig; exports.getDbConfig = getDbConfig;
const getDbStoreClass = () => { const getDbStoreClass = (config) => {
const dbType = usedDbType || (() => { const dbType = Object.getOwnPropertyDescriptor(config, exports.DbTypeSymbol)?.value;
throw new Error('无法确定数据库类型'); if (!dbType) {
})(); throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
}
const DbStoreClass = exports.dbList[dbType.toLowerCase()]; const DbStoreClass = exports.dbList[dbType.toLowerCase()];
if (!DbStoreClass) { if (!DbStoreClass) {
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`); throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);

View File

@ -1,6 +1,6 @@
{ {
"name": "oak-backend-base", "name": "oak-backend-base",
"version": "4.1.26", "version": "4.1.28",
"description": "oak-backend-base", "description": "oak-backend-base",
"main": "lib/index", "main": "lib/index",
"author": { "author": {
@ -21,10 +21,10 @@
"mysql": "^2.18.1", "mysql": "^2.18.1",
"mysql2": "^2.3.3", "mysql2": "^2.3.3",
"node-schedule": "^2.1.0", "node-schedule": "^2.1.0",
"oak-common-aspect": "^3.0.5", "oak-common-aspect": "file:../oak-common-aspect",
"oak-db": "^3.3.12", "oak-db": "file:../oak-db",
"oak-domain": "^5.1.33", "oak-domain": "file:../oak-domain",
"oak-frontend-base": "^5.3.45", "oak-frontend-base": "file:../oak-frontend-base",
"socket.io": "^4.8.1", "socket.io": "^4.8.1",
"socket.io-client": "^4.7.2", "socket.io-client": "^4.7.2",
"uuid": "^8.3.2" "uuid": "^8.3.2"

View File

@ -5,7 +5,7 @@ import { makeIntrinsicLogics } from "oak-domain/lib/store/IntrinsicLogics";
import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash'; import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types"; import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException, BaseTimer, WBFreeWatcher } from "oak-domain/lib/types";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index'; import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint'; import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
@ -36,6 +36,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
private scheduledJobs: Record<string, Job> = {}; private scheduledJobs: Record<string, Job> = {};
private internalErrorHandlers = new Array<InternalErrorHandler<ED, Cxt>>(); private internalErrorHandlers = new Array<InternalErrorHandler<ED, Cxt>>();
private watcherExecutingData: Map<string, Map<string, true>> = new Map();
public regAllExceptionHandler() { public regAllExceptionHandler() {
const handlers = this.requireSth('lib/configuration/exception') as Array<InternalErrorHandler<ED, Cxt>> | InternalErrorHandler<ED, Cxt>; const handlers = this.requireSth('lib/configuration/exception') as Array<InternalErrorHandler<ED, Cxt>> | InternalErrorHandler<ED, Cxt>;
if (Array.isArray(handlers)) { if (Array.isArray(handlers)) {
@ -66,14 +68,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
* *
*/ */
private async publishInternalError(type: InternalErrorType, message: string, err: any) { private async publishInternalError(type: InternalErrorType, message: string, err: any) {
const errorToPublish = cloneDeep(err);
await Promise.all(this.internalErrorHandlers.map( await Promise.all(this.internalErrorHandlers.map(
(handler) => { (handler) => {
return new Promise<void>(async (resolve) => { return new Promise<void>(async (resolve) => {
const ctx = await this.makeContext(); const ctx = await this.makeContext();
try { try {
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`); console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
await handler.handle(ctx, type, message, errorToPublish); await handler.handle(ctx, type, message, err);
await ctx.commit() await ctx.commit()
} catch (e) { } catch (e) {
console.error('执行internalErrorHandler时出错', e); console.error('执行internalErrorHandler时出错', e);
@ -106,22 +107,26 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
} }
/** /**
* configuration *
* @returns
*/ */
private getConfiguration() { private getDbConfig() {
const dbConfig = getDbConfig(this.path); return getDbConfig(this.path);
}
/**
*
* @returns
*/
private getSyncConfig() {
const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js'); const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js');
const syncConfigs = existsSync(syncConfigFile) && require(syncConfigFile).default; const syncConfigs = existsSync(syncConfigFile) && require(syncConfigFile).default;
return syncConfigs as SyncConfig<ED, Cxt> | undefined;
return {
dbConfig: dbConfig,
syncConfig: syncConfigs as SyncConfig<ED, Cxt> | undefined,
};
} }
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) { constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) {
super(path); super(path);
const { dbConfig } = this.getConfiguration(); const dbConfig = this.getDbConfig();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const depGraph = analyzeDepedency(process.cwd()); const depGraph = analyzeDepedency(process.cwd());
this.externalDependencies = depGraph.ascOrder; this.externalDependencies = depGraph.ascOrder;
@ -225,7 +230,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
async mount(initialize?: true) { async mount(initialize?: true) {
const { path } = this; const { path } = this;
if (!initialize) { if (!initialize) {
const { syncConfig: syncConfig } = this.getConfiguration(); const syncConfig = this.getSyncConfig();
if (syncConfig) { if (syncConfig) {
this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore)); this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
@ -448,45 +453,188 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}) })
} }
/**
* watcher执行
* @param name watcher名称
* @param dataId ID
* @returns truefalse
*/
private checkDataExecuting(name: string, dataId: string): boolean {
let dataSet = this.watcherExecutingData.get(name);
if (!dataSet) {
dataSet = new Map<string, true>();
this.watcherExecutingData.set(name, dataSet);
}
if (dataSet.has(dataId)) {
return false;
}
dataSet.set(dataId, true);
return true;
}
/**
*
* @returns [, ]
*/
private filterAndMarkExecutingRows<T extends keyof ED>(
watcher: Watcher<ED, T, Cxt>,
rows: Partial<ED[T]['Schema']>[]
): [Partial<ED[T]['Schema']>[], Partial<ED[T]['Schema']>[]] {
if (watcher.exclusive !== true) {
// 不需要排他执行,直接返回所有行
return [rows, []];
}
const rowsWithoutExecuting: Partial<ED[T]['Schema']>[] = [];
const skipedRows: Partial<ED[T]['Schema']>[] = [];
const watcherName = watcher.name;
for (const row of rows) {
if (!row.id) {
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID跳过此数据的并发检查处理`, row);
rowsWithoutExecuting.push(row);
continue;
}
if (this.checkDataExecuting(watcherName, row.id)) {
rowsWithoutExecuting.push(row);
} else {
skipedRows.push(row);
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`);
}
}
return [rowsWithoutExecuting, skipedRows];
}
/**
*
*/
private cleanupExecutingMarks<T extends keyof ED>(
watcherName: string,
rows: Partial<ED[T]['Schema']>[]
) {
for (const row of rows) {
if (row.id) {
const dataSet = this.watcherExecutingData.get(watcherName);
if (dataSet) {
dataSet.delete(row.id);
}
}
}
}
/**
* filter projection
*/
private async resolveFilterAndProjection<T extends ED[keyof ED]['Filter']>(
filter: T | (() => Promise<T>),
projection: any | (() => Promise<any>)
): Promise<[T, any]> {
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
return [filter2, projection2];
}
/**
* WB watcher
*/
private async selectForWBWatcher<T extends keyof ED>(
watcher: WBWatcher<ED, T, Cxt> | WBFreeWatcher<ED, T, Cxt>,
context: Cxt
) {
const { entity, projection, filter, singleton, forUpdate } = watcher;
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
return await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
}
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) { protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
const context = await this.makeContext();
let result: OperationResult<ED> | undefined; let result: OperationResult<ED> | undefined;
try {
if (watcher.hasOwnProperty('actionData')) { // BBWatcher直接操作无需查询
if (watcher.hasOwnProperty('actionData')) {
const context = await this.makeContext();
try {
const { entity, action, filter, actionData, singleton } = <BBWatcher<ED, keyof ED>>watcher; const { entity, action, filter, actionData, singleton } = <BBWatcher<ED, keyof ED>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
const data = typeof actionData === 'function' ? await (actionData)() : cloneDeep(actionData); const data = typeof actionData === 'function' ? await actionData() : cloneDeep(actionData);
result = await this.operateInWatcher(entity, { result = await this.operateInWatcher(entity, {
id: await generateNewIdAsync(), id: await generateNewIdAsync(),
action: action as string, action: action as string,
data, data,
filter: filter2, filter: filter2,
}, context, singleton); }, context, singleton);
}
else {
const { entity, projection, fn, filter, singleton, forUpdate } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
if (rows.length > 0) { await context.commit();
result = await fn(context, rows); return result;
} catch (err) {
if (err instanceof OakPartialSuccess) {
await context.commit();
} else {
await context.rollback();
} }
throw err;
} }
await context.commit(); }
// WBFreeWatcher 和 WBWatcher查询后执行
const isFreeType = watcher.hasOwnProperty('type') &&
(watcher as WBFreeWatcher<ED, keyof ED, Cxt>).type === 'free';
// 1. 执行查询WBFreeWatcher 使用独立 context
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
const rows = await this.selectForWBWatcher(
watcher as WBWatcher<ED, keyof ED, Cxt> | WBFreeWatcher<ED, keyof ED, Cxt>,
selectContext
);
if (isFreeType) {
await selectContext.commit();
}
// 2. 并发检查:过滤出未在执行中的数据
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(
watcher,
rows
);
if (rowsWithoutExecuting.length === 0) {
if (!isFreeType) {
await selectContext.commit();
}
this.cleanupExecutingMarks(watcher.name, hasSkipped);
return result; return result;
} }
catch (err) {
if (err instanceof OakPartialSuccess) { // 3. 执行业务逻辑
await context.commit(); try {
if (isFreeType) {
const { fn } = watcher as WBFreeWatcher<ED, keyof ED, Cxt>;
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
} else {
const { fn } = watcher as WBWatcher<ED, keyof ED, Cxt>;
result = await fn(selectContext, rowsWithoutExecuting);
await selectContext.commit();
} }
else {
await context.rollback(); return result;
} catch (err) {
// 清理执行标记
this.cleanupExecutingMarks(watcher.name, rows);
if (!isFreeType) {
if (err instanceof OakPartialSuccess) {
await selectContext.commit();
} else {
await selectContext.rollback();
}
} }
// 不能在这里publish因为这个方法可能是在timer中调用也可能是在routine中调用
throw err; throw err;
} }
} }
@ -508,7 +656,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers || []).concat(adWatchers); const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers || []).concat(adWatchers);
let count = 0; let count = 0;
const skipOnceSet = new Set<string>();
const execOne = async (watcher: Watcher<ED, keyof ED, Cxt>, start: number) => { const execOne = async (watcher: Watcher<ED, keyof ED, Cxt>, start: number) => {
if (skipOnceSet.has(watcher.name)) {
skipOnceSet.delete(watcher.name);
console.log(`跳过本次执行watcher【${watcher.name}`);
return;
}
try { try {
const result = await this.execWatcher(watcher); const result = await this.execWatcher(watcher);
if (result) { if (result) {
@ -542,14 +696,27 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.watcherTimerId = setTimeout(() => doWatchers(), 120000); this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
}; };
// 首次执行时跳过所有lazy的watcher
for (const w of totalWatchers) {
if (w.lazy) {
skipOnceSet.add(w.name);
}
}
doWatchers(); doWatchers();
} }
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined { protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>; const { timer: timerFn } = timer as BaseTimer<ED, Cxt>;
return timerFn(context); return timerFn(context);
} }
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
return timerFn(contextBuilder);
}
startTimers() { startTimers() {
const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index'); const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index');
if (timers) { if (timers) {
@ -570,9 +737,19 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
} }
} }
else { else {
if (timer.hasOwnProperty('type') && (timer as FreeTimer<ED, Cxt>).type === 'free') {
try {
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, () => this.makeContext());
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
} catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
return;
}
const context = await this.makeContext(); const context = await this.makeContext();
try { try {
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, context); const result = await this.execBaseTimer(timer as BaseTimer<ED, Cxt>, context);
if (result) { if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
} }

View File

@ -1,7 +1,7 @@
import { groupBy } from 'oak-domain/lib/utils/lodash'; import { groupBy } from 'oak-domain/lib/utils/lodash';
import { combineFilters } from 'oak-domain/lib/store/filter'; import { combineFilters } from 'oak-domain/lib/store/filter';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer } from 'oak-domain/lib/types'; import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { getClusterInfo } from './cluster/env'; import { getClusterInfo } from './cluster/env';
@ -27,7 +27,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
this.socket!.emit('sub', csTriggerNames); this.socket!.emit('sub', csTriggerNames);
} }
}); });
this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => { this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
const context = await this.makeContext(cxtStr); const context = await this.makeContext(cxtStr);
if (process.env.NODE_ENV === 'development') { if (process.env.NODE_ENV === 'development') {
console.log(`${getClusterInfo().instanceId}」号实例接收到来自其它进程的volatileTrigger请求 name是「${name}」, ids是「${ids.join(',')}`); console.log(`${getClusterInfo().instanceId}」号实例接收到来自其它进程的volatileTrigger请求 name是「${name}」, ids是「${ids.join(',')}`);
@ -184,11 +184,18 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
return super.execWatcher(watcher); return super.execWatcher(watcher);
} }
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt) { protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt) {
if (timer.singleton && getClusterInfo().instanceId !== 0) { if (timer.singleton && getClusterInfo().instanceId !== 0) {
return; return;
} }
return super.execFreeTimer(timer, context); return super.execBaseTimer(timer, context);
}
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>) {
if (timer.singleton && getClusterInfo().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, contextBuilder);
} }
protected async checkpoint() { protected async checkpoint() {

View File

@ -6,7 +6,7 @@ import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { RelationAuth } from 'oak-domain/lib/store/RelationAuth'; import { RelationAuth } from 'oak-domain/lib/store/RelationAuth';
import { getDbStoreClass } from './utils/dbPriority'; import { DbTypeSymbol, getDbStoreClass } from './utils/dbPriority';
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
import { DbStore } from 'oak-db/lib/types/dbStore'; import { DbStore } from 'oak-db/lib/types/dbStore';
@ -37,16 +37,20 @@ export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>( export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(
storageSchema: StorageSchema<ED>, storageSchema: StorageSchema<ED>,
contextBuilder: () => Cxt, contextBuilder: () => Cxt,
dbConfiguration: DbConfiguration, dbConfiguration: DbConfiguration & {
[DbTypeSymbol]?: string;
},
authDeduceRelationMap: AuthDeduceRelationMap<ED>, authDeduceRelationMap: AuthDeduceRelationMap<ED>,
selectFreeEntities: SelectFreeEntities<ED> = [], selectFreeEntities: SelectFreeEntities<ED> = [],
updateFreeDict: UpdateFreeDict<ED> = {}, updateFreeDict: UpdateFreeDict<ED> = {},
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void> onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>
): AppDbStore<ED, Cxt> => { ): AppDbStore<ED, Cxt> => {
const BaseStoreClass = getDbStoreClass<ED, Cxt>() as any // TODO: 这里的类型检查会过不去因为ts不知道上层已经实现这个抽象类。
const BaseStoreClass = getDbStoreClass<ED, Cxt>(dbConfiguration)
// 动态创建继承类 // 动态创建继承类
// @ts-ignore
class DynamicDbStore extends BaseStoreClass implements TriggerStore<ED, Cxt> { class DynamicDbStore extends BaseStoreClass implements TriggerStore<ED, Cxt> {
private executor: TriggerExecutor<ED, Cxt>; private executor: TriggerExecutor<ED, Cxt>;
private relationAuth: RelationAuth<ED>; private relationAuth: RelationAuth<ED>;
@ -208,5 +212,5 @@ export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extend
} }
} }
return new DynamicDbStore() as any as AppDbStore<ED, Cxt>; return new DynamicDbStore() as AppDbStore<ED, Cxt>;
}; };

View File

@ -2,11 +2,12 @@ import { MysqlStore, PostgreSQLStore } from "oak-db";
import { DbConfiguration } from "oak-db/src/types/configuration"; import { DbConfiguration } from "oak-db/src/types/configuration";
import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
import { join } from "path"; import { join } from "path";
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption, RowStore } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { CascadeStore } from "oak-domain/lib/store/CascadeStore"; import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
import { existsSync } from "fs"; import { existsSync } from "fs";
import { DbStore } from "oak-db/lib/types/dbStore";
/** /**
* *
@ -16,9 +17,11 @@ export const dbList = {
postgres: PostgreSQLStore postgres: PostgreSQLStore
} }
let usedDbType: string | null = null; export const DbTypeSymbol = Symbol.for('oak:backend:db:type')
export const getDbConfig = (path: string): DbConfiguration => { export const getDbConfig = (path: string): DbConfiguration & {
[DbTypeSymbol]: string;
} => {
for (const db of Object.keys(dbList)) { for (const db of Object.keys(dbList)) {
try { try {
// eslint-disable-next-line @typescript-eslint/no-var-requires // eslint-disable-next-line @typescript-eslint/no-var-requires
@ -30,11 +33,17 @@ export const getDbConfig = (path: string): DbConfiguration => {
if (existsSync(dbConfigFile) === false) { if (existsSync(dbConfigFile) === false) {
continue; continue;
} }
const config = require(dbConfigFile); const config = require(dbConfigFile);
console.log(`使用${db}作为数据库`); console.log(`使用${db}作为数据库`);
usedDbType = db; // 定义不可枚举的属性,避免被序列化
return Object.assign({}, config); Object.defineProperty(config, DbTypeSymbol, {
value: db,
enumerable: false,
writable: false,
configurable: false
});
return config;
} }
catch (err) { catch (err) {
// do nothing // do nothing
@ -45,13 +54,16 @@ export const getDbConfig = (path: string): DbConfiguration => {
} }
export const getDbStoreClass = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> export const getDbStoreClass = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>
() => { (config: {
const dbType = usedDbType || (() => { [DbTypeSymbol]?: string;
throw new Error('无法确定数据库类型'); }) => {
})(); const dbType = Object.getOwnPropertyDescriptor(config, DbTypeSymbol)?.value;
if (!dbType) {
throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
}
const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList]; const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList];
if (!DbStoreClass) { if (!DbStoreClass) {
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`); throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`);
} }
return DbStoreClass as new (schema: StorageSchema<ED>, config: DbConfiguration) => AsyncRowStore<ED, Cxt> & CascadeStore<ED>; return DbStoreClass as new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;
} }