// oak-backend-base/src/AppLoader.ts

import { existsSync } from 'fs';
import { join } from 'path';
import { Job, scheduleJob } from 'node-schedule';
import { makeIntrinsicLogics } from "oak-domain/lib/store/IntrinsicLogics";
import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
import assert from 'assert';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
import { analyzeDepedency } from 'oak-domain/lib/compiler/dependencyBuilder';
import DataSubscriber from './cluster/DataSubscriber';
import { getClusterInfo } from './cluster/env';
import Synchronizer from './Synchronizer';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import domainI18nData from 'oak-domain/lib/data/i18n';
import requireSth from './utils/requirePrj';
import { InternalErrorHandler, InternalErrorType } from './types';
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
/** Store passed to `contextBuilder` for every context this loader creates. */
protected dbStore: DbStore<ED, Cxt>;
/** Aspect functions keyed by name: the general aspect dict merged with the project's `lib/aspects/index`. */
private aspectDict: Record<string, Aspect<ED, Cxt>>;
/** Ascending dependency order from `analyzeDepedency`; forwarded to the `requireSth` helper. */
private externalDependencies: string[];
/** Only created when a subscribe namespace is handed to the constructor. */
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
/** Only created in `mount` when a sync configuration is found. */
protected synchronizer?: Synchronizer<ED, Cxt>;
/** Factory producing a fresh runtime context bound to the given store. */
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
/** Socket namespace passed to free routines as `socket`. */
private nsSocket?: Namespace;
/** Handle of the pending watcher-round timeout; cleared in `unmount`. */
private watcherTimerId?: NodeJS.Timeout;
/** Scheduled timer jobs keyed by timer name. */
private scheduledJobs: Record<string, Job> = {};
/** Handlers notified by `publishInternalError`; names are kept unique. */
private internalErrorHandlers = new Array<InternalErrorHandler<ED, Cxt>>();
/**
 * Loads `lib/configuration/exception` and registers every internal-error
 * handler it exports. When the export is not an array a warning is logged
 * and the configuration is ignored.
 */
public regAllExceptionHandler() {
    const exported = this.requireSth('lib/configuration/exception') as Array<InternalErrorHandler<ED, Cxt>> | InternalErrorHandler<ED, Cxt>;
    if (!Array.isArray(exported)) {
        console.warn('lib/configuration/exception必须默认导出一个处理器数组当前导出类型不正确将忽略此配置');
        return;
    }
    for (const handler of exported) {
        console.log(`注册内部错误处理器: ${handler.name}`);
        this.registerInternalErrorHandler(handler);
    }
}
/**
 * Registers one internal-error handler.
 * @param handler the handler to add
 * @throws Error when a handler with the same name is already registered
 */
public registerInternalErrorHandler(handler: InternalErrorHandler<ED, Cxt>) {
    // Handler names must be unique.
    const duplicated = this.internalErrorHandlers.some(
        (registered) => registered.name === handler.name
    );
    if (duplicated) {
        throw new Error(`内部错误处理器名称重复: ${handler.name}`);
    }
    this.internalErrorHandlers.push(handler);
}
/**
 * Publishes an internal error event to every registered handler.
 *
 * Each handler gets its own context. The context is committed once the
 * handler finishes and rolled back when the handler (or the commit) fails,
 * so the transaction opened by `makeContext` is always closed. A failing
 * handler is logged and never prevents the other handlers from running.
 *
 * @param type category of the internal error
 * @param message human-readable description of the failure
 * @param err the original error; a deep clone is handed to the handlers
 */
private async publishInternalError(type: InternalErrorType, message: string, err: any) {
    const errorToPublish = cloneDeep(err);
    await Promise.all(this.internalErrorHandlers.map(
        async (handler) => {
            let ctx: Cxt | undefined;
            try {
                ctx = await this.makeContext();
                console.log(`调用internalErrorHandler【${handler.name}】处理内部错误事件`);
                // Await so that failures of asynchronous handlers are caught below.
                await handler.handle(ctx, type, message, errorToPublish);
                await ctx.commit();
            }
            catch (e) {
                console.error('执行internalErrorHandler时出错', e);
                if (ctx) {
                    try {
                        await ctx.rollback();
                    }
                    catch (rollbackErr) {
                        // Best effort: never let cleanup failures escape the publisher.
                        console.error('执行internalErrorHandler时出错', rollbackErr);
                    }
                }
            }
        }
    ));
}
/**
 * Requires `filePath` through the shared `requirePrj` helper, passing this
 * loader's project path and its external dependency list.
 * @param filePath path handed to the helper (e.g. `lib/triggers/index`)
 */
private requireSth(filePath: string): any {
    const { path, externalDependencies } = this;
    return requireSth(path, filePath, externalDependencies);
}
/**
 * Builds a context, begins it and initializes it.
 * @param cxtStr optional JSON string; when non-empty it is parsed and passed to `initialize`
 * @param headers optional HTTP headers stored on the context
 * @returns the begun and initialized context
 * @throws whatever parsing or `initialize` throws; the context is rolled back first
 */
protected async makeContext(cxtStr?: string, headers?: IncomingHttpHeaders) {
    const cxt = this.contextBuilder(this.dbStore);
    await cxt.begin();
    try {
        // Parsing happens inside the try so malformed JSON also triggers the rollback.
        const initData = cxtStr ? JSON.parse(cxtStr) : undefined;
        await cxt.initialize(initData);
    }
    catch (initErr) {
        await cxt.rollback();
        throw initErr;
    }
    cxt.headers = headers;
    return cxt;
}
/**
 * Reads the backend start-up configuration in one place.
 *
 * @returns `dbConfig` loaded from `configuration/mysql.json`, and
 * `syncConfig` — the default export of `lib/configuration/sync.js` when
 * that file exists, otherwise `undefined`.
 */
private getConfiguration() {
    const dbConfigFile = join(this.path, 'configuration', 'mysql.json');
    const dbConfig: MySQLConfiguration = require(dbConfigFile);
    const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js');
    // Return a real `undefined` (not `false`) when no sync configuration exists,
    // so the value matches its declared type.
    const syncConfig: SyncConfig<ED, Cxt> | undefined = existsSync(syncConfigFile)
        ? require(syncConfigFile).default
        : undefined;
    return {
        dbConfig,
        syncConfig,
    };
}
/**
 * Wires the loader together: reads the database configuration, resolves the
 * dependency order, loads relation configuration and aspects, creates the
 * store, and prepares the context builder.
 * @param path project root used to resolve every required file
 * @param nsSubscribe when given, a DataSubscriber is created on it (together with nsServer)
 * @param nsSocket stored and later passed to free routines
 * @param nsServer second namespace forwarded to the DataSubscriber
 */
constructor(path: string, nsSubscribe?: Namespace, nsSocket?: Namespace, nsServer?: Namespace) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
// externalDependencies must be set before the first this.requireSth call below, which reads it.
const depGraph = analyzeDepedency(process.cwd());
this.externalDependencies = depGraph.ascOrder;
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation')!;
// Project aspects are merged last, so on a name clash they win over the general ones.
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index')!);
this.dbStore = new DbStore<ED, Cxt>(
storageSchema,
// Lazy on purpose: contextBuilder is only assigned at the end of this constructor.
() => this.contextBuilder(this.dbStore),
dbConfig,
authDeduceRelationMap,
selectFreeEntities,
updateFreeDict,
);
if (nsSubscribe) {
this.dataSubscriber = new DataSubscriber(nsSubscribe, nsServer);
}
this.nsSocket = nsSocket;
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
// Captured so the nested class can reach the loader while `this` refers to the context.
const loaderThis = this;
// The context's constructor and commit have to be overridden here; otherwise a context
// created with `new` inside methods such as context.restartToExecute would not behave correctly.
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
constructor(store: DbStore<ED, Cxt>) {
super(store);
this.clusterInfo = getClusterInfo();
}
async commit() {
// Read before super.commit() — presumably the base commit resets these; TODO confirm.
const { eventOperationMap, opRecords } = this;
await super.commit();
// After the commit, publish the recorded events to the data subscriber.
if (loaderThis.dataSubscriber) {
Object.keys(eventOperationMap).forEach(
(event) => {
const ids = eventOperationMap[event];
// Keep only the op records whose id is listed for this event.
const opRecordsToPublish = (opRecords as CreateOpResult<ED, keyof ED>[]).filter(
(ele) => !!ele.id && ids.includes(ele.id)
);
if (opRecordsToPublish.length !== ids.length && process.env.NODE_ENV === 'development') {
console.warn('要推送的事件的operation数量不足event事件中记录的数据请检查是否有空operation被加入了推送事件');
}
loaderThis.dataSubscriber!.publishEvent(event, opRecordsToPublish, this.getSubscriberId());
}
);
}
}
};
this.contextBuilder = (store) => new BackendRuntimeContextWrapper(store) as Cxt;
}
/**
 * Registers one trigger on the underlying store.
 * @param trigger the trigger to register
 */
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>) {
    const { dbStore } = this;
    dbStore.registerTrigger(trigger);
}
/**
 * Registers triggers and checkers on the store, in this order:
 * project triggers, intrinsic triggers, project checkers, intrinsic
 * checkers, and finally the synchronizer's triggers when one is present.
 */
protected initTriggers() {
    const projectTriggers: Trigger<ED, keyof ED, Cxt>[] = this.requireSth('lib/triggers/index')!;
    const projectCheckers: Checker<ED, keyof ED, Cxt>[] = this.requireSth('lib/checkers/index')!;
    const { actionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
    const attrUpdateMatrix = this.requireSth('lib/configuration/attrUpdateMatrix');
    const intrinsic = makeIntrinsicLogics(this.dbStore.getSchema(), actionDefDict, attrUpdateMatrix);

    for (const trigger of projectTriggers) {
        this.registerTrigger(trigger);
    }
    for (const trigger of intrinsic.triggers) {
        this.registerTrigger(trigger);
    }
    for (const checker of projectCheckers) {
        this.dbStore.registerChecker(checker);
    }
    for (const checker of intrinsic.checkers) {
        this.dbStore.registerChecker(checker);
    }

    if (!this.synchronizer) {
        return;
    }
    // Synchronization to remote nodes is implemented through commit triggers.
    for (const trigger of this.synchronizer.getSyncTriggers()) {
        this.registerTrigger(trigger);
    }
}
/**
 * Placeholder for socket registration (still a todo): when the socket entry
 * path exists it is required and `registerSocketEntry` is asserted to be a
 * function. Nothing is invoked yet.
 */
protected initSocket() {
    // todo
    // NOTE(review): the path carries no file extension, so existsSync may never
    // match a compiled `index.js` — confirm the intended check.
    const socketFilePath = join(this.path, 'lib/socket/index');
    if (!existsSync(socketFilePath)) {
        return;
    }
    const { registerSocketEntry } = require(socketFilePath);
    assert(typeof registerSocketEntry === 'function');
}
/**
 * Prepares the loader for use: unless running in initialize mode it sets up
 * the synchronizer (when configured), triggers and sockets; then registers
 * the import/export ports and connects the store.
 * @param initialize pass `true` when mounting only to initialize data; trigger and socket set-up is skipped
 */
async mount(initialize?: true) {
    const { path } = this;
    if (!initialize) {
        const { syncConfig } = this.getConfiguration();
        if (syncConfig) {
            this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
        }
        this.initTriggers();
        this.initSocket();
    }
    const { importations, exportations } = require(`${path}/lib/ports/index`);
    registerPorts(importations || [], exportations || []);
    // Await the connection (mirrors the awaited disconnect in unmount) so that
    // mount only resolves once connected and connection errors reach the caller.
    await this.dbStore.connect();
}
/**
 * Tears the loader down: cancels the pending watcher timeout, cancels and
 * forgets every scheduled job, clears the ports and disconnects the store.
 */
async unmount() {
    const { watcherTimerId, scheduledJobs } = this;
    if (watcherTimerId) {
        console.log('取消watcher...');
        clearTimeout(watcherTimerId);
        this.watcherTimerId = undefined;
    }
    for (const jobName of Object.keys(scheduledJobs)) {
        console.log(`取消定时任务【${jobName}】...`);
        await scheduledJobs[jobName]?.cancel();
        delete scheduledJobs[jobName];
    }
    await clearPorts();
    await this.dbStore.disconnect();
}
/**
 * Executes the aspect registered under `name` inside a fresh context.
 *
 * On success the context is committed and the refined op records, the
 * context message and the aspect result are returned. On failure the
 * context is rolled back, the error is published to the internal error
 * handlers (fire-and-forget) and then rethrown — replaced by the exception
 * deduced by the context when one is available.
 *
 * @param name aspect name
 * @param headers HTTP headers of the request
 * @param contextString serialized context; an empty value is replaced by `'{}'`
 * @param params parameters passed to the aspect
 * @throws Error when no aspect named `name` exists, or whatever the aspect throws
 */
async execAspect(name: string, headers?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
    opRecords: OpRecord<ED>[];
    result: any;
    message?: string;
}> {
    // Requests coming from an aspect must not carry an empty context string,
    // otherwise they could be mistaken for root.
    const context = await this.makeContext(contextString || '{}', headers);
    const fn = this.aspectDict[name];
    try {
        if (!fn) {
            throw new Error(`不存在的接口名称: ${name}`);
        }
        const result = await fn(params, context);
        await context.refineOpRecords();
        const { opRecords } = context;
        const message = context.getMessage();
        await context.commit();
        return {
            opRecords,
            message,
            result,
        };
    }
    catch (err) {
        console.error(`执行aspect「${name}」出错`, err);
        await context.rollback();
        // Deliberately not awaited so the response is not delayed; the catch keeps
        // a failure while publishing from becoming an unhandled rejection.
        void this.publishInternalError(`aspect`, `执行aspect「${name}」出错`, err).catch(
            (publishErr) => console.error('执行internalErrorHandler时出错', publishErr)
        );
        if (err instanceof OakException) {
            throw err;
        }
        if (err instanceof Error) {
            const exception = await context.tryDeduceException(err);
            if (exception) {
                throw exception;
            }
        }
        throw err;
    }
}
/**
 * Initializes the database and loads the project's initial data.
 *
 * For every entity in `lib/data/index` (plus the i18n rows shipped with
 * oak-domain) the rows are inserted in root mode inside one transaction per
 * entity, in batches of 1000 rows. Entities marked `static` in the schema
 * are skipped when their table already contains data. The store is
 * disconnected after all entities have been processed.
 *
 * @param ifExists forwarded to the store's `initialize`
 * @throws rethrows any failure after rolling back the current entity's transaction
 */
async initialize(ifExists?: 'drop' | 'omit' | 'dropIfNotStatic') {
    await this.dbStore.initialize({ ifExists });
    const data = this.requireSth('lib/data/index')!;
    // oak-domain itself only ships i18n data.
    assert(data.i18n);
    data.i18n.push(...domainI18nData);
    const context = this.contextBuilder(this.dbStore);
    context.openRootMode();
    const batchSize = 1000;
    for (const entity in data) {
        const rows = data[entity];
        if (!(rows.length > 0)) {
            continue;
        }
        await context.begin();
        try {
            // For a static entity, any existing row means the data is already there.
            if (this.dbStore.getSchema()[entity].static) {
                const [first] = await this.dbStore.select(entity, {
                    data: {
                        id: 1,
                    },
                    indexFrom: 0,
                    count: 1,
                }, context, {});
                if (first) {
                    await context.commit();
                    console.log(`data in ${entity} omitted, ${rows.length} rows passed`);
                    continue;
                }
            }
            // Insert all rows, one batch at a time.
            for (let idx = 0; idx < rows.length; idx += batchSize) {
                const batch = rows.slice(idx, idx + batchSize);
                await this.dbStore.operate(entity as keyof ED, {
                    data: batch,
                    action: 'create',
                } as any, context, {
                    dontCollect: true,
                    dontCreateOper: true,
                    blockTrigger: true,
                });
            }
            await context.commit();
            console.log(`data in ${entity} initialized, ${rows.length} rows inserted`);
        }
        catch (err) {
            await context.rollback();
            console.error(`data on ${entity} initilization failed!`);
            throw err;
        }
    }
    await this.dbStore.disconnect();
}
/** Returns the store this loader operates on. */
getStore(): DbStore<ED, Cxt> {
    const { dbStore } = this;
    return dbStore;
}
/**
 * Builds the router table for the project's endpoints (and the
 * synchronizer's own endpoint when a synchronizer exists).
 *
 * Each entry is `[name, method, url, handler]`, where `url` is
 * `${prefix}/${key}` followed by one `/:param` segment per declared
 * parameter. Handlers of type `free` receive a context factory and manage
 * the context themselves; for all other endpoints the handler runs inside a
 * context that is committed on success and rolled back on failure.
 *
 * @param prefix URL prefix prepended to every endpoint
 * @returns the list of router entries
 * @throws Error when the same key/name/method is defined twice (outside
 * development; in development the later definition replaces the earlier one)
 */
getEndpoints(prefix: string) {
    type EndpointRouter = [EndpointItem<ED, Cxt>['name'], EndpointItem<ED, Cxt>['method'], string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<{
        headers?: Record<string, string | string[]>;
        data: any;
        statusCode?: number;
    }>];
    const endpoints: Record<string, Endpoint<ED, Cxt>> = this.requireSth('lib/endpoints/index');
    const endPointRouters: Array<EndpointRouter> = [];
    // Position of every registered key/name/method inside endPointRouters.
    const endPointIndex = new Map<string, number>();
    const transformEndpointItem = (key: string, item: EndpointItem<ED, Cxt>) => {
        const { name, method, fn, params: itemParams, type } = item;
        const k = `${key}-${name}-${method}`;
        let url = `${prefix}/${key}`;
        for (const p of itemParams || []) {
            url += `/:${p}`;
        }
        const router: EndpointRouter = [name, method, url, async (params, headers, req, body) => {
            if (type === "free") {
                const result = await fn(() => this.makeContext(undefined, headers), params, headers, req, body);
                return result;
            } else {
                const context = await this.makeContext(undefined, headers);
                try {
                    const data = await fn(context, params, headers, req, body);
                    await context.commit();
                    return {
                        data,
                    };
                }
                catch (err) {
                    await context.rollback();
                    console.error(`endpoint「${key}」方法「${method}」出错`, err);
                    throw err;
                }
            }
        }];
        const existing = endPointIndex.get(k);
        if (existing === undefined) {
            endPointIndex.set(k, endPointRouters.length);
            endPointRouters.push(router);
            return;
        }
        if (process.env.NODE_ENV === 'development') {
            // Duplicates have been observed under hot reload (suspected to come from
            // node's require mechanism), so development tolerates them; this cannot
            // happen in other environments.
            console.warn(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义,将进行覆盖`);
            // Really override: replace the earlier entry instead of appending a second route.
            endPointRouters[existing] = router;
            return;
        }
        throw new Error(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义`);
    };
    if (endpoints) {
        for (const router in endpoints) {
            const item = endpoints[router];
            if (item instanceof Array) {
                item.forEach(
                    ele => transformEndpointItem(router, ele)
                );
            }
            else {
                transformEndpointItem(router, item);
            }
        }
    }
    if (this.synchronizer) {
        const syncEp = this.synchronizer.getSelfEndpoint();
        transformEndpointItem(syncEp.name, syncEp);
    }
    return endPointRouters;
}
/**
 * Runs an operation on behalf of a watcher, with default options.
 * @param entity target entity
 * @param operation operation to execute
 * @param context context to run in
 * @param singleton not used by this implementation
 */
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true) {
    const { dbStore } = this;
    return dbStore.operate(entity, operation, context, {});
}
/**
 * Selects rows on behalf of a watcher with triggers blocked.
 * @param entity target entity
 * @param selection selection to execute
 * @param context context to run in
 * @param forUpdate forwarded to the store as the `forUpdate` option
 * @param singleton not used by this implementation
 */
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true) {
    const { dbStore } = this;
    return dbStore.select(entity, selection, context, { blockTrigger: true, forUpdate });
}
/**
 * Executes one watcher inside a fresh context.
 *
 * A watcher carrying `actionData` is run as an operation on the rows
 * matching its filter. Any other watcher selects rows with its projection
 * and filter and, when at least one row is found, hands them to its `fn`.
 * The context is committed on success and on `OakPartialSuccess`, and
 * rolled back on any other error; the error is always rethrown.
 *
 * @param watcher the watcher to execute
 * @returns the operation result, or `undefined` when nothing was done
 */
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
    const context = await this.makeContext();
    try {
        let outcome: OperationResult<ED> | undefined;
        if (watcher.hasOwnProperty('actionData')) {
            const { entity, action, filter: filterSpec, actionData: dataSpec, singleton } = watcher as BBWatcher<ED, keyof ED>;
            const resolvedFilter = typeof filterSpec === 'function' ? await filterSpec() : cloneDeep(filterSpec);
            const resolvedData = typeof dataSpec === 'function' ? await (dataSpec)() : cloneDeep(dataSpec);
            const operationId = await generateNewIdAsync();
            outcome = await this.operateInWatcher(entity, {
                id: operationId,
                action: action as string,
                data: resolvedData,
                filter: resolvedFilter,
            }, context, singleton);
        }
        else {
            const { entity, projection: projectionSpec, fn, filter: filterSpec, singleton, forUpdate } = watcher as WBWatcher<ED, keyof ED, Cxt>;
            const resolvedFilter = typeof filterSpec === 'function' ? await filterSpec() : cloneDeep(filterSpec);
            const resolvedProjection = typeof projectionSpec === 'function' ? await projectionSpec() : cloneDeep(projectionSpec);
            const matched = await this.selectInWatcher(entity, {
                data: resolvedProjection,
                filter: resolvedFilter,
            }, context, forUpdate, singleton);
            if (matched.length > 0) {
                outcome = await fn(context, matched);
            }
        }
        await context.commit();
        return outcome;
    }
    catch (err) {
        const partiallyDone = err instanceof OakPartialSuccess;
        if (partiallyDone) {
            await context.commit();
        }
        else {
            await context.rollback();
        }
        // Do not publish the error here: this method is called from timers as well
        // as from routines, and each caller decides how to report it.
        throw err;
    }
}
/**
 * Returns the timestamp passed to the store's checkpoint: the current time
 * minus 30 seconds in development, minus 120 seconds otherwise.
 */
protected getCheckpointTs() {
    const lagSeconds = process.env.NODE_ENV === 'development' ? 30 : 120;
    return Date.now() - lagSeconds * 1000;
}
/** Runs the store's checkpoint with the timestamp from `getCheckpointTs`. */
protected checkpoint() {
    const ts = this.getCheckpointTs();
    return this.dbStore.checkpoint(ts);
}
/**
 * Starts the watcher loop.
 *
 * Project watchers (`lib/watchers/index`) and the intrinsic watchers are
 * executed one after another, followed by a store checkpoint; the next
 * round is scheduled 120 seconds after the previous one finishes. Failures
 * of a watcher or of the checkpoint are logged and published to the
 * internal error handlers without stopping the loop.
 */
startWatchers() {
    const watchers = this.requireSth('lib/watchers/index');
    // NOTE(review): initTriggers in this class reads `actionDefDict` (lower-case)
    // from the same module — confirm which export name is correct.
    const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
    const { watchers: adWatchers } = makeIntrinsicLogics(this.dbStore.getSchema(), ActionDefDict);
    const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers || []).concat(adWatchers);
    let count = 0;
    const execOne = async (watcher: Watcher<ED, keyof ED, Cxt>, start: number) => {
        try {
            const result = await this.execWatcher(watcher);
            if (result) {
                console.log(`执行watcher【${watcher.name}】成功,耗时【${Date.now() - start}】,结果是:`, result);
            }
        }
        catch (err) {
            console.error(`执行watcher【${watcher.name}】失败,耗时【${Date.now() - start}】,结果是:`, err);
            await this.publishInternalError(`watcher`, `执行watcher【${watcher.name}】失败`, err);
        }
    };
    const doWatchers = async () => {
        count++;
        const start = Date.now();
        try {
            for (const w of totalWatchers) {
                // todo: the watchers used to run in parallel, which caused deadlocks;
                // they run serially for now, to be optimized later.
                await execOne(w, start);
            }
            const duration = Date.now() - start;
            // Count what was actually executed; the project list may be missing entirely.
            console.log(`${count}次执行watchers共执行${totalWatchers.length}个,耗时${duration}毫秒`);
            try {
                await this.checkpoint();
            }
            catch (err) {
                console.error(`执行了checkpoint发生错误`, err);
                await this.publishInternalError(`checkpoint`, `执行checkpoint发生错误`, err);
            }
        }
        finally {
            // Always schedule the next round, even when this one failed unexpectedly.
            this.watcherTimerId = setTimeout(runRound, 120000);
        }
    };
    const runRound = () => {
        doWatchers().catch(
            (err) => console.error('执行watchers发生错误', err)
        );
    };
    runRound();
}
/**
 * Invokes the function of a free timer with the given context.
 * @param timer the free timer definition
 * @param context context handed to the timer function
 * @returns whatever the timer function returns
 */
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
    const { timer: run } = timer;
    return run(context);
}
/**
 * Schedules every timer declared in `lib/timers/index`.
 *
 * A timer that has an `entity` is executed like a watcher; any other timer
 * is a free timer and runs inside its own context, which is committed on
 * success and on `OakPartialSuccess` and rolled back otherwise. Failures
 * are logged and published to the internal error handlers.
 *
 * @throws Error when a timer name is already scheduled, or when the job
 * cannot be created (e.g. an invalid cron expression)
 */
startTimers() {
    const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index');
    if (!timers) {
        return;
    }
    // Publishing must never make the job callback itself reject.
    const reportFailure = (message: string, err: unknown) => this.publishInternalError(`timer`, message, err).catch(
        (publishErr) => console.error('执行internalErrorHandler时出错', publishErr)
    );
    for (const timer of timers) {
        const { cron, name } = timer;
        // Check for a name clash before scheduling, so a rejected duplicate
        // does not leave an untracked job running.
        if (this.scheduledJobs[name]) {
            throw new Error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
        }
        const job = scheduleJob(name, cron, async (date) => {
            const start = Date.now();
            console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}`);
            if (timer.hasOwnProperty('entity')) {
                try {
                    const result = await this.execWatcher(timer as Watcher<ED, keyof ED, Cxt>);
                    console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒】,结果是`, result);
                }
                catch (err) {
                    console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒】,错误是`, err);
                    await reportFailure(`定时器【${name}】执行失败`, err);
                }
            }
            else {
                const context = await this.makeContext();
                try {
                    const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, context);
                    if (result) {
                        console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);
                    }
                    await context.commit();
                }
                catch (err) {
                    console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
                    if (err instanceof OakPartialSuccess) {
                        await context.commit();
                    }
                    else {
                        await context.rollback();
                    }
                    await reportFailure(`定时器【${name}】执行失败`, err);
                }
            }
        });
        if (!job) {
            throw new Error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
        }
        this.scheduledJobs[name] = job;
    }
}
/**
 * Runs the routines from `lib/routines/start` one after another.
 *
 * A routine that has an `entity` is executed like a watcher. Any other
 * routine is a free routine: it runs in its own context and receives the
 * socket namespace; the context is committed on success and rolled back on
 * failure. The first failure is logged and rethrown, which stops the
 * remaining routines.
 */
async execStartRoutines() {
    const startRoutines: Routine<ED, keyof ED, Cxt>[] = this.requireSth('lib/routines/start') || [];
    for (const routine of startRoutines) {
        if (!routine.hasOwnProperty('entity')) {
            const { name, routine: routineFn } = routine as FreeRoutine<ED, Cxt>;
            const context = await this.makeContext();
            const begunAt = Date.now();
            try {
                const result = await routineFn(context, {
                    socket: this.nsSocket!,
                });
                console.log(`例程【${name}】执行成功,耗时${Date.now() - begunAt}毫秒,结果是【${result}`);
                await context.commit();
            }
            catch (err) {
                console.error(`例程【${name}】执行失败,耗时${Date.now() - begunAt}毫秒,错误是`, err);
                await context.rollback();
                throw err;
            }
            continue;
        }
        const begunAt = Date.now();
        try {
            const result = await this.execWatcher(routine as Watcher<ED, keyof ED, Cxt>);
            console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - begunAt}毫秒,结果是`, result);
        }
        catch (err) {
            console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - begunAt}毫秒,错误是`, err);
            throw err;
        }
    }
}
/**
 * Runs the routines from `lib/routines/stop` one after another.
 *
 * A routine that has an `entity` is executed like a watcher. Any other
 * routine is a free routine: it runs in its own context and receives the
 * socket namespace; the context is committed on success and rolled back on
 * failure. The first failure is logged and rethrown, which stops the
 * remaining routines.
 */
async execStopRoutines() {
    const stopRoutines: Routine<ED, keyof ED, Cxt>[] = this.requireSth('lib/routines/stop') || [];
    for (const routine of stopRoutines) {
        if (!routine.hasOwnProperty('entity')) {
            const { name, routine: routineFn } = routine as FreeRoutine<ED, Cxt>;
            const context = await this.makeContext();
            const begunAt = Date.now();
            try {
                const result = await routineFn(context, {
                    socket: this.nsSocket!,
                });
                console.log(`例程【${name}】执行成功,耗时${Date.now() - begunAt}毫秒,结果是【${result}`);
                await context.commit();
            }
            catch (err) {
                console.error(`例程【${name}】执行失败,耗时${Date.now() - begunAt}毫秒,错误是`, err);
                await context.rollback();
                throw err;
            }
            continue;
        }
        const begunAt = Date.now();
        try {
            const result = await this.execWatcher(routine as Watcher<ED, keyof ED, Cxt>);
            console.log(`例程【${routine.name}】执行成功,耗时${Date.now() - begunAt}毫秒,结果是`, result);
        }
        catch (err) {
            console.error(`例程【${routine.name}】执行失败,耗时${Date.now() - begunAt}毫秒,错误是`, err);
            throw err;
        }
    }
}
/**
 * Runs an ad-hoc routine inside a fresh context.
 * @param routine function receiving the context
 * @returns the routine's result, after the context has been committed
 * @throws whatever the routine or the commit throws, after rolling the context back
 */
async execRoutine(routine: <Cxt extends AsyncContext<ED>>(context: Cxt) => Promise<void>) {
    const context = await this.makeContext();
    try {
        const outcome = await routine(context);
        await context.commit();
        return outcome;
    }
    catch (failure) {
        await context.rollback();
        throw failure;
    }
}
}