oak-backend-base/lib/AppLoader.js

740 lines
32 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AppLoader = void 0;
const tslib_1 = require("tslib");
const fs_1 = require("fs");
const path_1 = require("path");
const node_schedule_1 = require("node-schedule");
const IntrinsicLogics_1 = require("oak-domain/lib/store/IntrinsicLogics");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const uuid_1 = require("oak-domain/lib/utils/uuid");
const types_1 = require("oak-domain/lib/types");
const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder");
const DataSubscriber_1 = tslib_1.__importDefault(require("./cluster/DataSubscriber"));
const env_1 = require("./cluster/env");
const Synchronizer_1 = tslib_1.__importDefault(require("./Synchronizer"));
const i18n_1 = tslib_1.__importDefault(require("oak-domain/lib/data/i18n"));
const requirePrj_1 = tslib_1.__importDefault(require("./utils/requirePrj"));
const dbPriority_1 = require("./utils/dbPriority");
const DbStore_1 = require("./DbStore");
class AppLoader extends types_1.AppLoader {
dbStore;
aspectDict;
externalDependencies;
dataSubscriber;
synchronizer;
contextBuilder;
nsSocket;
watcherTimerId;
scheduledJobs = {};
internalErrorHandlers = new Array();
watcherExecutingData = new Map();
regAllExceptionHandler() {
const handlers = this.requireSth('lib/configuration/exception');
if (Array.isArray(handlers)) {
handlers.forEach((handler) => {
console.log(`注册内部错误处理器: ${handler.name}`);
this.registerInternalErrorHandler(handler);
});
}
else {
console.warn('lib/configuration/exception必须默认导出一个处理器数组当前导出类型不正确将忽略此配置');
}
}
/**
* 注册一个内部错误处理器
* @param handler 内部错误处理器
*/
registerInternalErrorHandler(handler) {
// 检查有没有名称重复
if (this.internalErrorHandlers.find(h => h.name === handler.name)) {
throw new Error(`内部错误处理器名称重复: ${handler.name}`);
}
this.internalErrorHandlers.push(handler);
}
/**
* 发布内部错误事件给注册的处理器
*/
async publishInternalError(type, message, err) {
await Promise.all(this.internalErrorHandlers.map((handler) => {
return new Promise(async (resolve) => {
const ctx = await this.makeContext();
try {
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
await handler.handle(ctx, type, message, err);
await ctx.commit();
}
catch (e) {
console.error('执行internalErrorHandler时出错', e);
await ctx.rollback();
}
finally {
resolve();
}
});
}));
}
requireSth(filePath) {
return (0, requirePrj_1.default)(this.path, filePath, this.externalDependencies);
}
async makeContext(cxtStr, headers) {
const context = this.contextBuilder(this.dbStore);
await context.begin();
try {
await context.initialize(cxtStr ? JSON.parse(cxtStr) : undefined);
}
catch (err) {
await context.rollback();
throw err;
}
context.headers = headers;
return context;
}
/**
* 获取数据库配置
* @returns 读取数据库配置
*/
getDbConfig() {
return (0, dbPriority_1.getDbConfig)(this.path);
}
/**
* 获取同步配置
* @returns 读取同步配置
*/
getSyncConfig() {
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
return syncConfigs;
}
constructor(path, nsSubscribe, nsSocket, nsServer) {
super(path);
const dbConfig = this.getDbConfig();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd());
this.externalDependencies = depGraph.ascOrder;
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation');
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
this.dbStore = (0, DbStore_1.createDbStore)(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (nsSubscribe) {
this.dataSubscriber = new DataSubscriber_1.default(nsSubscribe, nsServer);
}
this.nsSocket = nsSocket;
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
const loaderThis = this;
// 需要重载context上的构造和commit方法否则程序中执行context.restartToExecute这样的方法中new一个context出来是无法正确执行的
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
constructor(store) {
super(store);
this.clusterInfo = (0, env_1.getClusterInfo)();
}
async commit() {
const { eventOperationMap, opRecords } = this;
await super.commit();
// 注入在提交后向dataSubscribe发送订阅的事件
if (loaderThis.dataSubscriber) {
Object.keys(eventOperationMap).forEach((event) => {
const ids = eventOperationMap[event];
const opRecordsToPublish = opRecords.filter((ele) => !!ele.id && ids.includes(ele.id));
if (opRecordsToPublish.length !== ids.length && process.env.NODE_ENV === 'development') {
console.warn('要推送的事件的operation数量不足event事件中记录的数据请检查是否有空operation被加入了推送事件');
}
loaderThis.dataSubscriber.publishEvent(event, opRecordsToPublish, this.getSubscriberId());
});
}
}
}
;
this.contextBuilder = (store) => new BackendRuntimeContextWrapper(store);
}
registerTrigger(trigger) {
this.dbStore.registerTrigger(trigger);
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const { actionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const attrUpdateMatrix = this.requireSth('lib/configuration/attrUpdateMatrix');
const { triggers: adTriggers, checkers: adCheckers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), actionDefDict, attrUpdateMatrix);
triggers.forEach((trigger) => this.registerTrigger(trigger));
adTriggers.forEach((trigger) => this.registerTrigger(trigger));
checkers.forEach((checker) => this.dbStore.registerChecker(checker));
adCheckers.forEach((checker) => this.dbStore.registerChecker(checker));
if (this.synchronizer) {
// 同步数据到远端结点通过commit trigger来完成
const syncTriggers = this.synchronizer.getSyncTriggers();
syncTriggers.forEach((trigger) => this.registerTrigger(trigger));
}
}
initSocket() {
// todo
const socketFilePath = (0, path_1.join)(this.path, 'lib/socket/index');
if ((0, fs_1.existsSync)(socketFilePath)) {
const { registerSocketEntry } = require(socketFilePath);
(0, assert_1.default)(typeof registerSocketEntry === 'function');
}
}
async mount(initialize) {
const { path } = this;
if (!initialize) {
const syncConfig = this.getSyncConfig();
if (syncConfig) {
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
}
this.initTriggers();
this.initSocket();
}
const { importations, exportations } = require(`${path}/lib/ports/index`);
(0, index_1.registerPorts)(importations || [], exportations || []);
this.dbStore.connect();
}
async unmount() {
if (this.watcherTimerId) {
console.log('取消watcher...');
clearTimeout(this.watcherTimerId);
this.watcherTimerId = undefined;
}
for (const job in this.scheduledJobs) {
console.log(`取消定时任务【${job}】...`);
await this.scheduledJobs[job]?.cancel();
delete this.scheduledJobs[job];
}
await (0, index_1.clearPorts)();
await this.dbStore.disconnect();
}
async execAspect(name, headers, contextString, params) {
// 从aspect过来的不能有空cxtString以防被误判为root
const context = await this.makeContext(contextString || '{}', headers);
const fn = this.aspectDict[name];
try {
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
const result = await fn(params, context);
await context.refineOpRecords();
const { opRecords } = context;
const message = context.getMessage();
await context.commit();
return {
opRecords,
message,
result,
};
}
catch (err) {
console.error(`执行aspect「${name}」出错`, err);
await context.rollback();
this.publishInternalError(`aspect`, `执行aspect「${name}」出错`, err);
if (err instanceof types_1.OakException) {
throw err;
}
if (err instanceof Error) {
const exception = await context.tryDeduceException(err);
if (exception) {
throw exception;
}
}
throw err;
}
}
async initialize(ifExists) {
await this.dbStore.initialize({ ifExists });
const data = this.requireSth('lib/data/index');
// oak-domain中只有i18n
(0, assert_1.default)(data.i18n);
data.i18n.push(...i18n_1.default);
const context = this.contextBuilder(this.dbStore);
context.openRootMode();
for (const entity in data) {
let rows = data[entity];
if (rows.length > 0) {
await context.begin();
// 如果是static的对象只要表中有数据就pass
const [first] = await this.dbStore.select(entity, {
data: {
id: 1,
},
indexFrom: 0,
count: 1,
}, context, {});
if (this.dbStore.getSchema()[entity].static) {
if (first) {
await context.commit();
console.log(`data in ${entity} omitted, ${rows.length} rows passed`);
continue;
}
}
// 再插入所有的行
try {
const insertRows = async (idx) => {
const rows2 = rows.slice(idx, 1000);
if (rows2.length > 0) {
await this.dbStore.operate(entity, {
data: rows,
action: 'create',
}, context, {
dontCollect: true,
dontCreateOper: true,
blockTrigger: true,
});
if (rows2.length === 1000) {
await insertRows(idx + 1000);
}
}
};
await insertRows(0);
await context.commit();
console.log(`data in ${entity} initialized, ${rows.length} rows inserted`);
}
catch (err) {
await context.rollback();
console.error(`data on ${entity} initilization failed!`);
throw err;
}
}
}
// await this.dbStore.disconnect(); // 不需要马上断开连接在initialize后可能还会有操作unmount时会断开
}
getStore() {
return this.dbStore;
}
getEndpoints(prefix) {
const endpoints = this.requireSth('lib/endpoints/index');
const endPointRouters = [];
const endPointMap = {};
const transformEndpointItem = (key, item) => {
const { name, method, fn, params: itemParams, type } = item;
const k = `${key}-${name}-${method}`;
const makeEndpoint = async () => {
endPointMap[k] = true;
let url = `${prefix}/${key}`;
if (itemParams) {
for (const p of itemParams) {
url += `/:${p}`;
}
}
endPointRouters.push([name, method, url, async (params, headers, req, body) => {
if (type == "free") {
const result = await fn(() => this.makeContext(undefined, headers), params, headers, req, body);
return result;
}
else {
const context = await this.makeContext(undefined, headers);
try {
const data = await fn(context, params, headers, req, body);
await context.commit();
return {
data,
};
}
catch (err) {
await context.rollback();
console.error(`endpoint「${key}」方法「${method}」出错`, err);
throw err;
}
}
}]);
};
if (endPointMap[k]) {
if (process.env.NODE_ENV === 'development') {
// 这里发现在热重载模式下会出现报错debug跟到requireSth发现问题怀疑是node的require机制导致的先加个容错在其他环境肯定不会出现
console.warn(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义,将进行覆盖`);
makeEndpoint();
return;
}
throw new Error(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义`);
}
makeEndpoint();
};
if (endpoints) {
for (const router in endpoints) {
const item = endpoints[router];
if (item instanceof Array) {
item.forEach(ele => transformEndpointItem(router, ele));
}
else {
transformEndpointItem(router, item);
}
}
}
if (this.synchronizer) {
const syncEp = this.synchronizer.getSelfEndpoint();
transformEndpointItem(syncEp.name, syncEp);
}
return endPointRouters;
}
operateInWatcher(entity, operation, context, singleton) {
return this.dbStore.operate(entity, operation, context, {});
}
selectInWatcher(entity, selection, context, forUpdate, singleton) {
return this.dbStore.select(entity, selection, context, {
blockTrigger: true,
forUpdate,
});
}
/**
* 检查某个数据是否正在被watcher执行
* @param name watcher名称
* @param dataId 数据ID
* @returns 如果没有正在执行则返回true否则返回false
*/
checkDataExecuting(name, dataId) {
let dataSet = this.watcherExecutingData.get(name);
if (!dataSet) {
dataSet = new Map();
this.watcherExecutingData.set(name, dataSet);
}
if (dataSet.has(dataId)) {
return false;
}
dataSet.set(dataId, true);
return true;
}
/**
* 过滤出未在执行中的数据行,并标记为执行中
* @returns [过滤后的行, 是否有行被跳过]
*/
filterAndMarkExecutingRows(watcher, rows) {
if (watcher.exclusive !== true) {
// 不需要排他执行,直接返回所有行
return [rows, []];
}
const rowsWithoutExecuting = [];
const skipedRows = [];
const watcherName = watcher.name;
for (const row of rows) {
if (!row.id) {
console.error(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】获取的数据没有ID跳过此数据的并发检查处理`, row);
rowsWithoutExecuting.push(row);
continue;
}
if (this.checkDataExecuting(watcherName, row.id)) {
rowsWithoutExecuting.push(row);
}
else {
skipedRows.push(row);
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`);
}
}
return [rowsWithoutExecuting, skipedRows];
}
/**
* 清理执行标记
*/
cleanupExecutingMarks(watcherName, rows) {
for (const row of rows) {
if (row.id) {
const dataSet = this.watcherExecutingData.get(watcherName);
if (dataSet) {
dataSet.delete(row.id);
}
}
}
}
/**
* 解析 filter 和 projection支持函数或静态值
*/
async resolveFilterAndProjection(filter, projection) {
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
return [filter2, projection2];
}
/**
* 执行 WB 类型 watcher 的查询操作
*/
async selectForWBWatcher(watcher, context) {
const { entity, projection, filter, singleton, forUpdate } = watcher;
const [filter2, projection2] = await this.resolveFilterAndProjection(filter, projection);
return await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
}
async execWatcher(watcher) {
let result;
// BBWatcher直接操作无需查询
if (watcher.hasOwnProperty('actionData')) {
const context = await this.makeContext();
try {
const { entity, action, filter, actionData, singleton } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const data = typeof actionData === 'function' ? await actionData() : (0, lodash_1.cloneDeep)(actionData);
result = await this.operateInWatcher(entity, {
id: await (0, uuid_1.generateNewIdAsync)(),
action: action,
data,
filter: filter2,
}, context, singleton);
await context.commit();
return result;
}
catch (err) {
if (err instanceof types_1.OakPartialSuccess) {
await context.commit();
}
else {
await context.rollback();
}
throw err;
}
}
// WBFreeWatcher 和 WBWatcher查询后执行
const isFreeType = watcher.hasOwnProperty('type') &&
watcher.type === 'free';
// 1. 执行查询WBFreeWatcher 使用独立 context
const selectContext = isFreeType ? await this.makeContext() : await this.makeContext();
const rows = await this.selectForWBWatcher(watcher, selectContext);
if (isFreeType) {
await selectContext.commit();
}
// 2. 并发检查:过滤出未在执行中的数据
const [rowsWithoutExecuting, hasSkipped] = this.filterAndMarkExecutingRows(watcher, rows);
if (rowsWithoutExecuting.length === 0) {
if (!isFreeType) {
await selectContext.commit();
}
this.cleanupExecutingMarks(watcher.name, hasSkipped);
return result;
}
// 3. 执行业务逻辑
try {
if (isFreeType) {
const { fn } = watcher;
result = await fn(() => this.makeContext(), rowsWithoutExecuting);
}
else {
const { fn } = watcher;
result = await fn(selectContext, rowsWithoutExecuting);
await selectContext.commit();
}
return result;
}
catch (err) {
// 清理执行标记
this.cleanupExecutingMarks(watcher.name, rows);
if (!isFreeType) {
if (err instanceof types_1.OakPartialSuccess) {
await selectContext.commit();
}
else {
await selectContext.rollback();
}
}
throw err;
}
}
getCheckpointTs() {
const now = Date.now();
return process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000;
}
checkpoint() {
return this.dbStore.checkpoint(this.getCheckpointTs());
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (watchers || []).concat(adWatchers);
let count = 0;
const skipOnceSet = new Set();
const execOne = async (watcher, start) => {
if (skipOnceSet.has(watcher.name)) {
skipOnceSet.delete(watcher.name);
console.log(`跳过本次执行watcher【${watcher.name}`);
return;
}
try {
const result = await this.execWatcher(watcher);
if (result) {
console.log(`执行watcher【${watcher.name}】成功,耗时【${Date.now() - start}】,结果是:`, result);
}
}
catch (err) {
console.error(`执行watcher【${watcher.name}】失败,耗时【${Date.now() - start}】,结果是:`, err);
await this.publishInternalError(`watcher`, `执行watcher【${watcher.name}】失败`, err);
}
};
const doWatchers = async () => {
count++;
const start = Date.now();
for (const w of totalWatchers) {
/**
* todo 原来这里是所有的watcher并行会产生死锁先改成串行后续再优化
*/
await execOne(w, start);
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
try {
await this.checkpoint();
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
await this.publishInternalError(`checkpoint`, `执行checkpoint发生错误`, err);
}
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
};
// 首次执行时跳过所有lazy的watcher
for (const w of totalWatchers) {
if (w.lazy) {
skipOnceSet.add(w.name);
}
}
doWatchers();
}
execBaseTimer(timer, context) {
const { timer: timerFn } = timer;
return timerFn(context);
}
execFreeTimer(timer, contextBuilder) {
const { timer: timerFn } = timer;
return timerFn(contextBuilder);
}
startTimers() {
const timers = this.requireSth('lib/timers/index');
if (timers) {
for (const timer of timers) {
const { cron, name } = timer;
const job = (0, node_schedule_1.scheduleJob)(name, cron, async (date) => {
const start = Date.now();
console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}`);
if (timer.hasOwnProperty('entity')) {
try {
const result = await this.execWatcher(timer);
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒】,结果是`, result);
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
}
else {
if (timer.hasOwnProperty('type') && timer.type === 'free') {
try {
const result = await this.execFreeTimer(timer, () => this.makeContext());
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
return;
}
const context = await this.makeContext();
try {
const result = await this.execBaseTimer(timer, context);
if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
await context.commit();
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
if (err instanceof types_1.OakPartialSuccess) {
await context.commit();
}
else {
await context.rollback();
}
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
}
});
if (!job) {
// console.error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
throw new Error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
}
if (this.scheduledJobs[name]) {
// console.error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
throw new Error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
}
this.scheduledJobs[name] = job;
}
}
}
async execStartRoutines() {
const routines = this.requireSth('lib/routines/start') || [];
for (const routine of routines) {
console.log(`执行启动例程【${routine.name}】...`);
if (routine.hasOwnProperty('entity')) {
const start = Date.now();
try {
const result = await this.execWatcher(routine);
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
throw err;
}
}
else {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
try {
const result = await routineFn(context, {
socket: this.nsSocket,
contextBuilder: () => this.makeContext(),
});
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
await context.commit();
}
catch (err) {
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
throw err;
}
}
}
}
async execStopRoutines() {
const routines = this.requireSth('lib/routines/stop') || [];
for (const routine of routines) {
console.log(`执行停止例程【${routine.name}】...`);
if (routine.hasOwnProperty('entity')) {
const start = Date.now();
try {
const result = await this.execWatcher(routine);
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
throw err;
}
}
else {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
try {
const result = await routineFn(context, {
socket: this.nsSocket,
contextBuilder: () => this.makeContext(),
});
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
await context.commit();
}
catch (err) {
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
throw err;
}
}
}
}
async execRoutine(routine) {
const context = await this.makeContext();
try {
const result = await routine(context);
await context.commit();
return result;
}
catch (e) {
await context.rollback();
throw e;
}
}
}
exports.AppLoader = AppLoader;