clusterAppLoader

This commit is contained in:
Xu Chang 2023-12-13 11:46:31 +08:00
parent d331802483
commit 296418e796
12 changed files with 444 additions and 201 deletions

18
lib/AppLoader.d.ts vendored
View File

@ -1,24 +1,24 @@
/// <reference types="node" />
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AppLoader as GeneralAppLoader, EntityDict, OpRecord } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import { BackendRuntimeContext } from 'oak-frontend-base';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
import { ClusterInfo } from 'oak-domain/lib/types/Cluster';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
private dbStore;
protected dbStore: DbStore<ED, Cxt>;
private aspectDict;
private externalDependencies;
private dataSubscriber?;
private contextBuilder;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise<Cxt>, ns?: Namespace);
private makeContext;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
initTriggers(): void;
startWatchers(): void;
mount(initialize?: true): Promise<void>;
unmount(): Promise<void>;
execAspect(name: string, header?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
execAspect(name: string, headers?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
opRecords: OpRecord<ED>[];
result: any;
message?: string;
@ -26,6 +26,10 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
initialize(dropIfExists?: boolean): Promise<void>;
getStore(): DbStore<ED, Cxt>;
getEndpoints(prefix: string): [string, "get" | "post" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise<Partial<ED[T]["Schema"]>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<void>;
startWatchers(): void;
startTimers(): void;
execStartRoutines(): Promise<void>;
execRoutine(routine: (context: Cxt) => Promise<void>): Promise<void>;

View File

@ -86,6 +86,12 @@ class AppLoader extends types_1.AppLoader {
Object.assign(sthOut, sth);
return sthOut;
}
async makeContext(cxtStr, headers) {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
context.clusterInfo = (0, env_2.getClusterInfo)();
context.headers = headers;
return context;
}
constructor(path, contextBuilder, ns) {
super(path);
const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json'));
@ -96,8 +102,8 @@ class AppLoader extends types_1.AppLoader {
this.dbStore = new DbStore_1.DbStore(storageSchema, contextBuilder, dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore));
this.contextBuilder = (scene) => async (store, header, clusterInfo) => {
const context = await contextBuilder(scene)(store, header, clusterInfo);
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
context.commit = async () => {
@ -117,79 +123,19 @@ class AppLoader extends types_1.AppLoader {
this.contextBuilder = contextBuilder;
}
}
registerTrigger(trigger) {
this.dbStore.registerTrigger(trigger);
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { triggers: adTriggers, checkers: adCheckers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict);
triggers.forEach((trigger) => this.dbStore.registerTrigger(trigger));
adTriggers.forEach((trigger) => this.dbStore.registerTrigger(trigger));
triggers.forEach((trigger) => this.registerTrigger(trigger));
adTriggers.forEach((trigger) => this.registerTrigger(trigger));
checkers.forEach((checker) => this.dbStore.registerChecker(checker));
adCheckers.forEach((checker) => this.dbStore.registerChecker(checker));
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = watchers.concat(adWatchers);
let count = 0;
const doWatchers = async () => {
count++;
const start = Date.now();
const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)());
for (const w of totalWatchers) {
await context.begin();
try {
if (w.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData } = w;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const data = typeof actionData === 'function' ? await actionData() : actionData; // 这里有个奇怪的编译错误,不理解 by Xc
const result = await this.dbStore.operate(entity, {
id: await (0, uuid_1.generateNewIdAsync)(),
action,
data,
filter: filter2
}, context, {
dontCollect: true,
});
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
else {
const { entity, projection, fn, filter } = w;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const projection2 = typeof projection === 'function' ? await projection() : projection;
const rows = await this.dbStore.select(entity, {
data: projection2,
filter: filter2,
}, context, {
dontCollect: true,
blockTrigger: true,
});
if (rows.length > 0) {
const result = await fn(context, rows);
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
}
await context.commit();
}
catch (err) {
await context.rollback();
console.error(`执行了watcher【${w.name}】,发生错误:`, err);
}
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
const now = Date.now();
try {
await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000);
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
}
setTimeout(() => doWatchers(), 120000);
};
doWatchers();
}
async mount(initialize) {
const { path } = this;
if (!initialize) {
@ -203,8 +149,8 @@ class AppLoader extends types_1.AppLoader {
(0, index_1.clearPorts)();
this.dbStore.disconnect();
}
async execAspect(name, header, contextString, params) {
const context = await this.contextBuilder(contextString)(this.dbStore, header, (0, env_2.getClusterInfo)());
async execAspect(name, headers, contextString, params) {
const context = await this.makeContext(contextString, headers);
const fn = this.aspectDict[name];
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
@ -278,7 +224,7 @@ class AppLoader extends types_1.AppLoader {
}
}
endPointRouters.push([name, method, url, async (params, headers, req, body) => {
const context = await this.contextBuilder()(this.dbStore, headers, (0, env_2.getClusterInfo)());
const context = await this.makeContext(undefined, headers);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
@ -303,18 +249,96 @@ class AppLoader extends types_1.AppLoader {
}
return endPointRouters;
}
operateInWatcher(entity, operation, context) {
return this.dbStore.operate(entity, operation, context, {
dontCollect: true,
});
}
selectInWatcher(entity, selection, context) {
return this.dbStore.select(entity, selection, context, {
dontCollect: true,
blockTrigger: true,
});
}
async execWatcher(watcher) {
const context = await this.makeContext();
await context.begin();
try {
if (watcher.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const data = typeof actionData === 'function' ? await (actionData)() : actionData;
const result = await this.operateInWatcher(entity, {
id: await (0, uuid_1.generateNewIdAsync)(),
action,
data,
filter: filter2
}, context);
console.log(`执行了watcher【${watcher.name}】,结果是:`, result);
}
else {
const { entity, projection, fn, filter } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const projection2 = typeof projection === 'function' ? await projection() : projection;
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context);
if (rows.length > 0) {
const result = await fn(context, rows);
console.log(`执行了watcher【${watcher.name}】,结果是:`, result);
}
}
await context.commit();
}
catch (err) {
await context.rollback();
console.error(`执行了watcher【${watcher.name}】,发生错误:`, err);
}
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = watchers.concat(adWatchers);
let count = 0;
const doWatchers = async () => {
count++;
const start = Date.now();
for (const w of totalWatchers) {
await this.execWatcher(w);
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
const now = Date.now();
try {
await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000);
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
}
setTimeout(() => doWatchers(), 120000);
};
doWatchers();
}
startTimers() {
const timers = this.requireSth('lib/timers/index');
for (const timer of timers) {
const { cron, fn, name } = timer;
const { cron, name } = timer;
(0, node_schedule_1.scheduleJob)(name, cron, async (date) => {
const start = Date.now();
const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)());
const context = await this.makeContext();
await context.begin();
console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}`);
try {
const result = await fn(context);
console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
if (timer.hasOwnProperty('entity')) {
await this.execWatcher(timer);
}
else {
const { timer: timerFn } = timer;
const result = await timerFn(context);
console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
}
await context.commit();
}
catch (err) {
@ -327,23 +351,28 @@ class AppLoader extends types_1.AppLoader {
async execStartRoutines() {
const routines = this.requireSth('lib/routines/start');
for (const routine of routines) {
const { name, fn } = routine;
const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)());
const start = Date.now();
await context.begin();
try {
const result = await fn(context);
console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
if (routine.hasOwnProperty('entity')) {
this.execWatcher(routine);
}
catch (err) {
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
else {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
await context.begin();
try {
const result = await routineFn(context);
console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
}
catch (err) {
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
}
}
}
}
async execRoutine(routine) {
const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)());
const context = await this.makeContext();
await routine(context);
}
}

11
lib/ClusterAppLoader.d.ts vendored Normal file
View File

@ -0,0 +1,11 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { AppLoader } from './AppLoader';
import { DbStore } from './DbStore';
import { Namespace } from 'socket.io';
export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends AppLoader<ED, Cxt> {
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace);
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise<Partial<ED[T]['Schema']>[]>;
}

59
lib/ClusterAppLoader.js Normal file
View File

@ -0,0 +1,59 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.ClusterAppLoader = void 0;
const tslib_1 = require("tslib");
const filter_1 = require("oak-domain/lib/store/filter");
const env_1 = require("./cluster/env");
const AppLoader_1 = require("./AppLoader");
const assert_1 = tslib_1.__importDefault(require("assert"));
class ClusterAppLoader extends AppLoader_1.AppLoader {
constructor(path, contextBuilder, ns) {
super(path, contextBuilder, ns);
this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => {
if (trigger.cs) {
// 如果是cluster sensative的触发器需要发送到相应的instance上被处理
}
else {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, trigger.name, ids, context, option);
await context.commit();
}
catch (err) {
await context.rollback();
console.error('execVolatileTrigger异常', entity, trigger.name, ids, option, err);
}
}
});
}
operateInWatcher(entity, operation, context) {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
(0, assert_1.default)(instanceCount && typeof instanceId === 'number');
const { filter } = operation;
const filter2 = (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
$$seq$$: {
$mod: [instanceCount, instanceId]
}
}]);
return super.operateInWatcher(entity, {
...operation,
filter: filter2,
}, context);
}
selectInWatcher(entity, selection, context) {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
(0, assert_1.default)(instanceCount && typeof instanceId === 'number');
const { filter } = selection;
const filter2 = (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
$$seq$$: {
$mod: [instanceCount, instanceId]
}
}]);
return super.selectInWatcher(entity, {
...selection,
filter: filter2,
}, context);
}
}
exports.ClusterAppLoader = ClusterAppLoader;

6
lib/DbStore.d.ts vendored
View File

@ -1,5 +1,5 @@
import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db';
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap } from 'oak-domain/lib/types';
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
import { BackendRuntimeContext } from 'oak-frontend-base';
@ -7,12 +7,14 @@ import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore'
export declare class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends MysqlStore<ED, Cxt> implements AsyncRowStore<ED, Cxt> {
private executor;
private relationAuth;
constructor(storageSchema: StorageSchema<ED>, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>);
constructor(storageSchema: StorageSchema<ED>, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>);
protected cascadeUpdateAsync<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: AsyncContext<ED>, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
operate<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
select<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise<Partial<ED[T]["Schema"]>[]>;
count<T extends keyof ED>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: SelectOption): Promise<number>;
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T, Cxt>): void;
registerChecker<T extends keyof ED>(checker: Checker<ED, T, Cxt>): void;
setOnVolatileTrigger(onVolatileTrigger: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>): void;
execVolatileTrigger<T extends keyof ED>(entity: T, name: string, ids: string[], context: Cxt, option: OperateOption): Promise<void>;
checkpoint(ts: number): Promise<number>;
}

View File

@ -7,9 +7,9 @@ const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth");
class DbStore extends oak_db_1.MysqlStore {
executor;
relationAuth;
constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}) {
constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) {
super(storageSchema, mysqlConfiguration);
this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene)(this));
this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene)(this), undefined, onVolatileTrigger);
this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
}
async cascadeUpdateAsync(entity, operation, context, option) {
@ -109,6 +109,12 @@ class DbStore extends oak_db_1.MysqlStore {
registerChecker(checker) {
this.executor.registerChecker(checker);
}
setOnVolatileTrigger(onVolatileTrigger) {
this.executor.setOnVolatileTrigger(onVolatileTrigger);
}
async execVolatileTrigger(entity, name, ids, context, option) {
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
}
checkpoint(ts) {
return this.executor.checkpoint(ts);
}

1
lib/index.d.ts vendored
View File

@ -1,2 +1,3 @@
export { AppLoader } from './AppLoader';
export { ClusterAppLoader } from './ClusterAppLoader';
export * from './cluster/env';

View File

@ -1,7 +1,9 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AppLoader = void 0;
exports.ClusterAppLoader = exports.AppLoader = void 0;
const tslib_1 = require("tslib");
var AppLoader_1 = require("./AppLoader");
Object.defineProperty(exports, "AppLoader", { enumerable: true, get: function () { return AppLoader_1.AppLoader; } });
var ClusterAppLoader_1 = require("./ClusterAppLoader");
Object.defineProperty(exports, "ClusterAppLoader", { enumerable: true, get: function () { return ClusterAppLoader_1.ClusterAppLoader; } });
tslib_1.__exportStar(require("./cluster/env"), exports);

View File

@ -6,7 +6,7 @@ import { makeIntrinsicCTWs } from "oak-domain/lib/store/actionDef";
import { intersection, omit } from 'oak-domain/lib/utils/lodash';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
@ -17,16 +17,15 @@ import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Server as SocketIoServer, Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import { ClusterInfo } from 'oak-domain/lib/types/Cluster';
import { getClusterInfo } from './cluster/env';
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
private dbStore: DbStore<ED, Cxt>;
protected dbStore: DbStore<ED, Cxt>;
private aspectDict: Record<string, Aspect<ED, Cxt>>;
private externalDependencies: string[];
private dataSubscriber?: DataSubscriber<ED, Cxt>;
private contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise<Cxt>;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth(filePath: string): any {
const depFilePath = join(this.path, filePath);
@ -110,7 +109,15 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
return sthOut;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise<Cxt>, ns?: Namespace) {
private async makeContext(cxtStr?: string, headers?: IncomingHttpHeaders) {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
context.clusterInfo = getClusterInfo();
context.headers = headers;
return context;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace) {
super(path);
const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -120,8 +127,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.dbStore = new DbStore<ED, Cxt>(storageSchema, contextBuilder, dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore));
this.contextBuilder = (scene) => async (store, header, clusterInfo) => {
const context = await contextBuilder(scene)(store, header, clusterInfo);
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
@ -150,96 +157,34 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
}
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>) {
this.dbStore.registerTrigger(trigger);
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { triggers: adTriggers, checkers: adCheckers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict);
triggers.forEach(
(trigger: Trigger<ED, keyof ED, Cxt>) => this.dbStore.registerTrigger(trigger)
(trigger: Trigger<ED, keyof ED, Cxt>) => this.registerTrigger(trigger)
);
adTriggers.forEach(
(trigger) => this.dbStore.registerTrigger(trigger)
(trigger) => this.registerTrigger(trigger)
);
checkers.forEach(
(checker: Checker<ED, keyof ED, Cxt>) => this.dbStore.registerChecker(checker)
);
adCheckers.forEach(
(checker) => this.dbStore.registerChecker(checker)
);
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers).concat(adWatchers);
let count = 0;
const doWatchers = async () => {
count++;
const start = Date.now();
const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo());
for (const w of totalWatchers) {
await context.begin();
try {
if (w.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData } = <BBWatcher<ED, keyof ED>>w;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const data = typeof actionData === 'function' ? await (actionData as any)() : actionData; // 这里有个奇怪的编译错误,不理解 by Xc
const result = await this.dbStore.operate(entity, {
id: await generateNewIdAsync(),
action,
data,
filter: filter2
}, context, {
dontCollect: true,
});
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
else {
const { entity, projection, fn, filter } = <WBWatcher<ED, keyof ED, Cxt>>w;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const projection2 = typeof projection === 'function' ? await (projection as Function)() : projection;
const rows = await this.dbStore.select(entity, {
data: projection2 as any,
filter: filter2,
}, context, {
dontCollect: true,
blockTrigger: true,
});
if (rows.length > 0) {
const result = await fn(context, rows);
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
}
await context.commit();
}
catch (err) {
await context.rollback();
console.error(`执行了watcher【${w.name}】,发生错误:`, err);
}
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
const now = Date.now();
try {
await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000);
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
}
setTimeout(() => doWatchers(), 120000);
};
doWatchers();
}
async mount(initialize?: true) {
const { path } = this;
if (!initialize) {
@ -255,12 +200,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.dbStore.disconnect();
}
async execAspect(name: string, header?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
async execAspect(name: string, headers?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
opRecords: OpRecord<ED>[];
result: any;
message?: string;
}> {
const context = await this.contextBuilder(contextString)(this.dbStore, header, getClusterInfo());
const context = await this.makeContext(contextString, headers);
const fn = this.aspectDict[name];
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
@ -340,7 +286,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
endPointRouters.push(
[name, method, url, async (params, headers, req, body) => {
const context = await this.contextBuilder()(this.dbStore, headers, getClusterInfo());
const context = await this.makeContext(undefined, headers);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
@ -369,18 +316,106 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
return endPointRouters;
}
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt) {
return this.dbStore.operate(entity, operation, context, {
dontCollect: true,
});
}
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt) {
return this.dbStore.select(entity, selection, context, {
dontCollect: true,
blockTrigger: true,
})
}
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
const context = await this.makeContext();
await context.begin();
try {
if (watcher.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData } = <BBWatcher<ED, keyof ED>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const data = typeof actionData === 'function' ? await (actionData)() : actionData;
const result = await this.operateInWatcher(entity, {
id: await generateNewIdAsync(),
action,
data,
filter: filter2
}, context);
console.log(`执行了watcher【${watcher.name}】,结果是:`, result);
}
else {
const { entity, projection, fn, filter } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : filter;
const projection2 = typeof projection === 'function' ? await projection () : projection;
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context);
if (rows.length > 0) {
const result = await fn(context, rows);
console.log(`执行了watcher【${watcher.name}】,结果是:`, result);
}
}
await context.commit();
}
catch (err) {
await context.rollback();
console.error(`执行了watcher【${watcher.name}】,发生错误:`, err);
}
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers).concat(adWatchers);
let count = 0;
const doWatchers = async () => {
count++;
const start = Date.now();
for (const w of totalWatchers) {
await this.execWatcher(w);
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
const now = Date.now();
try {
await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000);
}
catch (err) {
console.error(`执行了checkpoint发生错误`, err);
}
setTimeout(() => doWatchers(), 120000);
};
doWatchers();
}
startTimers() {
const timers = this.requireSth('lib/timers/index');
const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index');
for (const timer of timers) {
const { cron, fn, name } = timer;
const { cron, name } = timer;
scheduleJob(name, cron, async (date) => {
const start = Date.now();
const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo());
const context = await this.makeContext();
await context.begin();
console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}`);
try {
const result = await fn(context);
console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
if (timer.hasOwnProperty('entity')) {
await this.execWatcher(timer as Watcher<ED, keyof ED, Cxt>);
}
else {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
const result = await timerFn(context);
console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
}
await context.commit();
}
catch (err) {
@ -392,26 +427,33 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
async execStartRoutines() {
const routines = this.requireSth('lib/routines/start');
const routines: Routine<ED, keyof ED, Cxt>[] = this.requireSth('lib/routines/start');
for (const routine of routines) {
const { name, fn } = routine;
const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo());
const start = Date.now();
await context.begin();
try {
const result = await fn(context);
console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
if (routine.hasOwnProperty('entity')) {
this.execWatcher(routine as Watcher<ED, keyof ED, Cxt>);
}
catch (err) {
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
else {
const { name, routine: routineFn } = routine as FreeRoutine<ED, Cxt>;
const context = await this.makeContext();
const start = Date.now();
await context.begin();
try {
const result = await routineFn(context);
console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
}
catch (err) {
console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
}
}
}
}
async execRoutine(routine: (context: Cxt) => Promise<void>) {
const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo());
const context = await this.makeContext();
await routine(context);
}
}

64
src/ClusterAppLoader.ts Normal file
View File

@ -0,0 +1,64 @@
import { combineFilters } from 'oak-domain/lib/store/filter';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, VolatileTrigger, Trigger } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { getClusterInfo } from './cluster/env';
import { AppLoader } from './AppLoader';
import assert from 'assert';
import { DbStore } from './DbStore';
import { Namespace } from 'socket.io';
export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends AppLoader<ED, Cxt> {
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace) {
super(path, contextBuilder, ns);
this.dbStore.setOnVolatileTrigger(
async (entity, trigger, ids, cxtStr, option) => {
if (trigger.cs) {
// 如果是cluster sensative的触发器需要发送到相应的instance上被处理
}
else {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, trigger.name, ids, context, option);
await context.commit();
}
catch (err) {
await context.rollback();
console.error('execVolatileTrigger异常', entity, trigger.name, ids, option, err);
}
}
}
)
}
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<OperationResult<ED>> {
const { instanceCount, instanceId } = getClusterInfo()!;
assert (instanceCount && typeof instanceId === 'number');
const { filter } = operation;
const filter2 = combineFilters<ED, T>(entity, this.dbStore.getSchema(), [filter, {
$$seq$$: {
$mod: [instanceCount, instanceId]
}
}]);
return super.operateInWatcher(entity, {
...operation,
filter: filter2,
}, context);
}
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise<Partial<ED[T]['Schema']>[]> {
const { instanceCount, instanceId } = getClusterInfo()!;
assert (instanceCount && typeof instanceId === 'number');
const { filter } = selection;
const filter2 = combineFilters<ED, T>(entity, this.dbStore.getSchema(), [filter, {
$$seq$$: {
$mod: [instanceCount, instanceId]
}
}]);
return super.selectInWatcher(entity, {
...selection,
filter: filter2,
}, context);
}
}

View File

@ -1,5 +1,5 @@
import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db';
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap } from 'oak-domain/lib/types';
import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration';
@ -18,9 +18,10 @@ export class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends Backend
mysqlConfiguration: MySQLConfiguration,
authDeduceRelationMap: AuthDeduceRelationMap<ED>,
selectFreeEntities: SelectFreeEntities<ED> = [],
updateFreeDict: UpdateFreeDict<ED> = {}) {
updateFreeDict: UpdateFreeDict<ED> = {},
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) {
super(storageSchema, mysqlConfiguration);
this.executor = new TriggerExecutor((scene) => contextBuilder(scene)(this));
this.executor = new TriggerExecutor((scene) => contextBuilder(scene)(this), undefined, onVolatileTrigger);
this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
}
@ -141,6 +142,27 @@ export class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends Backend
this.executor.registerChecker(checker);
}
setOnVolatileTrigger(
onVolatileTrigger: <T extends keyof ED>(
entity: T,
trigger: VolatileTrigger<ED, T, Cxt>,
ids: string[],
cxtStr: string,
option: OperateOption) => Promise<void>
) {
this.executor.setOnVolatileTrigger(onVolatileTrigger);
}
async execVolatileTrigger<T extends keyof ED>(
entity: T,
name: string,
ids: string[],
context: Cxt,
option: OperateOption
) {
return this.executor.execVolatileTrigger(entity, name, ids, context, option);
}
checkpoint(ts: number) {
return this.executor.checkpoint(ts);
}

View File

@ -1,2 +1,3 @@
export { AppLoader } from './AppLoader';
export { ClusterAppLoader } from './ClusterAppLoader';
export * from './cluster/env';