Compare commits
40 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
691419645a | |
|
|
4f253bab2c | |
|
|
19f68cb7ec | |
|
|
ac7e1064ff | |
|
|
f48a6dc85b | |
|
|
f3e579fa35 | |
|
|
f51d34ae1a | |
|
|
0cbadaf527 | |
|
|
67d3873895 | |
|
|
bda6e43ca7 | |
|
|
5f2af054fc | |
|
|
d772fb80e4 | |
|
|
92c45f00ce | |
|
|
716b3b192f | |
|
|
77f568fdff | |
|
|
e7053c46ef | |
|
|
bf6416abb0 | |
|
|
3e182c6769 | |
|
|
90129d2c87 | |
|
|
2d917d3648 | |
|
|
e646236e8d | |
|
|
6bf221da5f | |
|
|
9bb033a96b | |
|
|
d19cdec336 | |
|
|
4c1e55982c | |
|
|
713526ce26 | |
|
|
d22137c900 | |
|
|
2f92571fdf | |
|
|
7dbef1ea51 | |
|
|
9cc32f39fd | |
|
|
8b774dd111 | |
|
|
7f14dba278 | |
|
|
2014dbd91b | |
|
|
a9bbc9be97 | |
|
|
34697f3ef5 | |
|
|
8c3b74cb89 | |
|
|
d00eb839a2 | |
|
|
45889306c1 | |
|
|
8484b33da2 | |
|
|
f9625d9bc9 |
|
|
@ -1,29 +1,47 @@
|
|||
/// <reference types="node" />
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult } from "oak-domain/lib/types";
|
||||
import { DbStore } from "./DbStore";
|
||||
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult, BaseTimer } from "oak-domain/lib/types";
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { IncomingHttpHeaders, IncomingMessage } from 'http';
|
||||
import { Namespace } from 'socket.io';
|
||||
import DataSubscriber from './cluster/DataSubscriber';
|
||||
import Synchronizer from './Synchronizer';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import { InternalErrorHandler } from './types';
|
||||
import { AppDbStore } from './DbStore';
|
||||
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
|
||||
protected dbStore: DbStore<ED, Cxt>;
|
||||
protected dbStore: AppDbStore<ED, Cxt>;
|
||||
private aspectDict;
|
||||
private externalDependencies;
|
||||
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
|
||||
protected synchronizer?: Synchronizer<ED, Cxt>;
|
||||
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
|
||||
protected contextBuilder: (store: AppDbStore<ED, Cxt>) => Cxt;
|
||||
private nsSocket?;
|
||||
private watcherTimerId?;
|
||||
private scheduledJobs;
|
||||
private internalErrorHandlers;
|
||||
private watcherExecutingData;
|
||||
regAllExceptionHandler(): void;
|
||||
/**
|
||||
* 注册一个内部错误处理器
|
||||
* @param handler 内部错误处理器
|
||||
*/
|
||||
registerInternalErrorHandler(handler: InternalErrorHandler<ED, Cxt>): void;
|
||||
/**
|
||||
* 发布内部错误事件给注册的处理器
|
||||
*/
|
||||
private publishInternalError;
|
||||
private requireSth;
|
||||
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
private getConfiguration;
|
||||
private getDbConfig;
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
private getSyncConfig;
|
||||
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace);
|
||||
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
|
||||
protected initTriggers(): void;
|
||||
|
|
@ -35,20 +53,47 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
|
|||
result: any;
|
||||
message?: string;
|
||||
}>;
|
||||
initialize(): Promise<void>;
|
||||
getStore(): DbStore<ED, Cxt>;
|
||||
initialize(ifExists?: 'drop' | 'omit' | 'dropIfNotStatic'): Promise<void>;
|
||||
getStore(): AppDbStore<ED, Cxt>;
|
||||
getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<{
|
||||
headers?: Record<string, string | string[]> | undefined;
|
||||
headers?: Record<string, string | string[]>;
|
||||
data: any;
|
||||
statusCode?: number;
|
||||
}>][];
|
||||
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
|
||||
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>;
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
private checkDataExecuting;
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
private filterAndMarkExecutingRows;
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
private cleanupExecutingMarks;
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
private resolveFilterAndProjection;
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
private selectForWBWatcher;
|
||||
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
|
||||
protected getCheckpointTs(): number;
|
||||
protected checkpoint(): Promise<number>;
|
||||
startWatchers(): void;
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
|
||||
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
|
||||
startTimers(): void;
|
||||
execStartRoutines(): Promise<void>;
|
||||
execStopRoutines(): Promise<void>;
|
||||
execRoutine(routine: <Cxt extends AsyncContext<ED>>(context: Cxt) => Promise<void>): Promise<void>;
|
||||
}
|
||||
|
|
|
|||
345
lib/AppLoader.js
345
lib/AppLoader.js
|
|
@ -9,7 +9,6 @@ const IntrinsicLogics_1 = require("oak-domain/lib/store/IntrinsicLogics");
|
|||
const lodash_1 = require("oak-domain/lib/utils/lodash");
|
||||
const uuid_1 = require("oak-domain/lib/utils/uuid");
|
||||
const types_1 = require("oak-domain/lib/types");
|
||||
const DbStore_1 = require("./DbStore");
|
||||
const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
|
||||
const assert_1 = tslib_1.__importDefault(require("assert"));
|
||||
const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder");
|
||||
|
|
@ -17,6 +16,9 @@ const DataSubscriber_1 = tslib_1.__importDefault(require("./cluster/DataSubscrib
|
|||
const env_1 = require("./cluster/env");
|
||||
const Synchronizer_1 = tslib_1.__importDefault(require("./Synchronizer"));
|
||||
const i18n_1 = tslib_1.__importDefault(require("oak-domain/lib/data/i18n"));
|
||||
const requirePrj_1 = tslib_1.__importDefault(require("./utils/requirePrj"));
|
||||
const dbPriority_1 = require("./utils/dbPriority");
|
||||
const DbStore_1 = require("./DbStore");
|
||||
class AppLoader extends types_1.AppLoader {
|
||||
dbStore;
|
||||
aspectDict;
|
||||
|
|
@ -27,22 +29,55 @@ class AppLoader extends types_1.AppLoader {
|
|||
nsSocket;
|
||||
watcherTimerId;
|
||||
scheduledJobs = {};
|
||||
internalErrorHandlers = new Array();
|
||||
watcherExecutingData = new Map();
|
||||
regAllExceptionHandler() {
|
||||
const handlers = this.requireSth('lib/configuration/exception');
|
||||
if (Array.isArray(handlers)) {
|
||||
handlers.forEach((handler) => {
|
||||
console.log(`注册内部错误处理器: ${handler.name}`);
|
||||
this.registerInternalErrorHandler(handler);
|
||||
});
|
||||
}
|
||||
else {
|
||||
console.warn('lib/configuration/exception必须默认导出一个处理器数组,当前导出类型不正确,将忽略此配置');
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 注册一个内部错误处理器
|
||||
* @param handler 内部错误处理器
|
||||
*/
|
||||
registerInternalErrorHandler(handler) {
|
||||
// 检查有没有名称重复
|
||||
if (this.internalErrorHandlers.find(h => h.name === handler.name)) {
|
||||
throw new Error(`内部错误处理器名称重复: ${handler.name}`);
|
||||
}
|
||||
this.internalErrorHandlers.push(handler);
|
||||
}
|
||||
/**
|
||||
* 发布内部错误事件给注册的处理器
|
||||
*/
|
||||
async publishInternalError(type, message, err) {
|
||||
await Promise.all(this.internalErrorHandlers.map((handler) => {
|
||||
return new Promise(async (resolve) => {
|
||||
const ctx = await this.makeContext();
|
||||
try {
|
||||
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
|
||||
await handler.handle(ctx, type, message, err);
|
||||
await ctx.commit();
|
||||
}
|
||||
catch (e) {
|
||||
console.error('执行internalErrorHandler时出错', e);
|
||||
await ctx.rollback();
|
||||
}
|
||||
finally {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
requireSth(filePath) {
|
||||
const depFilePath = (0, path_1.join)(this.path, filePath);
|
||||
let sth;
|
||||
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
|
||||
sth = require((0, path_1.join)(this.path, filePath)).default;
|
||||
}
|
||||
const sthExternal = this.externalDependencies.map(ele => {
|
||||
const depFilePath = (0, path_1.join)(this.path, 'node_modules', ele, filePath);
|
||||
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
|
||||
return require(depFilePath).default;
|
||||
}
|
||||
}).filter(ele => !!ele);
|
||||
if (sth) {
|
||||
sthExternal.push(sth);
|
||||
}
|
||||
return (0, lodash_1.mergeConcatMany)(sthExternal);
|
||||
return (0, requirePrj_1.default)(this.path, filePath, this.externalDependencies);
|
||||
}
|
||||
async makeContext(cxtStr, headers) {
|
||||
const context = this.contextBuilder(this.dbStore);
|
||||
|
|
@ -58,27 +93,30 @@ class AppLoader extends types_1.AppLoader {
|
|||
return context;
|
||||
}
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
getConfiguration() {
|
||||
const dbConfigFile = (0, path_1.join)(this.path, 'configuration', 'mysql.json');
|
||||
const dbConfig = require(dbConfigFile);
|
||||
getDbConfig() {
|
||||
return (0, dbPriority_1.getDbConfig)(this.path);
|
||||
}
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
getSyncConfig() {
|
||||
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
|
||||
const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
|
||||
return {
|
||||
dbConfig: dbConfig,
|
||||
syncConfig: syncConfigs,
|
||||
};
|
||||
return syncConfigs;
|
||||
}
|
||||
constructor(path, nsSubscribe, nsSocket, nsServer) {
|
||||
super(path);
|
||||
const { dbConfig } = this.getConfiguration();
|
||||
const dbConfig = this.getDbConfig();
|
||||
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
|
||||
const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd());
|
||||
this.externalDependencies = depGraph.ascOrder;
|
||||
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation');
|
||||
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
|
||||
this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
this.dbStore = (0, DbStore_1.createDbStore)(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
if (nsSubscribe) {
|
||||
this.dataSubscriber = new DataSubscriber_1.default(nsSubscribe, nsServer);
|
||||
}
|
||||
|
|
@ -140,7 +178,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
async mount(initialize) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
const { syncConfig: syncConfig } = this.getConfiguration();
|
||||
const syncConfig = this.getSyncConfig();
|
||||
if (syncConfig) {
|
||||
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
|
||||
}
|
||||
|
|
@ -185,7 +223,9 @@ class AppLoader extends types_1.AppLoader {
|
|||
};
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`执行aspect「${name}」出错`, err);
|
||||
await context.rollback();
|
||||
this.publishInternalError(`aspect`, `执行aspect「${name}」出错`, err);
|
||||
if (err instanceof types_1.OakException) {
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -198,8 +238,8 @@ class AppLoader extends types_1.AppLoader {
|
|||
throw err;
|
||||
}
|
||||
}
|
||||
async initialize() {
|
||||
await this.dbStore.initialize({ ifExists: 'dropIfNotStatic' });
|
||||
async initialize(ifExists) {
|
||||
await this.dbStore.initialize({ ifExists });
|
||||
const data = this.requireSth('lib/data/index');
|
||||
// oak-domain中只有i18n
|
||||
(0, assert_1.default)(data.i18n);
|
||||
|
|
@ -254,7 +294,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
await this.dbStore.disconnect();
|
||||
// await this.dbStore.disconnect(); // 不需要马上断开连接,在initialize后可能还会有操作,unmount时会断开
|
||||
}
|
||||
getStore() {
|
||||
return this.dbStore;
|
||||
|
|
@ -333,42 +373,153 @@ class AppLoader extends types_1.AppLoader {
|
|||
forUpdate,
|
||||
});
|
||||
}
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
checkDataExecuting(name, dataId) {
|
||||
let dataSet = this.watcherExecutingData.get(name);
|
||||
if (!dataSet) {
|
||||
dataSet = new Map();
|
||||
this.watcherExecutingData.set(name, dataSet);
|
||||
}
|
||||
if (dataSet.has(dataId)) {
|
||||
return false;
|
||||
}
|
||||
dataSet.set(dataId, true);
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
filterAndMarkExecutingRows(watcher, rows) {
|
||||
if (watcher.exclusive !== true) {
|
||||
// 不需要排他执行,直接返回所有行
|
||||
return [rows, []];
|
||||
}
|
||||
const rowsWithoutExecuting = [];
|
||||
const skipedRows = [];
|
||||
const watcherName = watcher.name;
|
||||
for (const row of rows) {
|
||||
if (!row.id) {
|
||||
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row);
|
||||
rowsWithoutExecuting.push(row);
|
||||
continue;
|
||||
}
|
||||
if (this.checkDataExecuting(watcherName, row.id)) {
|
||||
rowsWithoutExecuting.push(row);
|
||||
}
|
||||
else {
|
||||
skipedRows.push(row);
|
||||
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`);
|
||||
}
|
||||
}
|
||||
return [rowsWithoutExecuting, skipedRows];
|
||||
}
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
cleanupExecutingMarks(watcherName, rows) {
|
||||
for (const row of rows) {
|
||||
if (row.id) {
|
||||
const dataSet = this.watcherExecutingData.get(watcherName);
|
||||
if (dataSet) {
|
||||
dataSet.delete(row.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
async resolveFilterAndProjection(filter, projection) {
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
|
||||
return [filter2, projection2];
|
||||
}
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
async selectForWBWatcher(watcher, context) {
|
||||
const { entity, projection, filter, singleton, forUpdate } = watcher;
|
||||
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
|
||||
return await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
}
|
||||
async execWatcher(watcher) {
|
||||
const context = await this.makeContext();
|
||||
let result;
|
||||
try {
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
// BBWatcher:直接操作,无需查询
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const { entity, action, filter, actionData, singleton } = watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const data = typeof actionData === 'function' ? await (actionData)() : (0, lodash_1.cloneDeep)(actionData);
|
||||
const data = typeof actionData === 'function' ? await actionData() : (0, lodash_1.cloneDeep)(actionData);
|
||||
result = await this.operateInWatcher(entity, {
|
||||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||||
action: action,
|
||||
data,
|
||||
filter: filter2,
|
||||
}, context, singleton);
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await context.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
// WBFreeWatcher 和 WBWatcher:查询后执行
|
||||
const isFreeType = watcher.hasOwnProperty('type') &&
|
||||
watcher.type === 'free';
|
||||
// 1. 执行查询(WBFreeWatcher 使用独立 context)
|
||||
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
|
||||
const rows = await this.selectForWBWatcher(watcher, selectContext);
|
||||
if (isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
// 2. 并发检查:过滤出未在执行中的数据
|
||||
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(watcher, rows);
|
||||
if (rowsWithoutExecuting.length === 0) {
|
||||
if (!isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
this.cleanupExecutingMarks(watcher.name, hasSkipped);
|
||||
return result;
|
||||
}
|
||||
// 3. 执行业务逻辑
|
||||
try {
|
||||
if (isFreeType) {
|
||||
const { fn } = watcher;
|
||||
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
|
||||
}
|
||||
else {
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
if (rows.length > 0) {
|
||||
result = await fn(context, rows);
|
||||
}
|
||||
const { fn } = watcher;
|
||||
result = await fn(selectContext, rowsWithoutExecuting);
|
||||
await selectContext.commit();
|
||||
}
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await context.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
// 清理执行标记
|
||||
this.cleanupExecutingMarks(watcher.name, rows);
|
||||
if (!isFreeType) {
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
else {
|
||||
await selectContext.rollback();
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -386,7 +537,13 @@ class AppLoader extends types_1.AppLoader {
|
|||
const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict);
|
||||
const totalWatchers = (watchers || []).concat(adWatchers);
|
||||
let count = 0;
|
||||
const skipOnceSet = new Set();
|
||||
const execOne = async (watcher, start) => {
|
||||
if (skipOnceSet.has(watcher.name)) {
|
||||
skipOnceSet.delete(watcher.name);
|
||||
console.log(`跳过本次执行watcher【${watcher.name}】`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const result = await this.execWatcher(watcher);
|
||||
if (result) {
|
||||
|
|
@ -395,6 +552,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`执行watcher【${watcher.name}】失败,耗时【${Date.now() - start}】,结果是:`, err);
|
||||
await this.publishInternalError(`watcher`, `执行watcher【${watcher.name}】失败`, err);
|
||||
}
|
||||
};
|
||||
const doWatchers = async () => {
|
||||
|
|
@ -413,15 +571,26 @@ class AppLoader extends types_1.AppLoader {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`执行了checkpoint,发生错误:`, err);
|
||||
await this.publishInternalError(`checkpoint`, `执行checkpoint发生错误`, err);
|
||||
}
|
||||
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
|
||||
};
|
||||
// 首次执行时,跳过所有lazy的watcher
|
||||
for (const w of totalWatchers) {
|
||||
if (w.lazy) {
|
||||
skipOnceSet.add(w.name);
|
||||
}
|
||||
}
|
||||
doWatchers();
|
||||
}
|
||||
execFreeTimer(timer, context) {
|
||||
execBaseTimer(timer, context) {
|
||||
const { timer: timerFn } = timer;
|
||||
return timerFn(context);
|
||||
}
|
||||
execFreeTimer(timer, contextBuilder) {
|
||||
const { timer: timerFn } = timer;
|
||||
return timerFn(contextBuilder);
|
||||
}
|
||||
startTimers() {
|
||||
const timers = this.requireSth('lib/timers/index');
|
||||
if (timers) {
|
||||
|
|
@ -436,26 +605,39 @@ class AppLoader extends types_1.AppLoader {
|
|||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒】,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (timer.hasOwnProperty('type') && timer.type === 'free') {
|
||||
try {
|
||||
const result = await this.execFreeTimer(timer, () => this.makeContext());
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const result = await this.execFreeTimer(timer, context);
|
||||
const result = await this.execBaseTimer(timer, context);
|
||||
if (result) {
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}】`);
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
if (err instanceof types_1.OakPartialSuccess) {
|
||||
await context.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
}
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
@ -474,6 +656,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
async execStartRoutines() {
|
||||
const routines = this.requireSth('lib/routines/start') || [];
|
||||
for (const routine of routines) {
|
||||
console.log(`执行启动例程【${routine.name}】...`);
|
||||
if (routine.hasOwnProperty('entity')) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
|
|
@ -481,7 +664,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
|
@ -492,12 +675,48 @@ class AppLoader extends types_1.AppLoader {
|
|||
try {
|
||||
const result = await routineFn(context, {
|
||||
socket: this.nsSocket,
|
||||
contextBuilder: () => this.makeContext(),
|
||||
});
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}】`);
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
async execStopRoutines() {
|
||||
const routines = this.requireSth('lib/routines/stop') || [];
|
||||
for (const routine of routines) {
|
||||
console.log(`执行停止例程【${routine.name}】...`);
|
||||
if (routine.hasOwnProperty('entity')) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
const result = await this.execWatcher(routine);
|
||||
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
else {
|
||||
const { name, routine: routineFn } = routine;
|
||||
const context = await this.makeContext();
|
||||
const start = Date.now();
|
||||
try {
|
||||
const result = await routineFn(context, {
|
||||
socket: this.nsSocket,
|
||||
contextBuilder: () => this.makeContext(),
|
||||
});
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -506,7 +725,15 @@ class AppLoader extends types_1.AppLoader {
|
|||
}
|
||||
async execRoutine(routine) {
|
||||
const context = await this.makeContext();
|
||||
await routine(context);
|
||||
try {
|
||||
const result = await routine(context);
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (e) {
|
||||
await context.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
exports.AppLoader = AppLoader;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer } from 'oak-domain/lib/types';
|
||||
import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { AppLoader } from './AppLoader';
|
||||
import { Namespace } from 'socket.io';
|
||||
|
|
@ -15,6 +15,7 @@ export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cx
|
|||
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
|
||||
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>;
|
||||
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
|
||||
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
|
||||
protected checkpoint(): Promise<number>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,11 +168,17 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
|
|||
}
|
||||
return super.execWatcher(watcher);
|
||||
}
|
||||
execFreeTimer(timer, context) {
|
||||
execBaseTimer(timer, context) {
|
||||
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
|
||||
return;
|
||||
}
|
||||
return super.execFreeTimer(timer, context);
|
||||
return super.execBaseTimer(timer, context);
|
||||
}
|
||||
execFreeTimer(timer, contextBuilder) {
|
||||
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
|
||||
return;
|
||||
}
|
||||
return super.execFreeTimer(timer, contextBuilder);
|
||||
}
|
||||
async checkpoint() {
|
||||
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
|
||||
|
|
|
|||
|
|
@ -1,21 +1,19 @@
|
|||
import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db';
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
|
||||
import { DbConfiguration } from 'oak-db/src/types/configuration';
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
export declare class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends MysqlStore<ED, Cxt> implements AsyncRowStore<ED, Cxt> {
|
||||
private executor;
|
||||
private relationAuth;
|
||||
constructor(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>);
|
||||
protected cascadeUpdateAsync<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: AsyncContext<ED>, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
|
||||
operate<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
|
||||
select<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise<Partial<ED[T]["Schema"]>[]>;
|
||||
count<T extends keyof ED>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: SelectOption): Promise<number>;
|
||||
import { DbTypeSymbol } from './utils/dbPriority';
|
||||
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
|
||||
import { DbStore } from 'oak-db/lib/types/dbStore';
|
||||
export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = {
|
||||
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T, Cxt>): void;
|
||||
registerChecker<T extends keyof ED>(checker: Checker<ED, T, Cxt>): void;
|
||||
setOnVolatileTrigger(onVolatileTrigger: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>): void;
|
||||
execVolatileTrigger<T extends keyof ED>(entity: T, name: string, ids: string[], context: Cxt, option: OperateOption): Promise<void>;
|
||||
checkpoint(ts: number): Promise<number>;
|
||||
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise<number>;
|
||||
}
|
||||
};
|
||||
export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = DbStore<ED, Cxt> & CascadeStore<ED> & TriggerStore<ED, Cxt>;
|
||||
export declare const createDbStore: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, dbConfiguration: DbConfiguration & {
|
||||
[DbTypeSymbol]?: string;
|
||||
}, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) => AppDbStore<ED, Cxt>;
|
||||
|
|
|
|||
234
lib/DbStore.js
234
lib/DbStore.js
|
|
@ -1,125 +1,135 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.DbStore = void 0;
|
||||
const oak_db_1 = require("oak-db");
|
||||
exports.createDbStore = void 0;
|
||||
const TriggerExecutor_1 = require("oak-domain/lib/store/TriggerExecutor");
|
||||
const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth");
|
||||
class DbStore extends oak_db_1.MysqlStore {
|
||||
executor;
|
||||
relationAuth;
|
||||
constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) {
|
||||
super(storageSchema, mysqlConfiguration);
|
||||
this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
|
||||
this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
}
|
||||
async cascadeUpdateAsync(entity, operation, context, option) {
|
||||
// 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题)
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, operation, context, option);
|
||||
const dbPriority_1 = require("./utils/dbPriority");
|
||||
const createDbStore = (storageSchema, contextBuilder, dbConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) => {
|
||||
// TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。
|
||||
const BaseStoreClass = (0, dbPriority_1.getDbStoreClass)(dbConfiguration);
|
||||
// 动态创建继承类
|
||||
// @ts-ignore
|
||||
class DynamicDbStore extends BaseStoreClass {
|
||||
executor;
|
||||
relationAuth;
|
||||
constructor() {
|
||||
super(storageSchema, dbConfiguration);
|
||||
this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
|
||||
this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
}
|
||||
const result = await super.cascadeUpdateAsync(entity, operation, context, option);
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, operation, context, option);
|
||||
checkRelationAsync(entity, operation, context) {
|
||||
return this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
async operate(entity, operation, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
await this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
result = await super.operate(entity, operation, context, option);
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
async select(entity, selection, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
let result;
|
||||
// select的trigger应加在根select之前,cascade的select不加处理
|
||||
Object.assign(selection, {
|
||||
action: 'select',
|
||||
});
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, selection, context, option);
|
||||
}
|
||||
if (!option.dontCollect) {
|
||||
await this.relationAuth.checkRelationAsync(entity, selection, context);
|
||||
}
|
||||
try {
|
||||
result = await super.select(entity, selection, context, option);
|
||||
async cascadeUpdateAsync(entity, operation, context, option) {
|
||||
// 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题)
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection, context, option, result);
|
||||
await this.executor.preOperation(entity, operation, context, option);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
async count(entity, selection, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
// count不用检查权限,因为检查权限中本身要用到count
|
||||
// const selection2 = Object.assign({
|
||||
// action: 'select',
|
||||
// }, selection) as ED[T]['Operation'];
|
||||
// await this.relationAuth.checkRelationAsync(entity, selection2, context);
|
||||
// if (!option.blockTrigger) {
|
||||
// await this.executor.preOperation(entity, selection2, context, option);
|
||||
// }
|
||||
result = await super.count(entity, selection, context, option);
|
||||
/* count应该不存在后trigger吧
|
||||
const result = await super.cascadeUpdateAsync(entity, operation, context, option);
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection2, context, option);
|
||||
} */
|
||||
await this.executor.postOperation(entity, operation, context, option);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
async operate(entity, operation, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
await this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
result = await super.operate(entity, operation, context, option);
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
async select(entity, selection, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
let result;
|
||||
// select的trigger应加在根select之前,cascade的select不加处理
|
||||
Object.assign(selection, {
|
||||
action: 'select',
|
||||
});
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, selection, context, option);
|
||||
}
|
||||
if (!option.dontCollect) {
|
||||
await this.relationAuth.checkRelationAsync(entity, selection, context);
|
||||
}
|
||||
try {
|
||||
result = await super.select(entity, selection, context, option);
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection, context, option, result);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
async count(entity, selection, context, option) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
// count不用检查权限,因为检查权限中本身要用到count
|
||||
// const selection2 = Object.assign({
|
||||
// action: 'select',
|
||||
// }, selection) as ED[T]['Operation'];
|
||||
// await this.relationAuth.checkRelationAsync(entity, selection2, context);
|
||||
// if (!option.blockTrigger) {
|
||||
// await this.executor.preOperation(entity, selection2, context, option);
|
||||
// }
|
||||
result = await super.count(entity, selection, context, option);
|
||||
/* count应该不存在后trigger吧
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection2, context, option);
|
||||
} */
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
registerTrigger(trigger) {
|
||||
this.executor.registerTrigger(trigger);
|
||||
}
|
||||
registerChecker(checker) {
|
||||
this.executor.registerChecker(checker, this.getSchema());
|
||||
}
|
||||
setOnVolatileTrigger(onVolatileTrigger) {
|
||||
this.executor.setOnVolatileTrigger(onVolatileTrigger);
|
||||
}
|
||||
async execVolatileTrigger(entity, name, ids, context, option) {
|
||||
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
|
||||
}
|
||||
checkpoint(ts) {
|
||||
return this.executor.checkpoint(ts);
|
||||
}
|
||||
independentCheckPoint(name, ts, instanceCount, instanceId) {
|
||||
return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
registerTrigger(trigger) {
|
||||
this.executor.registerTrigger(trigger);
|
||||
}
|
||||
registerChecker(checker) {
|
||||
this.executor.registerChecker(checker, this.getSchema());
|
||||
}
|
||||
setOnVolatileTrigger(onVolatileTrigger) {
|
||||
this.executor.setOnVolatileTrigger(onVolatileTrigger);
|
||||
}
|
||||
async execVolatileTrigger(entity, name, ids, context, option) {
|
||||
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
|
||||
}
|
||||
checkpoint(ts) {
|
||||
return this.executor.checkpoint(ts);
|
||||
}
|
||||
independentCheckPoint(name, ts, instanceCount, instanceId) {
|
||||
return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId);
|
||||
}
|
||||
}
|
||||
exports.DbStore = DbStore;
|
||||
return new DynamicDbStore();
|
||||
};
|
||||
exports.createDbStore = createDbStore;
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
* 根据sync的定义,生成对应的 commit triggers
|
||||
* @returns
|
||||
*/
|
||||
getSyncTriggers(): VolatileTrigger<ED, keyof ED, Cxt>[];
|
||||
getSyncTriggers(): Array<VolatileTrigger<ED, keyof ED, Cxt>>;
|
||||
getSelfEndpoint(): EndpointItem<ED, Cxt>;
|
||||
tryCreateSyncProcess(): void;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ class Synchronizer {
|
|||
remoteEntityId: entityId,
|
||||
data: queue.map((ele) => ({
|
||||
entity: ele.oper.targetEntity,
|
||||
rowIds: ele.oper.filter.id.$in,
|
||||
rowIds: ele.oper.filter.id.$in, // 暂时应该没什么用
|
||||
action: ele.oper.action,
|
||||
data: ele.oper.data,
|
||||
})),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.getClusterInfo = void 0;
|
||||
exports.getClusterInfo = getClusterInfo;
|
||||
function getProcessEnvOption(option) {
|
||||
if (process.env.hasOwnProperty(option)) {
|
||||
return process.env[option];
|
||||
|
|
@ -54,4 +54,3 @@ const MyClusterInfo = initialize();
|
|||
function getClusterInfo() {
|
||||
return MyClusterInfo;
|
||||
}
|
||||
exports.getClusterInfo = getClusterInfo;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
import { EntityDict } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
export declare function checkI18n<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>>(context: Cxt): Promise<(void | Awaited<import("oak-domain/lib/types").OperationResult<ED>>)[]>;
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异,并更新
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
export declare function checkAndUpdateI18n<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>>(context: Cxt): Promise<(void | Awaited<import("oak-domain/lib/types").OperationResult<ED>>)[]>;
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.checkI18n = checkI18n;
|
||||
exports.checkAndUpdateI18n = checkAndUpdateI18n;
|
||||
const tslib_1 = require("tslib");
|
||||
const node_path_1 = require("node:path");
|
||||
const requirePrj_1 = tslib_1.__importDefault(require("../utils/requirePrj"));
|
||||
const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder");
|
||||
const node_assert_1 = tslib_1.__importDefault(require("node:assert"));
|
||||
const lodash_1 = require("lodash");
|
||||
const uuid_1 = require("oak-domain/lib/utils/uuid");
|
||||
async function checkAndUpdateI18nInner(context, onlyCheck) {
|
||||
const pwd = process.cwd();
|
||||
const i18nData = (0, requirePrj_1.default)(pwd, (0, node_path_1.join)('lib', 'data', 'i18n'), (0, dependencyBuilder_1.analyzeDepedency)(pwd).ascOrder);
|
||||
const originI18nData = await context.select('i18n', {
|
||||
data: {
|
||||
id: 1,
|
||||
namespace: 1,
|
||||
language: 1,
|
||||
module: 1,
|
||||
position: 1,
|
||||
data: 1,
|
||||
},
|
||||
}, { dontCollect: true });
|
||||
const originDataDict = {};
|
||||
originI18nData.forEach((data) => originDataDict[data.id] = data);
|
||||
const result = i18nData.map(async (i18n) => {
|
||||
const { id, namespace, language, module, position, data } = i18n;
|
||||
const origin = originDataDict[id];
|
||||
if (origin) {
|
||||
(0, node_assert_1.default)(namespace === origin.namespace && language === origin.language && module === origin.module && position === origin.position);
|
||||
if (!(0, lodash_1.isEqual)(data, origin.data)) {
|
||||
console.log(`[${namespace}]数据${onlyCheck ? '需要更新' : '将被更新'}`);
|
||||
if (!onlyCheck) {
|
||||
return context.operate('i18n', {
|
||||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||||
action: 'update',
|
||||
data: {
|
||||
data,
|
||||
},
|
||||
filter: {
|
||||
id,
|
||||
},
|
||||
}, {});
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
console.log(`[${namespace}]数据${onlyCheck ? '需要新建' : '将被新建'}`);
|
||||
if (!onlyCheck) {
|
||||
return context.operate('i18n', {
|
||||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||||
action: 'create',
|
||||
data: i18n,
|
||||
}, {});
|
||||
}
|
||||
}
|
||||
return Promise.resolve();
|
||||
});
|
||||
return await Promise.all(result);
|
||||
}
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
function checkI18n(context) {
|
||||
return checkAndUpdateI18nInner(context, true);
|
||||
}
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异,并更新
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
function checkAndUpdateI18n(context) {
|
||||
return checkAndUpdateI18nInner(context);
|
||||
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
import { BaseEntityDict } from "oak-domain";
|
||||
import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
|
||||
import { EntityDict } from "oak-domain/lib/types";
|
||||
export type InternalErrorType = 'aspect' | 'trigger' | 'watcher' | 'timer' | 'checkpoint';
|
||||
export type InternalErrorHandler<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> = {
|
||||
name: string;
|
||||
handle: (ctx: Cxt, type: InternalErrorType, message: string, err: Error) => Promise<void>;
|
||||
};
|
||||
export type ExceptionPublisher = (type: string, message: string, err: any) => Promise<void>;
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
import { MysqlStore, PostgreSQLStore } from "oak-db";
|
||||
import { DbConfiguration } from "oak-db/src/types/configuration";
|
||||
import { EntityDict, StorageSchema } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
|
||||
import { DbStore } from "oak-db/lib/types/dbStore";
|
||||
/**
|
||||
* 数据库优先级列表,按顺序尝试获取配置文件
|
||||
*/
|
||||
export declare const dbList: {
|
||||
mysql: typeof MysqlStore;
|
||||
postgres: typeof PostgreSQLStore;
|
||||
};
|
||||
export declare const DbTypeSymbol: unique symbol;
|
||||
export declare const getDbConfig: (path: string) => DbConfiguration & {
|
||||
[DbTypeSymbol]: string;
|
||||
};
|
||||
export declare const getDbStoreClass: <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(config: {
|
||||
[DbTypeSymbol]?: string;
|
||||
}) => new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.getDbStoreClass = exports.getDbConfig = exports.DbTypeSymbol = exports.dbList = void 0;
|
||||
const oak_db_1 = require("oak-db");
|
||||
const path_1 = require("path");
|
||||
const fs_1 = require("fs");
|
||||
/**
|
||||
* 数据库优先级列表,按顺序尝试获取配置文件
|
||||
*/
|
||||
exports.dbList = {
|
||||
mysql: oak_db_1.MysqlStore,
|
||||
postgres: oak_db_1.PostgreSQLStore
|
||||
};
|
||||
exports.DbTypeSymbol = Symbol.for('oak:backend:db:type');
|
||||
const getDbConfig = (path) => {
|
||||
for (const db of Object.keys(exports.dbList)) {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-var-requires
|
||||
let dbConfigFile = (0, path_1.join)(path, 'configuration', `${db}.${process.env.NODE_ENV}.json`);
|
||||
if ((0, fs_1.existsSync)(dbConfigFile) === false) {
|
||||
dbConfigFile = (0, path_1.join)(path, 'configuration', `${db}.json`);
|
||||
}
|
||||
if ((0, fs_1.existsSync)(dbConfigFile) === false) {
|
||||
continue;
|
||||
}
|
||||
const config = require(dbConfigFile);
|
||||
console.log(`使用${db}作为数据库`);
|
||||
// 定义不可枚举的属性,避免被序列化
|
||||
Object.defineProperty(config, exports.DbTypeSymbol, {
|
||||
value: db,
|
||||
enumerable: false,
|
||||
writable: false,
|
||||
configurable: false
|
||||
});
|
||||
return config;
|
||||
}
|
||||
catch (err) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
};
|
||||
exports.getDbConfig = getDbConfig;
|
||||
const getDbStoreClass = (config) => {
|
||||
const dbType = Object.getOwnPropertyDescriptor(config, exports.DbTypeSymbol)?.value;
|
||||
if (!dbType) {
|
||||
throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
|
||||
}
|
||||
const DbStoreClass = exports.dbList[dbType.toLowerCase()];
|
||||
if (!DbStoreClass) {
|
||||
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(exports.dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
}
|
||||
return DbStoreClass;
|
||||
};
|
||||
exports.getDbStoreClass = getDbStoreClass;
|
||||
|
|
@ -0,0 +1 @@
|
|||
export default function requireSth(prjPath: string, filePath: string, dependencies: string[]): any;
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.default = requireSth;
|
||||
const fs_1 = require("fs");
|
||||
const lodash_1 = require("oak-domain/lib/utils/lodash");
|
||||
const path_1 = require("path");
|
||||
function requireSth(prjPath, filePath, dependencies) {
|
||||
const depFilePath = (0, path_1.join)(prjPath, filePath);
|
||||
let sth;
|
||||
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
|
||||
sth = require((0, path_1.join)(prjPath, filePath)).default;
|
||||
}
|
||||
const sthExternal = dependencies.map(ele => {
|
||||
const depFilePath = (0, path_1.join)(prjPath, 'node_modules', ele, filePath);
|
||||
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
|
||||
return require(depFilePath).default;
|
||||
}
|
||||
}).filter(ele => !!ele);
|
||||
if (sth) {
|
||||
sthExternal.push(sth);
|
||||
}
|
||||
return (0, lodash_1.mergeConcatMany)(sthExternal);
|
||||
}
|
||||
10
package.json
10
package.json
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "oak-backend-base",
|
||||
"version": "4.1.19",
|
||||
"version": "4.1.28",
|
||||
"description": "oak-backend-base",
|
||||
"main": "lib/index",
|
||||
"author": {
|
||||
|
|
@ -21,10 +21,10 @@
|
|||
"mysql": "^2.18.1",
|
||||
"mysql2": "^2.3.3",
|
||||
"node-schedule": "^2.1.0",
|
||||
"oak-common-aspect": "^3.0.5",
|
||||
"oak-db": "^3.3.6",
|
||||
"oak-domain": "^5.1.24",
|
||||
"oak-frontend-base": "^5.3.31",
|
||||
"oak-common-aspect": "file:../oak-common-aspect",
|
||||
"oak-db": "file:../oak-db",
|
||||
"oak-domain": "file:../oak-domain",
|
||||
"oak-frontend-base": "file:../oak-frontend-base",
|
||||
"socket.io": "^4.8.1",
|
||||
"socket.io-client": "^4.7.2",
|
||||
"uuid": "^8.3.2"
|
||||
|
|
|
|||
419
src/AppLoader.ts
419
src/AppLoader.ts
|
|
@ -5,10 +5,8 @@ import { makeIntrinsicLogics } from "oak-domain/lib/store/IntrinsicLogics";
|
|||
import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
|
||||
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types";
|
||||
import { DbStore } from "./DbStore";
|
||||
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException, BaseTimer, WBFreeWatcher } from "oak-domain/lib/types";
|
||||
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
|
||||
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
|
||||
import assert from 'assert';
|
||||
|
|
@ -21,40 +19,76 @@ import { getClusterInfo } from './cluster/env';
|
|||
import Synchronizer from './Synchronizer';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import domainI18nData from 'oak-domain/lib/data/i18n';
|
||||
import requireSth from './utils/requirePrj';
|
||||
import { InternalErrorHandler, InternalErrorType } from './types';
|
||||
import { getDbConfig } from './utils/dbPriority';
|
||||
import { createDbStore, AppDbStore } from './DbStore';
|
||||
|
||||
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
|
||||
protected dbStore: DbStore<ED, Cxt>;
|
||||
protected dbStore: AppDbStore<ED, Cxt>;
|
||||
private aspectDict: Record<string, Aspect<ED, Cxt>>;
|
||||
private externalDependencies: string[];
|
||||
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
|
||||
protected synchronizer?: Synchronizer<ED, Cxt>;
|
||||
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
|
||||
protected contextBuilder: (store: AppDbStore<ED, Cxt>) => Cxt;
|
||||
private nsSocket?: Namespace;
|
||||
private watcherTimerId?: NodeJS.Timeout;
|
||||
private scheduledJobs: Record<string, Job> = {};
|
||||
private internalErrorHandlers = new Array<InternalErrorHandler<ED, Cxt>>();
|
||||
|
||||
private watcherExecutingData: Map<string, Map<string, true>> = new Map();
|
||||
|
||||
public regAllExceptionHandler() {
|
||||
const handlers = this.requireSth('lib/configuration/exception') as Array<InternalErrorHandler<ED, Cxt>> | InternalErrorHandler<ED, Cxt>;
|
||||
if (Array.isArray(handlers)) {
|
||||
handlers.forEach(
|
||||
(handler) => {
|
||||
console.log(`注册内部错误处理器: ${handler.name}`);
|
||||
this.registerInternalErrorHandler(handler);
|
||||
}
|
||||
);
|
||||
} else {
|
||||
console.warn('lib/configuration/exception必须默认导出一个处理器数组,当前导出类型不正确,将忽略此配置');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册一个内部错误处理器
|
||||
* @param handler 内部错误处理器
|
||||
*/
|
||||
public registerInternalErrorHandler(handler: InternalErrorHandler<ED, Cxt>) {
|
||||
// 检查有没有名称重复
|
||||
if (this.internalErrorHandlers.find(h => h.name === handler.name)) {
|
||||
throw new Error(`内部错误处理器名称重复: ${handler.name}`);
|
||||
}
|
||||
this.internalErrorHandlers.push(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布内部错误事件给注册的处理器
|
||||
*/
|
||||
private async publishInternalError(type: InternalErrorType, message: string, err: any) {
|
||||
await Promise.all(this.internalErrorHandlers.map(
|
||||
(handler) => {
|
||||
return new Promise<void>(async (resolve) => {
|
||||
const ctx = await this.makeContext();
|
||||
try {
|
||||
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
|
||||
await handler.handle(ctx, type, message, err);
|
||||
await ctx.commit()
|
||||
} catch (e) {
|
||||
console.error('执行internalErrorHandler时出错', e);
|
||||
await ctx.rollback();
|
||||
} finally {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
private requireSth(filePath: string): any {
|
||||
const depFilePath = join(this.path, filePath);
|
||||
let sth: any;
|
||||
if (existsSync(`${depFilePath}.js`)) {
|
||||
sth = require(join(this.path, filePath)).default;
|
||||
}
|
||||
const sthExternal = this.externalDependencies.map(
|
||||
ele => {
|
||||
const depFilePath = join(this.path, 'node_modules', ele, filePath);
|
||||
if (existsSync(`${depFilePath}.js`)) {
|
||||
return require(depFilePath).default
|
||||
}
|
||||
}
|
||||
).filter(
|
||||
ele => !!ele
|
||||
);
|
||||
|
||||
if (sth) {
|
||||
sthExternal.push(sth);
|
||||
}
|
||||
|
||||
return mergeConcatMany(sthExternal);
|
||||
return requireSth(this.path, filePath, this.externalDependencies);
|
||||
}
|
||||
|
||||
protected async makeContext(cxtStr?: string, headers?: IncomingHttpHeaders) {
|
||||
|
|
@ -73,29 +107,39 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
|
||||
/**
|
||||
* 后台启动的configuration,统一放在这里读取
|
||||
* 获取数据库配置
|
||||
* @returns 读取数据库配置
|
||||
*/
|
||||
private getConfiguration() {
|
||||
const dbConfigFile = join(this.path, 'configuration', 'mysql.json');
|
||||
const dbConfig = require(dbConfigFile);
|
||||
private getDbConfig() {
|
||||
return getDbConfig(this.path);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取同步配置
|
||||
* @returns 读取同步配置
|
||||
*/
|
||||
private getSyncConfig() {
|
||||
const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js');
|
||||
const syncConfigs = existsSync(syncConfigFile) && require(syncConfigFile).default;
|
||||
|
||||
return {
|
||||
dbConfig: dbConfig as MySQLConfiguration,
|
||||
syncConfig: syncConfigs as SyncConfig<ED, Cxt> | undefined,
|
||||
};
|
||||
return syncConfigs as SyncConfig<ED, Cxt> | undefined;
|
||||
}
|
||||
|
||||
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) {
|
||||
super(path);
|
||||
const { dbConfig } = this.getConfiguration();
|
||||
const dbConfig = this.getDbConfig();
|
||||
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
|
||||
const depGraph = analyzeDepedency(process.cwd());
|
||||
this.externalDependencies = depGraph.ascOrder;
|
||||
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation')!;
|
||||
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index')!);
|
||||
this.dbStore = new DbStore<ED, Cxt>(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
this.dbStore = createDbStore<ED, Cxt>(
|
||||
storageSchema,
|
||||
() => this.contextBuilder(this.dbStore),
|
||||
dbConfig,
|
||||
authDeduceRelationMap,
|
||||
selectFreeEntities,
|
||||
updateFreeDict,
|
||||
);
|
||||
if (nsSubscribe) {
|
||||
this.dataSubscriber = new DataSubscriber(nsSubscribe, nsServer);
|
||||
}
|
||||
|
|
@ -106,7 +150,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
|
||||
// 需要重载context上的构造和commit方法,否则程序中执行context.restartToExecute这样的方法中,new一个context出来是无法正确执行的
|
||||
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
|
||||
constructor(store: DbStore<ED, Cxt>) {
|
||||
constructor(store: AppDbStore<ED, Cxt>) {
|
||||
super(store);
|
||||
this.clusterInfo = getClusterInfo();
|
||||
}
|
||||
|
|
@ -186,7 +230,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
async mount(initialize?: true) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
const { syncConfig: syncConfig } = this.getConfiguration();
|
||||
const syncConfig = this.getSyncConfig();
|
||||
|
||||
if (syncConfig) {
|
||||
this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
|
||||
|
|
@ -240,7 +284,9 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
};
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`执行aspect「${name}」出错`, err);
|
||||
await context.rollback();
|
||||
this.publishInternalError(`aspect`, `执行aspect「${name}」出错`, err)
|
||||
if (err instanceof OakException) {
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -254,8 +300,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
}
|
||||
|
||||
async initialize() {
|
||||
await this.dbStore.initialize({ ifExists: 'dropIfNotStatic' });
|
||||
async initialize(ifExists?: 'drop' | 'omit' | 'dropIfNotStatic') {
|
||||
await this.dbStore.initialize({ ifExists });
|
||||
|
||||
const data = this.requireSth('lib/data/index')!;
|
||||
// oak-domain中只有i18n
|
||||
|
|
@ -312,10 +358,10 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
}
|
||||
}
|
||||
await this.dbStore.disconnect();
|
||||
// await this.dbStore.disconnect(); // 不需要马上断开连接,在initialize后可能还会有操作,unmount时会断开
|
||||
}
|
||||
|
||||
getStore(): DbStore<ED, Cxt> {
|
||||
getStore(): AppDbStore<ED, Cxt> {
|
||||
return this.dbStore;
|
||||
}
|
||||
|
||||
|
|
@ -324,6 +370,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
const endPointRouters: Array<[EndpointItem<ED, Cxt>['name'], EndpointItem<ED, Cxt>['method'], string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<{
|
||||
headers?: Record<string, string | string[]>;
|
||||
data: any;
|
||||
statusCode?: number;
|
||||
}>]> = [];
|
||||
const endPointMap: Record<string, true> = {};
|
||||
|
||||
|
|
@ -406,43 +453,187 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查某个数据是否正在被watcher执行
|
||||
* @param name watcher名称
|
||||
* @param dataId 数据ID
|
||||
* @returns 如果没有正在执行则返回true,否则返回false
|
||||
*/
|
||||
private checkDataExecuting(name: string, dataId: string): boolean {
|
||||
let dataSet = this.watcherExecutingData.get(name);
|
||||
if (!dataSet) {
|
||||
dataSet = new Map<string, true>();
|
||||
this.watcherExecutingData.set(name, dataSet);
|
||||
}
|
||||
if (dataSet.has(dataId)) {
|
||||
return false;
|
||||
}
|
||||
dataSet.set(dataId, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 过滤出未在执行中的数据行,并标记为执行中
|
||||
* @returns [过滤后的行, 是否有行被跳过]
|
||||
*/
|
||||
private filterAndMarkExecutingRows<T extends keyof ED>(
|
||||
watcher: Watcher<ED, T, Cxt>,
|
||||
rows: Partial<ED[T]['Schema']>[]
|
||||
): [Partial<ED[T]['Schema']>[], Partial<ED[T]['Schema']>[]] {
|
||||
if (watcher.exclusive !== true) {
|
||||
// 不需要排他执行,直接返回所有行
|
||||
return [rows, []];
|
||||
}
|
||||
|
||||
const rowsWithoutExecuting: Partial<ED[T]['Schema']>[] = [];
|
||||
const skipedRows: Partial<ED[T]['Schema']>[] = [];
|
||||
const watcherName = watcher.name;
|
||||
|
||||
for (const row of rows) {
|
||||
if (!row.id) {
|
||||
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID,跳过此数据的并发检查处理:`, row);
|
||||
rowsWithoutExecuting.push(row);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (this.checkDataExecuting(watcherName, row.id)) {
|
||||
rowsWithoutExecuting.push(row);
|
||||
} else {
|
||||
skipedRows.push(row);
|
||||
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID:【${row.id}】,请检查是否执行超时`);
|
||||
}
|
||||
}
|
||||
|
||||
return [rowsWithoutExecuting, skipedRows];
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理执行标记
|
||||
*/
|
||||
private cleanupExecutingMarks<T extends keyof ED>(
|
||||
watcherName: string,
|
||||
rows: Partial<ED[T]['Schema']>[]
|
||||
) {
|
||||
for (const row of rows) {
|
||||
if (row.id) {
|
||||
const dataSet = this.watcherExecutingData.get(watcherName);
|
||||
if (dataSet) {
|
||||
dataSet.delete(row.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析 filter 和 projection(支持函数或静态值)
|
||||
*/
|
||||
private async resolveFilterAndProjection<T extends ED[keyof ED]['Filter']>(
|
||||
filter: T | (() => Promise<T>),
|
||||
projection: any | (() => Promise<any>)
|
||||
): Promise<[T, any]> {
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
|
||||
return [filter2, projection2];
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行 WB 类型 watcher 的查询操作
|
||||
*/
|
||||
private async selectForWBWatcher<T extends keyof ED>(
|
||||
watcher: WBWatcher<ED, T, Cxt> | WBFreeWatcher<ED, T, Cxt>,
|
||||
context: Cxt
|
||||
) {
|
||||
const { entity, projection, filter, singleton, forUpdate } = watcher;
|
||||
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
|
||||
|
||||
return await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
}
|
||||
|
||||
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
|
||||
const context = await this.makeContext();
|
||||
let result: OperationResult<ED> | undefined;
|
||||
try {
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
|
||||
// BBWatcher:直接操作,无需查询
|
||||
if (watcher.hasOwnProperty('actionData')) {
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const { entity, action, filter, actionData, singleton } = <BBWatcher<ED, keyof ED>>watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const data = typeof actionData === 'function' ? await (actionData)() : cloneDeep(actionData);
|
||||
const data = typeof actionData === 'function' ? await actionData() : cloneDeep(actionData);
|
||||
|
||||
result = await this.operateInWatcher(entity, {
|
||||
id: await generateNewIdAsync(),
|
||||
action: action as string,
|
||||
data,
|
||||
filter: filter2,
|
||||
}, context, singleton);
|
||||
}
|
||||
else {
|
||||
const { entity, projection, fn, filter, singleton, forUpdate } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
|
||||
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
|
||||
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
|
||||
const rows = await this.selectInWatcher(entity, {
|
||||
data: projection2,
|
||||
filter: filter2,
|
||||
}, context, forUpdate, singleton);
|
||||
|
||||
if (rows.length > 0) {
|
||||
result = await fn(context, rows);
|
||||
await context.commit();
|
||||
return result;
|
||||
} catch (err) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await context.commit();
|
||||
} else {
|
||||
await context.rollback();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
await context.commit();
|
||||
}
|
||||
|
||||
// WBFreeWatcher 和 WBWatcher:查询后执行
|
||||
const isFreeType = watcher.hasOwnProperty('type') &&
|
||||
(watcher as WBFreeWatcher<ED, keyof ED, Cxt>).type === 'free';
|
||||
|
||||
// 1. 执行查询(WBFreeWatcher 使用独立 context)
|
||||
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
|
||||
const rows = await this.selectForWBWatcher(
|
||||
watcher as WBWatcher<ED, keyof ED, Cxt> | WBFreeWatcher<ED, keyof ED, Cxt>,
|
||||
selectContext
|
||||
);
|
||||
|
||||
if (isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
|
||||
// 2. 并发检查:过滤出未在执行中的数据
|
||||
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(
|
||||
watcher,
|
||||
rows
|
||||
);
|
||||
|
||||
if (rowsWithoutExecuting.length === 0) {
|
||||
if (!isFreeType) {
|
||||
await selectContext.commit();
|
||||
}
|
||||
|
||||
this.cleanupExecutingMarks(watcher.name, hasSkipped);
|
||||
return result;
|
||||
}
|
||||
catch (err) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await context.commit();
|
||||
|
||||
// 3. 执行业务逻辑
|
||||
try {
|
||||
if (isFreeType) {
|
||||
const { fn } = watcher as WBFreeWatcher<ED, keyof ED, Cxt>;
|
||||
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
|
||||
} else {
|
||||
const { fn } = watcher as WBWatcher<ED, keyof ED, Cxt>;
|
||||
result = await fn(selectContext, rowsWithoutExecuting);
|
||||
await selectContext.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
|
||||
return result;
|
||||
} catch (err) {
|
||||
// 清理执行标记
|
||||
this.cleanupExecutingMarks(watcher.name, rows);
|
||||
|
||||
if (!isFreeType) {
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await selectContext.commit();
|
||||
} else {
|
||||
await selectContext.rollback();
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -465,7 +656,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers || []).concat(adWatchers);
|
||||
|
||||
let count = 0;
|
||||
const skipOnceSet = new Set<string>();
|
||||
const execOne = async (watcher: Watcher<ED, keyof ED, Cxt>, start: number) => {
|
||||
if (skipOnceSet.has(watcher.name)) {
|
||||
skipOnceSet.delete(watcher.name);
|
||||
console.log(`跳过本次执行watcher【${watcher.name}】`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const result = await this.execWatcher(watcher);
|
||||
if (result) {
|
||||
|
|
@ -474,6 +671,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`执行watcher【${watcher.name}】失败,耗时【${Date.now() - start}】,结果是:`, err);
|
||||
await this.publishInternalError(`watcher`, `执行watcher【${watcher.name}】失败`, err);
|
||||
}
|
||||
};
|
||||
const doWatchers = async () => {
|
||||
|
|
@ -493,18 +691,32 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`执行了checkpoint,发生错误:`, err);
|
||||
await this.publishInternalError(`checkpoint`, `执行checkpoint发生错误`, err);
|
||||
}
|
||||
|
||||
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
|
||||
};
|
||||
|
||||
// 首次执行时,跳过所有lazy的watcher
|
||||
for (const w of totalWatchers) {
|
||||
if (w.lazy) {
|
||||
skipOnceSet.add(w.name);
|
||||
}
|
||||
}
|
||||
|
||||
doWatchers();
|
||||
}
|
||||
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
|
||||
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
|
||||
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
|
||||
const { timer: timerFn } = timer as BaseTimer<ED, Cxt>;
|
||||
return timerFn(context);
|
||||
}
|
||||
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined {
|
||||
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
|
||||
return timerFn(contextBuilder);
|
||||
}
|
||||
|
||||
startTimers() {
|
||||
const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index');
|
||||
if (timers) {
|
||||
|
|
@ -520,26 +732,38 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒】,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (timer.hasOwnProperty('type') && (timer as FreeTimer<ED, Cxt>).type === 'free') {
|
||||
try {
|
||||
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, () => this.makeContext());
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
} catch (err) {
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const context = await this.makeContext();
|
||||
try {
|
||||
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, context);
|
||||
const result = await this.execBaseTimer(timer as BaseTimer<ED, Cxt>, context);
|
||||
if (result) {
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}】`);
|
||||
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
if (err instanceof OakPartialSuccess) {
|
||||
await context.commit();
|
||||
}
|
||||
else {
|
||||
await context.rollback();
|
||||
}
|
||||
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -559,6 +783,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
async execStartRoutines() {
|
||||
const routines: Routine<ED, keyof ED, Cxt>[] = this.requireSth('lib/routines/start') || [];
|
||||
for (const routine of routines) {
|
||||
console.log(`执行启动例程【${routine.name}】...`);
|
||||
if (routine.hasOwnProperty('entity')) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
|
|
@ -566,7 +791,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
|
@ -578,12 +803,50 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
try {
|
||||
const result = await routineFn(context, {
|
||||
socket: this.nsSocket!,
|
||||
contextBuilder: () => this.makeContext(),
|
||||
});
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}】`);
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async execStopRoutines() {
|
||||
const routines: Routine<ED, keyof ED, Cxt>[] = this.requireSth('lib/routines/stop') || [];
|
||||
for (const routine of routines) {
|
||||
console.log(`执行停止例程【${routine.name}】...`);
|
||||
if (routine.hasOwnProperty('entity')) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
const result = await this.execWatcher(routine as Watcher<ED, keyof ED, Cxt>);
|
||||
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
else {
|
||||
const { name, routine: routineFn } = routine as FreeRoutine<ED, Cxt>;
|
||||
const context = await this.makeContext();
|
||||
|
||||
const start = Date.now();
|
||||
try {
|
||||
const result = await routineFn(context, {
|
||||
socket: this.nsSocket!,
|
||||
contextBuilder: () => this.makeContext(),
|
||||
});
|
||||
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
|
||||
await context.commit();
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
|
|
@ -594,6 +857,14 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
async execRoutine(routine: <Cxt extends AsyncContext<ED>>(context: Cxt) => Promise<void>) {
|
||||
const context = await this.makeContext();
|
||||
|
||||
await routine(context);
|
||||
try {
|
||||
const result = await routine(context);
|
||||
await context.commit();
|
||||
return result;
|
||||
}
|
||||
catch (e: any) {
|
||||
await context.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
import { groupBy } from 'oak-domain/lib/utils/lodash';
|
||||
import { combineFilters } from 'oak-domain/lib/store/filter';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer } from 'oak-domain/lib/types';
|
||||
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { getClusterInfo } from './cluster/env';
|
||||
|
||||
|
|
@ -27,7 +27,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
|
|||
this.socket!.emit('sub', csTriggerNames);
|
||||
}
|
||||
});
|
||||
this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
|
||||
this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
|
||||
const context = await this.makeContext(cxtStr);
|
||||
if (process.env.NODE_ENV === 'development') {
|
||||
console.log(`「${getClusterInfo().instanceId}」号实例接收到来自其它进程的volatileTrigger请求, name是「${name}」, ids是「${ids.join(',')}」`);
|
||||
|
|
@ -184,11 +184,18 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
|
|||
return super.execWatcher(watcher);
|
||||
}
|
||||
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt) {
|
||||
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt) {
|
||||
if (timer.singleton && getClusterInfo().instanceId !== 0) {
|
||||
return;
|
||||
}
|
||||
return super.execFreeTimer(timer, context);
|
||||
return super.execBaseTimer(timer, context);
|
||||
}
|
||||
|
||||
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>) {
|
||||
if (timer.singleton && getClusterInfo().instanceId !== 0) {
|
||||
return;
|
||||
}
|
||||
return super.execFreeTimer(timer, contextBuilder);
|
||||
}
|
||||
|
||||
protected async checkpoint() {
|
||||
|
|
|
|||
353
src/DbStore.ts
353
src/DbStore.ts
|
|
@ -1,173 +1,216 @@
|
|||
import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db';
|
||||
import { DbConfiguration } from 'oak-db/src/types/configuration';
|
||||
import { MySqlSelectOption, MysqlOperateOption } from 'oak-db';
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
|
||||
import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import { RelationAuth } from 'oak-domain/lib/store/RelationAuth';
|
||||
import { DbTypeSymbol, getDbStoreClass } from './utils/dbPriority';
|
||||
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
|
||||
import { DbStore } from 'oak-db/lib/types/dbStore';
|
||||
|
||||
|
||||
export class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends MysqlStore<ED, Cxt> implements AsyncRowStore<ED, Cxt> {
|
||||
private executor: TriggerExecutor<ED, Cxt>;
|
||||
private relationAuth: RelationAuth<ED>;
|
||||
|
||||
constructor(
|
||||
storageSchema: StorageSchema<ED>,
|
||||
contextBuilder: () => Cxt,
|
||||
mysqlConfiguration: MySQLConfiguration,
|
||||
authDeduceRelationMap: AuthDeduceRelationMap<ED>,
|
||||
selectFreeEntities: SelectFreeEntities<ED> = [],
|
||||
updateFreeDict: UpdateFreeDict<ED> = {},
|
||||
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) {
|
||||
super(storageSchema, mysqlConfiguration);
|
||||
this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
|
||||
this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
}
|
||||
|
||||
protected async cascadeUpdateAsync<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: AsyncContext<ED>, option: MysqlOperateOption) {
|
||||
// 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题)
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, operation, context as Cxt, option);
|
||||
}
|
||||
const result = await super.cascadeUpdateAsync(entity, operation, context, option);
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, operation, context as Cxt, option);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async operate<T extends keyof ED>(
|
||||
entity: T,
|
||||
operation: ED[T]['Operation'],
|
||||
context: Cxt,
|
||||
option: MysqlOperateOption
|
||||
) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
await this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
result = await super.operate(entity, operation, context, option);
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async select<T extends keyof ED>(
|
||||
entity: T,
|
||||
selection: ED[T]['Selection'],
|
||||
context: Cxt,
|
||||
option: MySqlSelectOption
|
||||
) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
let result: Partial<ED[T]['Schema']>[];
|
||||
|
||||
// select的trigger应加在根select之前,cascade的select不加处理
|
||||
Object.assign(selection, {
|
||||
action: 'select',
|
||||
});
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, selection as ED[T]['Operation'], context, option);
|
||||
}
|
||||
if (!option.dontCollect) {
|
||||
await this.relationAuth.checkRelationAsync(entity, selection, context);
|
||||
}
|
||||
try {
|
||||
result = await super.select(entity, selection, context, option);
|
||||
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection as ED[T]['Operation']
|
||||
, context, option, result);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async count<T extends keyof ED>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: SelectOption): Promise<number> {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
// count不用检查权限,因为检查权限中本身要用到count
|
||||
// const selection2 = Object.assign({
|
||||
// action: 'select',
|
||||
// }, selection) as ED[T]['Operation'];
|
||||
|
||||
// await this.relationAuth.checkRelationAsync(entity, selection2, context);
|
||||
// if (!option.blockTrigger) {
|
||||
// await this.executor.preOperation(entity, selection2, context, option);
|
||||
// }
|
||||
result = await super.count(entity, selection, context, option);
|
||||
/* count应该不存在后trigger吧
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection2, context, option);
|
||||
} */
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T, Cxt>) {
|
||||
this.executor.registerTrigger(trigger);
|
||||
}
|
||||
|
||||
registerChecker<T extends keyof ED>(checker: Checker<ED, T, Cxt>) {
|
||||
this.executor.registerChecker(checker, this.getSchema());
|
||||
}
|
||||
|
||||
export type TriggerStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = {
|
||||
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T, Cxt>): void;
|
||||
registerChecker<T extends keyof ED>(checker: Checker<ED, T, Cxt>): void;
|
||||
setOnVolatileTrigger(
|
||||
onVolatileTrigger: <T extends keyof ED>(
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
cxtStr: string,
|
||||
option: OperateOption) => Promise<void>
|
||||
) {
|
||||
this.executor.setOnVolatileTrigger(onVolatileTrigger);
|
||||
}
|
||||
|
||||
async execVolatileTrigger<T extends keyof ED>(
|
||||
): void;
|
||||
execVolatileTrigger<T extends keyof ED>(
|
||||
entity: T,
|
||||
name: string,
|
||||
ids: string[],
|
||||
context: Cxt,
|
||||
option: OperateOption
|
||||
) {
|
||||
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
|
||||
): Promise<void>;
|
||||
checkpoint(ts: number): Promise<number>;
|
||||
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number): Promise<number>;
|
||||
}
|
||||
|
||||
export type AppDbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> = DbStore<ED, Cxt> & CascadeStore<ED> & TriggerStore<ED, Cxt>;
|
||||
|
||||
export const createDbStore = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>(
|
||||
storageSchema: StorageSchema<ED>,
|
||||
contextBuilder: () => Cxt,
|
||||
dbConfiguration: DbConfiguration & {
|
||||
[DbTypeSymbol]?: string;
|
||||
},
|
||||
authDeduceRelationMap: AuthDeduceRelationMap<ED>,
|
||||
selectFreeEntities: SelectFreeEntities<ED> = [],
|
||||
updateFreeDict: UpdateFreeDict<ED> = {},
|
||||
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>
|
||||
): AppDbStore<ED, Cxt> => {
|
||||
|
||||
// TODO: 这里的类型检查会过不去,因为ts不知道上层已经实现这个抽象类。
|
||||
const BaseStoreClass = getDbStoreClass<ED, Cxt>(dbConfiguration)
|
||||
|
||||
// 动态创建继承类
|
||||
// @ts-ignore
|
||||
class DynamicDbStore extends BaseStoreClass implements TriggerStore<ED, Cxt> {
|
||||
private executor: TriggerExecutor<ED, Cxt>;
|
||||
private relationAuth: RelationAuth<ED>;
|
||||
|
||||
constructor() {
|
||||
super(storageSchema, dbConfiguration);
|
||||
this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
|
||||
this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
|
||||
}
|
||||
|
||||
checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>(entity: T, operation: Omit<ED[T]["Operation"] | ED[T]["Selection"], "id">, context: Cxt): Promise<void> {
|
||||
return this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
}
|
||||
|
||||
protected async cascadeUpdateAsync<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: AsyncContext<ED>, option: MysqlOperateOption) {
|
||||
// 如果是在modi处理过程中,所有的trigger也可以延时到apply时再处理(这时候因为modi中的数据并不实际存在,处理会有问题)
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, operation, context as Cxt, option);
|
||||
}
|
||||
const result = await super.cascadeUpdateAsync(entity, operation, context, option);
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, operation, context as Cxt, option);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async operate<T extends keyof ED>(
|
||||
entity: T,
|
||||
operation: ED[T]['Operation'],
|
||||
context: Cxt,
|
||||
option: MysqlOperateOption
|
||||
) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
await this.relationAuth.checkRelationAsync(entity, operation, context);
|
||||
result = await super.operate(entity, operation, context, option);
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async select<T extends keyof ED>(
|
||||
entity: T,
|
||||
selection: ED[T]['Selection'],
|
||||
context: Cxt,
|
||||
option: MySqlSelectOption
|
||||
) {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
let result: Partial<ED[T]['Schema']>[];
|
||||
|
||||
// select的trigger应加在根select之前,cascade的select不加处理
|
||||
Object.assign(selection, {
|
||||
action: 'select',
|
||||
});
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.preOperation(entity, selection as ED[T]['Operation'], context, option);
|
||||
}
|
||||
if (!option.dontCollect) {
|
||||
await this.relationAuth.checkRelationAsync(entity, selection, context);
|
||||
}
|
||||
try {
|
||||
result = await super.select(entity, selection, context, option);
|
||||
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection as ED[T]['Operation']
|
||||
, context, option, result);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async count<T extends keyof ED>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: SelectOption): Promise<number> {
|
||||
const autoCommit = !context.getCurrentTxnId();
|
||||
let result;
|
||||
if (autoCommit) {
|
||||
await context.begin();
|
||||
}
|
||||
try {
|
||||
// count不用检查权限,因为检查权限中本身要用到count
|
||||
// const selection2 = Object.assign({
|
||||
// action: 'select',
|
||||
// }, selection) as ED[T]['Operation'];
|
||||
|
||||
// await this.relationAuth.checkRelationAsync(entity, selection2, context);
|
||||
// if (!option.blockTrigger) {
|
||||
// await this.executor.preOperation(entity, selection2, context, option);
|
||||
// }
|
||||
result = await super.count(entity, selection, context, option);
|
||||
/* count应该不存在后trigger吧
|
||||
if (!option.blockTrigger) {
|
||||
await this.executor.postOperation(entity, selection2, context, option);
|
||||
} */
|
||||
}
|
||||
catch (err) {
|
||||
await context.rollback();
|
||||
throw err;
|
||||
}
|
||||
if (autoCommit) {
|
||||
await context.commit();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T, Cxt>) {
|
||||
this.executor.registerTrigger(trigger);
|
||||
}
|
||||
|
||||
registerChecker<T extends keyof ED>(checker: Checker<ED, T, Cxt>) {
|
||||
this.executor.registerChecker(checker, this.getSchema());
|
||||
}
|
||||
|
||||
setOnVolatileTrigger(
|
||||
onVolatileTrigger: <T extends keyof ED>(
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
cxtStr: string,
|
||||
option: OperateOption) => Promise<void>
|
||||
) {
|
||||
this.executor.setOnVolatileTrigger(onVolatileTrigger);
|
||||
}
|
||||
|
||||
async execVolatileTrigger<T extends keyof ED>(
|
||||
entity: T,
|
||||
name: string,
|
||||
ids: string[],
|
||||
context: Cxt,
|
||||
option: OperateOption
|
||||
) {
|
||||
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
|
||||
}
|
||||
|
||||
checkpoint(ts: number) {
|
||||
return this.executor.checkpoint(ts);
|
||||
}
|
||||
|
||||
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number) {
|
||||
return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId);
|
||||
}
|
||||
}
|
||||
|
||||
checkpoint(ts: number) {
|
||||
return this.executor.checkpoint(ts);
|
||||
}
|
||||
|
||||
independentCheckPoint(name: string, ts: number, instanceCount?: number, instanceId?: number) {
|
||||
return this.executor.independentCheckPoint(name, ts, instanceCount, instanceId);
|
||||
}
|
||||
}
|
||||
return new DynamicDbStore() as AppDbStore<ED, Cxt>;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,87 @@
|
|||
import { EntityDict } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
|
||||
import { join } from 'node:path';
|
||||
import requireSth from '../utils/requirePrj';
|
||||
import { analyzeDepedency } from 'oak-domain/lib/compiler/dependencyBuilder';
|
||||
import assert from 'node:assert';
|
||||
import { isEqual } from 'lodash';
|
||||
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
|
||||
|
||||
async function checkAndUpdateI18nInner<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>>(context: Cxt, onlyCheck?: true) {
|
||||
const pwd = process.cwd();
|
||||
const i18nData = requireSth(pwd, join('lib', 'data', 'i18n'), analyzeDepedency(pwd).ascOrder) as BaseEntityDict['i18n']['OpSchema'][];
|
||||
|
||||
const originI18nData = await context.select('i18n', {
|
||||
data: {
|
||||
id: 1,
|
||||
namespace: 1,
|
||||
language: 1,
|
||||
module: 1,
|
||||
position: 1,
|
||||
data: 1,
|
||||
},
|
||||
}, { dontCollect: true });
|
||||
|
||||
const originDataDict: Record<string, BaseEntityDict['i18n']['OpSchema']> = {};
|
||||
originI18nData.forEach(
|
||||
(data) => originDataDict[data.id!] = data as BaseEntityDict['i18n']['OpSchema']
|
||||
);
|
||||
|
||||
const result = i18nData.map(
|
||||
async (i18n) => {
|
||||
const { id, namespace, language, module, position, data } = i18n;
|
||||
const origin = originDataDict[id];
|
||||
|
||||
if (origin) {
|
||||
assert(namespace === origin.namespace && language === origin.language && module === origin.module && position === origin.position);
|
||||
if (!isEqual(data, origin.data)) {
|
||||
console.log(`[${namespace}]数据${onlyCheck ? '需要更新' : '将被更新'}`);
|
||||
if (!onlyCheck) {
|
||||
return context.operate('i18n', {
|
||||
id: await generateNewIdAsync(),
|
||||
action: 'update',
|
||||
data: {
|
||||
data,
|
||||
},
|
||||
filter: {
|
||||
id,
|
||||
},
|
||||
}, {});
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
console.log(`[${namespace}]数据${onlyCheck ? '需要新建' : '将被新建'}`);
|
||||
if (!onlyCheck) {
|
||||
return context.operate('i18n', {
|
||||
id: await generateNewIdAsync(),
|
||||
action: 'create',
|
||||
data: i18n,
|
||||
}, {});
|
||||
}
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
);
|
||||
|
||||
return await Promise.all(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
export function checkI18n<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>>(context: Cxt) {
|
||||
return checkAndUpdateI18nInner<ED, Cxt>(context, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查项目目录下的i18n数据和数据库中的差异,并更新
|
||||
* @param context
|
||||
* @returns
|
||||
*/
|
||||
export function checkAndUpdateI18n<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>>(context: Cxt) {
|
||||
return checkAndUpdateI18nInner<ED, Cxt>(context);
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
import { BaseEntityDict } from "oak-domain";
|
||||
import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
|
||||
import { EntityDict, OakException } from "oak-domain/lib/types";
|
||||
|
||||
export type InternalErrorType = 'aspect' | 'trigger' | 'watcher' | 'timer' | 'checkpoint'
|
||||
export type InternalErrorHandler<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> = {
|
||||
name: string;
|
||||
handle: (ctx: Cxt, type: InternalErrorType, message:string, err: Error) => Promise<void>;
|
||||
}
|
||||
export type ExceptionPublisher = (type: string, message: string, err: any) => Promise<void>;
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
import { MysqlStore, PostgreSQLStore } from "oak-db";
|
||||
import { DbConfiguration } from "oak-db/src/types/configuration";
|
||||
import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
|
||||
import { join } from "path";
|
||||
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption, RowStore } from 'oak-domain/lib/types';
|
||||
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
|
||||
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
|
||||
import { CascadeStore } from "oak-domain/lib/store/CascadeStore";
|
||||
import { existsSync } from "fs";
|
||||
import { DbStore } from "oak-db/lib/types/dbStore";
|
||||
|
||||
/**
|
||||
* 数据库优先级列表,按顺序尝试获取配置文件
|
||||
*/
|
||||
export const dbList = {
|
||||
mysql: MysqlStore,
|
||||
postgres: PostgreSQLStore
|
||||
}
|
||||
|
||||
export const DbTypeSymbol = Symbol.for('oak:backend:db:type')
|
||||
|
||||
export const getDbConfig = (path: string): DbConfiguration & {
|
||||
[DbTypeSymbol]: string;
|
||||
} => {
|
||||
for (const db of Object.keys(dbList)) {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-var-requires
|
||||
let dbConfigFile = join(path, 'configuration', `${db}.${process.env.NODE_ENV}.json`);
|
||||
if (existsSync(dbConfigFile) === false) {
|
||||
dbConfigFile = join(path, 'configuration', `${db}.json`);
|
||||
}
|
||||
|
||||
if (existsSync(dbConfigFile) === false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const config = require(dbConfigFile);
|
||||
console.log(`使用${db}作为数据库`);
|
||||
// 定义不可枚举的属性,避免被序列化
|
||||
Object.defineProperty(config, DbTypeSymbol, {
|
||||
value: db,
|
||||
enumerable: false,
|
||||
writable: false,
|
||||
configurable: false
|
||||
});
|
||||
return config;
|
||||
}
|
||||
catch (err) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`没有找到数据库配置文件,请在configuration目录下添加任一配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
}
|
||||
|
||||
export const getDbStoreClass = <ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>>
|
||||
(config: {
|
||||
[DbTypeSymbol]?: string;
|
||||
}) => {
|
||||
const dbType = Object.getOwnPropertyDescriptor(config, DbTypeSymbol)?.value;
|
||||
if (!dbType) {
|
||||
throw new Error('无法获取数据库类型,请确认是否存在数据库配置文件');
|
||||
}
|
||||
const DbStoreClass = dbList[dbType.toLowerCase() as keyof typeof dbList];
|
||||
if (!DbStoreClass) {
|
||||
throw new Error(`不支持的数据库类型:${dbType},请确认是否存在以下配置文件:${Object.keys(dbList).map(ele => `${ele}.json`).join('、')}`);
|
||||
}
|
||||
return DbStoreClass as new (schema: StorageSchema<ED>, config: DbConfiguration) => DbStore<ED, Cxt> & CascadeStore<ED>;
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
import { existsSync } from "fs";
|
||||
import { mergeConcatMany } from "oak-domain/lib/utils/lodash";
|
||||
import { join } from "path";
|
||||
|
||||
export default function requireSth(prjPath: string, filePath: string, dependencies: string[]) {
|
||||
const depFilePath = join(prjPath, filePath);
|
||||
let sth: any;
|
||||
if (existsSync(`${depFilePath}.js`)) {
|
||||
sth = require(join(prjPath, filePath)).default;
|
||||
}
|
||||
const sthExternal = dependencies.map(
|
||||
ele => {
|
||||
const depFilePath = join(prjPath, 'node_modules', ele, filePath);
|
||||
if (existsSync(`${depFilePath}.js`)) {
|
||||
return require(depFilePath).default
|
||||
}
|
||||
}
|
||||
).filter(
|
||||
ele => !!ele
|
||||
);
|
||||
|
||||
if (sth) {
|
||||
sthExternal.push(sth);
|
||||
}
|
||||
|
||||
return mergeConcatMany(sthExternal);
|
||||
}
|
||||
Loading…
Reference in New Issue