oak-backend-base/lib/AppLoader.js

599 lines
27 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AppLoader = void 0;
const tslib_1 = require("tslib");
const fs_1 = require("fs");
const path_1 = require("path");
const node_schedule_1 = require("node-schedule");
const IntrinsicLogics_1 = require("oak-domain/lib/store/IntrinsicLogics");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const uuid_1 = require("oak-domain/lib/utils/uuid");
const types_1 = require("oak-domain/lib/types");
const DbStore_1 = require("./DbStore");
const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder");
const DataSubscriber_1 = tslib_1.__importDefault(require("./cluster/DataSubscriber"));
const env_1 = require("./cluster/env");
const Synchronizer_1 = tslib_1.__importDefault(require("./Synchronizer"));
const i18n_1 = tslib_1.__importDefault(require("oak-domain/lib/data/i18n"));
const requirePrj_1 = tslib_1.__importDefault(require("./utils/requirePrj"));
class AppLoader extends types_1.AppLoader {
// Database store; created in the constructor from the project's storage schema and db configuration.
dbStore;
// Aspect name -> aspect function: the common aspects merged with the project's `lib/aspects/index`.
aspectDict;
// Dependency list (`ascOrder` of the dependency graph) handed to requirePrj by requireSth.
externalDependencies;
// Only set when the constructor receives `nsSubscribe`; publishes committed events to subscribers.
dataSubscriber;
// Only set by mount() when a sync configuration exists.
synchronizer;
// Factory `(store) => context`; assigned at the end of the constructor.
contextBuilder;
// Constructor argument kept as-is and handed to start/stop routines as `socket`.
nsSocket;
// Handle of the pending setTimeout that schedules the next watcher round; cleared in unmount().
watcherTimerId;
// Timer name -> scheduled job; filled by startTimers(), emptied by unmount().
scheduledJobs = {};
// Handlers notified by publishInternalError().
internalErrorHandlers = new Array();
regAllExceptionHandler() {
const handlers = this.requireSth('lib/configuration/exception');
if (Array.isArray(handlers)) {
handlers.forEach((handler) => {
console.log(`注册内部错误处理器: ${handler.name}`);
this.registerInternalErrorHandler(handler);
});
}
else {
console.warn('lib/configuration/exception必须默认导出一个处理器数组当前导出类型不正确将忽略此配置');
}
}
/**
* 注册一个内部错误处理器
* @param handler 内部错误处理器
*/
registerInternalErrorHandler(handler) {
// 检查有没有名称重复
if (this.internalErrorHandlers.find(h => h.name === handler.name)) {
throw new Error(`内部错误处理器名称重复: ${handler.name}`);
}
this.internalErrorHandlers.push(handler);
}
/**
* 发布内部错误事件给注册的处理器
*/
async publishInternalError(type, message, err) {
const errorToPublish = (0, lodash_1.cloneDeep)(err);
await Promise.all(this.internalErrorHandlers.map((handler) => {
return new Promise(async (resolve) => {
const ctx = await this.makeContext();
try {
console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
await handler.handle(ctx, type, message, errorToPublish);
await ctx.commit();
}
catch (e) {
console.error('执行internalErrorHandler时出错', e);
await ctx.rollback();
}
finally {
resolve();
}
});
}));
}
requireSth(filePath) {
return (0, requirePrj_1.default)(this.path, filePath, this.externalDependencies);
}
async makeContext(cxtStr, headers) {
const context = this.contextBuilder(this.dbStore);
await context.begin();
try {
await context.initialize(cxtStr ? JSON.parse(cxtStr) : undefined);
}
catch (err) {
await context.rollback();
throw err;
}
context.headers = headers;
return context;
}
/**
* 后台启动的configuration统一放在这里读取
*/
getConfiguration() {
const dbConfigFile = (0, path_1.join)(this.path, 'configuration', 'mysql.json');
const dbConfig = require(dbConfigFile);
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
const syncConfigs = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
return {
dbConfig: dbConfig,
syncConfig: syncConfigs,
};
}
/**
 * Wires up the store, the aspect dictionary and the context factory for the project at `path`.
 * @param path project root; configuration and `lib/...` modules are loaded relative to it
 * @param nsSubscribe when truthy, a DataSubscriber is created from it (together with `nsServer`)
 * @param nsSocket stored on the loader and later handed to routines as `socket`
 * @param nsServer passed to the DataSubscriber constructor
 */
constructor(path, nsSubscribe, nsSocket, nsServer) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
// NOTE(review): the dependency graph is analyzed from process.cwd(), not from `path` — confirm this is intended.
const depGraph = (0, dependencyBuilder_1.analyzeDepedency)(process.cwd());
// Must be assigned before the first requireSth() call below, which reads it.
this.externalDependencies = depGraph.ascOrder;
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation');
// Project aspects are merged last, so they win over common aspects of the same name.
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
// The context factory passed here is lazy: this.contextBuilder is only assigned at the end of the constructor.
this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (nsSubscribe) {
this.dataSubscriber = new DataSubscriber_1.default(nsSubscribe, nsServer);
}
this.nsSocket = nsSocket;
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
// Alias for the loader, needed inside the wrapper class where `this` is the context.
const loaderThis = this;
// The context's constructor and commit must be overridden here; otherwise a context created with `new` inside methods such as context.restartToExecute would not work correctly.
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
constructor(store) {
super(store);
this.clusterInfo = (0, env_1.getClusterInfo)();
}
async commit() {
// Read before super.commit(); presumably commit resets these — TODO confirm.
const { eventOperationMap, opRecords } = this;
await super.commit();
// Injected step: after the commit, publish the recorded events to the dataSubscriber.
if (loaderThis.dataSubscriber) {
Object.keys(eventOperationMap).forEach((event) => {
const ids = eventOperationMap[event];
// Keep only the op records whose id is listed for this event.
const opRecordsToPublish = opRecords.filter((ele) => !!ele.id && ids.includes(ele.id));
if (opRecordsToPublish.length !== ids.length && process.env.NODE_ENV === 'development') {
console.warn('要推送的事件的operation数量不足event事件中记录的数据请检查是否有空operation被加入了推送事件');
}
loaderThis.dataSubscriber.publishEvent(event, opRecordsToPublish, this.getSubscriberId());
});
}
}
}
;
this.contextBuilder = (store) => new BackendRuntimeContextWrapper(store);
}
registerTrigger(trigger) {
this.dbStore.registerTrigger(trigger);
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const { actionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const attrUpdateMatrix = this.requireSth('lib/configuration/attrUpdateMatrix');
const { triggers: adTriggers, checkers: adCheckers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), actionDefDict, attrUpdateMatrix);
triggers.forEach((trigger) => this.registerTrigger(trigger));
adTriggers.forEach((trigger) => this.registerTrigger(trigger));
checkers.forEach((checker) => this.dbStore.registerChecker(checker));
adCheckers.forEach((checker) => this.dbStore.registerChecker(checker));
if (this.synchronizer) {
// 同步数据到远端结点通过commit trigger来完成
const syncTriggers = this.synchronizer.getSyncTriggers();
syncTriggers.forEach((trigger) => this.registerTrigger(trigger));
}
}
initSocket() {
// todo
const socketFilePath = (0, path_1.join)(this.path, 'lib/socket/index');
if ((0, fs_1.existsSync)(socketFilePath)) {
const { registerSocketEntry } = require(socketFilePath);
(0, assert_1.default)(typeof registerSocketEntry === 'function');
}
}
async mount(initialize) {
const { path } = this;
if (!initialize) {
const { syncConfig: syncConfig } = this.getConfiguration();
if (syncConfig) {
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
}
this.initTriggers();
this.initSocket();
}
const { importations, exportations } = require(`${path}/lib/ports/index`);
(0, index_1.registerPorts)(importations || [], exportations || []);
this.dbStore.connect();
}
async unmount() {
if (this.watcherTimerId) {
console.log('取消watcher...');
clearTimeout(this.watcherTimerId);
this.watcherTimerId = undefined;
}
for (const job in this.scheduledJobs) {
console.log(`取消定时任务【${job}】...`);
await this.scheduledJobs[job]?.cancel();
delete this.scheduledJobs[job];
}
await (0, index_1.clearPorts)();
await this.dbStore.disconnect();
}
async execAspect(name, headers, contextString, params) {
// 从aspect过来的不能有空cxtString以防被误判为root
const context = await this.makeContext(contextString || '{}', headers);
const fn = this.aspectDict[name];
try {
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
const result = await fn(params, context);
await context.refineOpRecords();
const { opRecords } = context;
const message = context.getMessage();
await context.commit();
return {
opRecords,
message,
result,
};
}
catch (err) {
console.error(`执行aspect「${name}」出错`, err);
await context.rollback();
this.publishInternalError(`aspect`, `执行aspect「${name}」出错`, err);
if (err instanceof types_1.OakException) {
throw err;
}
if (err instanceof Error) {
const exception = await context.tryDeduceException(err);
if (exception) {
throw exception;
}
}
throw err;
}
}
async initialize(ifExists) {
await this.dbStore.initialize({ ifExists });
const data = this.requireSth('lib/data/index');
// oak-domain中只有i18n
(0, assert_1.default)(data.i18n);
data.i18n.push(...i18n_1.default);
const context = this.contextBuilder(this.dbStore);
context.openRootMode();
for (const entity in data) {
let rows = data[entity];
if (rows.length > 0) {
await context.begin();
// 如果是static的对象只要表中有数据就pass
const [first] = await this.dbStore.select(entity, {
data: {
id: 1,
},
indexFrom: 0,
count: 1,
}, context, {});
if (this.dbStore.getSchema()[entity].static) {
if (first) {
await context.commit();
console.log(`data in ${entity} omitted, ${rows.length} rows passed`);
continue;
}
}
// 再插入所有的行
try {
const insertRows = async (idx) => {
const rows2 = rows.slice(idx, 1000);
if (rows2.length > 0) {
await this.dbStore.operate(entity, {
data: rows,
action: 'create',
}, context, {
dontCollect: true,
dontCreateOper: true,
blockTrigger: true,
});
if (rows2.length === 1000) {
await insertRows(idx + 1000);
}
}
};
await insertRows(0);
await context.commit();
console.log(`data in ${entity} initialized, ${rows.length} rows inserted`);
}
catch (err) {
await context.rollback();
console.error(`data on ${entity} initilization failed!`);
throw err;
}
}
}
await this.dbStore.disconnect();
}
getStore() {
return this.dbStore;
}
getEndpoints(prefix) {
const endpoints = this.requireSth('lib/endpoints/index');
const endPointRouters = [];
const endPointMap = {};
const transformEndpointItem = (key, item) => {
const { name, method, fn, params: itemParams, type } = item;
const k = `${key}-${name}-${method}`;
const makeEndpoint = async () => {
endPointMap[k] = true;
let url = `${prefix}/${key}`;
if (itemParams) {
for (const p of itemParams) {
url += `/:${p}`;
}
}
endPointRouters.push([name, method, url, async (params, headers, req, body) => {
if (type == "free") {
const result = await fn(() => this.makeContext(undefined, headers), params, headers, req, body);
return result;
}
else {
const context = await this.makeContext(undefined, headers);
try {
const data = await fn(context, params, headers, req, body);
await context.commit();
return {
data,
};
}
catch (err) {
await context.rollback();
console.error(`endpoint「${key}」方法「${method}」出错`, err);
throw err;
}
}
}]);
};
if (endPointMap[k]) {
if (process.env.NODE_ENV === 'development') {
// 这里发现在热重载模式下会出现报错debug跟到requireSth发现问题怀疑是node的require机制导致的先加个容错在其他环境肯定不会出现
console.warn(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义,将进行覆盖`);
makeEndpoint();
return;
}
throw new Error(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义`);
}
makeEndpoint();
};
if (endpoints) {
for (const router in endpoints) {
const item = endpoints[router];
if (item instanceof Array) {
item.forEach(ele => transformEndpointItem(router, ele));
}
else {
transformEndpointItem(router, item);
}
}
}
if (this.synchronizer) {
const syncEp = this.synchronizer.getSelfEndpoint();
transformEndpointItem(syncEp.name, syncEp);
}
return endPointRouters;
}
operateInWatcher(entity, operation, context, singleton) {
return this.dbStore.operate(entity, operation, context, {});
}
selectInWatcher(entity, selection, context, forUpdate, singleton) {
return this.dbStore.select(entity, selection, context, {
blockTrigger: true,
forUpdate,
});
}
async execWatcher(watcher) {
const context = await this.makeContext();
let result;
try {
if (watcher.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData, singleton } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const data = typeof actionData === 'function' ? await (actionData)() : (0, lodash_1.cloneDeep)(actionData);
result = await this.operateInWatcher(entity, {
id: await (0, uuid_1.generateNewIdAsync)(),
action: action,
data,
filter: filter2,
}, context, singleton);
}
else {
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(context, rows);
}
}
await context.commit();
return result;
}
catch (err) {
if (err instanceof types_1.OakPartialSuccess) {
await context.commit();
}
else {
await context.rollback();
}
// 不能在这里publish因为这个方法可能是在timer中调用也可能是在routine中调用
throw err;
}
}
getCheckpointTs() {
const now = Date.now();
return process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000;
}
checkpoint() {
return this.dbStore.checkpoint(this.getCheckpointTs());
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (watchers || []).concat(adWatchers);
let count = 0;
const execOne = async (watcher, start) => {
try {
const result = await this.execWatcher(watcher);
if (result) {
console.log(`执行watcher【${watcher.name}】成功,耗时【${Date.now() - start}】,结果是:`, result);
}
}
catch (err) {
console.error(`执行watcher【${watcher.name}】失败,耗时【${Date.now() - start}】,结果是:`, err);
await this.publishInternalError(`watcher`, `执行watcher【${watcher.name}】失败`, err);
}
};
const doWatchers = async () => {
count++;
const start = Date.now();
for (const w of totalWatchers) {
/**
* todo 原来这里是所有的watcher并行会产生死锁先改成串行后续再优化
*/
await execOne(w, start);
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
try {
await this.checkpoint();
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
await this.publishInternalError(`checkpoint`, `执行checkpoint发生错误`, err);
}
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
};
doWatchers();
}
execFreeTimer(timer, context) {
const { timer: timerFn } = timer;
return timerFn(context);
}
startTimers() {
const timers = this.requireSth('lib/timers/index');
if (timers) {
for (const timer of timers) {
const { cron, name } = timer;
const job = (0, node_schedule_1.scheduleJob)(name, cron, async (date) => {
const start = Date.now();
console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}`);
if (timer.hasOwnProperty('entity')) {
try {
const result = await this.execWatcher(timer);
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒】,结果是`, result);
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
}
else {
const context = await this.makeContext();
try {
const result = await this.execFreeTimer(timer, context);
if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);
}
await context.commit();
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
if (err instanceof types_1.OakPartialSuccess) {
await context.commit();
}
else {
await context.rollback();
}
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
}
});
if (!job) {
// console.error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
throw new Error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
}
if (this.scheduledJobs[name]) {
// console.error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
throw new Error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
}
this.scheduledJobs[name] = job;
}
}
}
async execStartRoutines() {
const routines = this.requireSth('lib/routines/start') || [];
for (const routine of routines) {
console.log(`执行启动例程【${routine.name}】...`);
if (routine.hasOwnProperty('entity')) {
const start = Date.now();
try {
const result = await this.execWatcher(routine);
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
throw err;
}
}
else {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
try {
const result = await routineFn(context, {
socket: this.nsSocket,
contextBuilder: () => this.makeContext(),
});
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
}
catch (err) {
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
throw err;
}
}
}
}
async execStopRoutines() {
const routines = this.requireSth('lib/routines/stop') || [];
for (const routine of routines) {
console.log(`执行停止例程【${routine.name}】...`);
if (routine.hasOwnProperty('entity')) {
const start = Date.now();
try {
const result = await this.execWatcher(routine);
console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
throw err;
}
}
else {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
try {
const result = await routineFn(context, {
socket: this.nsSocket,
contextBuilder: () => this.makeContext(),
});
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
}
catch (err) {
console.error(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
throw err;
}
}
}
}
async execRoutine(routine) {
const context = await this.makeContext();
try {
const result = await routine(context);
await context.commit();
return result;
}
catch (e) {
await context.rollback();
throw e;
}
}
}
exports.AppLoader = AppLoader;