diff --git a/lib/AppLoader.d.ts b/lib/AppLoader.d.ts index ffd9144..3d31f9a 100644 --- a/lib/AppLoader.d.ts +++ b/lib/AppLoader.d.ts @@ -1,24 +1,24 @@ /// import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { AppLoader as GeneralAppLoader, EntityDict, OpRecord } from "oak-domain/lib/types"; +import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord } from "oak-domain/lib/types"; import { DbStore } from "./DbStore"; import { BackendRuntimeContext } from 'oak-frontend-base'; import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { Namespace } from 'socket.io'; -import { ClusterInfo } from 'oak-domain/lib/types/Cluster'; export declare class AppLoader> extends GeneralAppLoader { - private dbStore; + protected dbStore: DbStore; private aspectDict; private externalDependencies; private dataSubscriber?; - private contextBuilder; + protected contextBuilder: (scene?: string) => (store: DbStore) => Promise; private requireSth; - constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise, ns?: Namespace); + private makeContext; + constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace); + protected registerTrigger(trigger: Trigger): void; initTriggers(): void; - startWatchers(): void; mount(initialize?: true): Promise; unmount(): Promise; - execAspect(name: string, header?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{ + execAspect(name: string, headers?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{ opRecords: OpRecord[]; result: any; message?: string; @@ -26,6 +26,10 @@ export declare class AppLoader; getStore(): DbStore; getEndpoints(prefix: string): [string, "get" | "post" | "put" | "delete", string, (params: Record, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise][]; + protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt): Promise>; + protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise[]>; + protected execWatcher(watcher: Watcher): Promise; + startWatchers(): void; startTimers(): void; execStartRoutines(): Promise; execRoutine(routine: (context: Cxt) => Promise): Promise; diff --git a/lib/AppLoader.js b/lib/AppLoader.js index a735cd8..f805d2f 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -86,6 +86,12 @@ class AppLoader extends types_1.AppLoader { Object.assign(sthOut, sth); return sthOut; } + async makeContext(cxtStr, headers) { + const context = await this.contextBuilder(cxtStr)(this.dbStore); + context.clusterInfo = (0, env_2.getClusterInfo)(); + context.headers = headers; + return context; + } constructor(path, contextBuilder, ns) { super(path); const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json')); @@ -96,8 +102,8 @@ class AppLoader extends types_1.AppLoader { this.dbStore = new DbStore_1.DbStore(storageSchema, contextBuilder, dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (ns) { this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore)); - this.contextBuilder = (scene) => async (store, header, clusterInfo) => { - const context = await contextBuilder(scene)(store, header, clusterInfo); + this.contextBuilder = (scene) => async (store) => { + const context = await contextBuilder(scene)(store); // 注入在提交前向dataSubscribe const originCommit = context.commit; context.commit = async () => { @@ -117,79 +123,19 @@ class AppLoader extends types_1.AppLoader { this.contextBuilder = contextBuilder; } } + registerTrigger(trigger) { + this.dbStore.registerTrigger(trigger); + } initTriggers() { const triggers = this.requireSth('lib/triggers/index'); const checkers = this.requireSth('lib/checkers/index'); const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); const { triggers: adTriggers, checkers: adCheckers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict); - triggers.forEach((trigger) => this.dbStore.registerTrigger(trigger)); - adTriggers.forEach((trigger) => this.dbStore.registerTrigger(trigger)); + triggers.forEach((trigger) => this.registerTrigger(trigger)); + adTriggers.forEach((trigger) => this.registerTrigger(trigger)); checkers.forEach((checker) => this.dbStore.registerChecker(checker)); adCheckers.forEach((checker) => this.dbStore.registerChecker(checker)); } - startWatchers() { - const watchers = this.requireSth('lib/watchers/index'); - const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); - const { watchers: adWatchers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict); - const totalWatchers = watchers.concat(adWatchers); - let count = 0; - const doWatchers = async () => { - count++; - const start = Date.now(); - const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)()); - for (const w of totalWatchers) { - await context.begin(); - try { - if (w.hasOwnProperty('actionData')) { - const { entity, action, filter, actionData } = w; - const filter2 = typeof filter === 'function' ? await filter() : filter; - const data = typeof actionData === 'function' ? await actionData() : actionData; // 这里有个奇怪的编译错误,不理解 by Xc - const result = await this.dbStore.operate(entity, { - id: await (0, uuid_1.generateNewIdAsync)(), - action, - data, - filter: filter2 - }, context, { - dontCollect: true, - }); - console.log(`执行了watcher【${w.name}】,结果是:`, result); - } - else { - const { entity, projection, fn, filter } = w; - const filter2 = typeof filter === 'function' ? await filter() : filter; - const projection2 = typeof projection === 'function' ? await projection() : projection; - const rows = await this.dbStore.select(entity, { - data: projection2, - filter: filter2, - }, context, { - dontCollect: true, - blockTrigger: true, - }); - if (rows.length > 0) { - const result = await fn(context, rows); - console.log(`执行了watcher【${w.name}】,结果是:`, result); - } - } - await context.commit(); - } - catch (err) { - await context.rollback(); - console.error(`执行了watcher【${w.name}】,发生错误:`, err); - } - } - const duration = Date.now() - start; - console.log(`第${count}次执行watchers,共执行${watchers.length}个,耗时${duration}毫秒`); - const now = Date.now(); - try { - await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000); - } - catch (err) { - console.error(`执行了checkpoint,发生错误:`, err); - } - setTimeout(() => doWatchers(), 120000); - }; - doWatchers(); - } async mount(initialize) { const { path } = this; if (!initialize) { @@ -203,8 +149,8 @@ class AppLoader extends types_1.AppLoader { (0, index_1.clearPorts)(); this.dbStore.disconnect(); } - async execAspect(name, header, contextString, params) { - const context = await this.contextBuilder(contextString)(this.dbStore, header, (0, env_2.getClusterInfo)()); + async execAspect(name, headers, contextString, params) { + const context = await this.makeContext(contextString, headers); const fn = this.aspectDict[name]; if (!fn) { throw new Error(`不存在的接口名称: ${name}`); @@ -278,7 +224,7 @@ class AppLoader extends types_1.AppLoader { } } endPointRouters.push([name, method, url, async (params, headers, req, body) => { - const context = await this.contextBuilder()(this.dbStore, headers, (0, env_2.getClusterInfo)()); + const context = await this.makeContext(undefined, headers); await context.begin(); try { const result = await fn(context, params, headers, req, body); @@ -303,18 +249,96 @@ class AppLoader extends types_1.AppLoader { } return endPointRouters; } + operateInWatcher(entity, operation, context) { + return this.dbStore.operate(entity, operation, context, { + dontCollect: true, + }); + } + selectInWatcher(entity, selection, context) { + return this.dbStore.select(entity, selection, context, { + dontCollect: true, + blockTrigger: true, + }); + } + async execWatcher(watcher) { + const context = await this.makeContext(); + await context.begin(); + try { + if (watcher.hasOwnProperty('actionData')) { + const { entity, action, filter, actionData } = watcher; + const filter2 = typeof filter === 'function' ? await filter() : filter; + const data = typeof actionData === 'function' ? await (actionData)() : actionData; + const result = await this.operateInWatcher(entity, { + id: await (0, uuid_1.generateNewIdAsync)(), + action, + data, + filter: filter2 + }, context); + console.log(`执行了watcher【${watcher.name}】,结果是:`, result); + } + else { + const { entity, projection, fn, filter } = watcher; + const filter2 = typeof filter === 'function' ? await filter() : filter; + const projection2 = typeof projection === 'function' ? await projection() : projection; + const rows = await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, context); + if (rows.length > 0) { + const result = await fn(context, rows); + console.log(`执行了watcher【${watcher.name}】,结果是:`, result); + } + } + await context.commit(); + } + catch (err) { + await context.rollback(); + console.error(`执行了watcher【${watcher.name}】,发生错误:`, err); + } + } + startWatchers() { + const watchers = this.requireSth('lib/watchers/index'); + const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); + const { watchers: adWatchers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict); + const totalWatchers = watchers.concat(adWatchers); + let count = 0; + const doWatchers = async () => { + count++; + const start = Date.now(); + for (const w of totalWatchers) { + await this.execWatcher(w); + } + const duration = Date.now() - start; + console.log(`第${count}次执行watchers,共执行${watchers.length}个,耗时${duration}毫秒`); + const now = Date.now(); + try { + await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000); + } + catch (err) { + console.error(`执行了checkpoint,发生错误:`, err); + } + setTimeout(() => doWatchers(), 120000); + }; + doWatchers(); + } startTimers() { const timers = this.requireSth('lib/timers/index'); for (const timer of timers) { - const { cron, fn, name } = timer; + const { cron, name } = timer; (0, node_schedule_1.scheduleJob)(name, cron, async (date) => { const start = Date.now(); - const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)()); + const context = await this.makeContext(); await context.begin(); console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}】`); try { - const result = await fn(context); - console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + if (timer.hasOwnProperty('entity')) { + await this.execWatcher(timer); + } + else { + const { timer: timerFn } = timer; + const result = await timerFn(context); + console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + } await context.commit(); } catch (err) { @@ -327,23 +351,28 @@ class AppLoader extends types_1.AppLoader { async execStartRoutines() { const routines = this.requireSth('lib/routines/start'); for (const routine of routines) { - const { name, fn } = routine; - const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)()); - const start = Date.now(); - await context.begin(); - try { - const result = await fn(context); - console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); - await context.commit(); + if (routine.hasOwnProperty('entity')) { + this.execWatcher(routine); } - catch (err) { - console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); - await context.rollback(); + else { + const { name, routine: routineFn } = routine; + const context = await this.makeContext(); + const start = Date.now(); + await context.begin(); + try { + const result = await routineFn(context); + console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + await context.commit(); + } + catch (err) { + console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); + await context.rollback(); + } } } } async execRoutine(routine) { - const context = await this.contextBuilder()(this.dbStore, undefined, (0, env_2.getClusterInfo)()); + const context = await this.makeContext(); await routine(context); } } diff --git a/lib/ClusterAppLoader.d.ts b/lib/ClusterAppLoader.d.ts new file mode 100644 index 0000000..8240b54 --- /dev/null +++ b/lib/ClusterAppLoader.d.ts @@ -0,0 +1,11 @@ +import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; +import { EntityDict, OperationResult } from 'oak-domain/lib/types'; +import { BackendRuntimeContext } from 'oak-frontend-base'; +import { AppLoader } from './AppLoader'; +import { DbStore } from './DbStore'; +import { Namespace } from 'socket.io'; +export declare class ClusterAppLoader> extends AppLoader { + constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace); + protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt): Promise>; + protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise[]>; +} diff --git a/lib/ClusterAppLoader.js b/lib/ClusterAppLoader.js new file mode 100644 index 0000000..e99830a --- /dev/null +++ b/lib/ClusterAppLoader.js @@ -0,0 +1,59 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.ClusterAppLoader = void 0; +const tslib_1 = require("tslib"); +const filter_1 = require("oak-domain/lib/store/filter"); +const env_1 = require("./cluster/env"); +const AppLoader_1 = require("./AppLoader"); +const assert_1 = tslib_1.__importDefault(require("assert")); +class ClusterAppLoader extends AppLoader_1.AppLoader { + constructor(path, contextBuilder, ns) { + super(path, contextBuilder, ns); + this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => { + if (trigger.cs) { + // 如果是cluster sensative的触发器,需要发送到相应的instance上被处理 + } + else { + const context = await this.contextBuilder(cxtStr)(this.dbStore); + await context.begin(); + try { + await this.dbStore.execVolatileTrigger(entity, trigger.name, ids, context, option); + await context.commit(); + } + catch (err) { + await context.rollback(); + console.error('execVolatileTrigger异常', entity, trigger.name, ids, option, err); + } + } + }); + } + operateInWatcher(entity, operation, context) { + const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); + (0, assert_1.default)(instanceCount && typeof instanceId === 'number'); + const { filter } = operation; + const filter2 = (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, { + $$seq$$: { + $mod: [instanceCount, instanceId] + } + }]); + return super.operateInWatcher(entity, { + ...operation, + filter: filter2, + }, context); + } + selectInWatcher(entity, selection, context) { + const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); + (0, assert_1.default)(instanceCount && typeof instanceId === 'number'); + const { filter } = selection; + const filter2 = (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, { + $$seq$$: { + $mod: [instanceCount, instanceId] + } + }]); + return super.selectInWatcher(entity, { + ...selection, + filter: filter2, + }, context); + } +} +exports.ClusterAppLoader = ClusterAppLoader; diff --git a/lib/DbStore.d.ts b/lib/DbStore.d.ts index e7c4879..7d3cea8 100644 --- a/lib/DbStore.d.ts +++ b/lib/DbStore.d.ts @@ -1,5 +1,5 @@ import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db'; -import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap } from 'oak-domain/lib/types'; +import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base'; @@ -7,12 +7,14 @@ import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore' export declare class DbStore> extends MysqlStore implements AsyncRowStore { private executor; private relationAuth; - constructor(storageSchema: StorageSchema, contextBuilder: (scene?: string) => (store: DbStore) => Promise, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict); + constructor(storageSchema: StorageSchema, contextBuilder: (scene?: string) => (store: DbStore) => Promise, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise); protected cascadeUpdateAsync(entity: T, operation: ED[T]['Operation'], context: AsyncContext, option: MysqlOperateOption): Promise>; operate(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise>; select(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise[]>; count(entity: T, selection: Pick, context: Cxt, option: SelectOption): Promise; registerTrigger(trigger: Trigger): void; registerChecker(checker: Checker): void; + setOnVolatileTrigger(onVolatileTrigger: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise): void; + execVolatileTrigger(entity: T, name: string, ids: string[], context: Cxt, option: OperateOption): Promise; checkpoint(ts: number): Promise; } diff --git a/lib/DbStore.js b/lib/DbStore.js index f5b955c..1cf1058 100644 --- a/lib/DbStore.js +++ b/lib/DbStore.js @@ -7,9 +7,9 @@ const RelationAuth_1 = require("oak-domain/lib/store/RelationAuth"); class DbStore extends oak_db_1.MysqlStore { executor; relationAuth; - constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}) { + constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) { super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene)(this)); + this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene)(this), undefined, onVolatileTrigger); this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); } async cascadeUpdateAsync(entity, operation, context, option) { @@ -109,6 +109,12 @@ class DbStore extends oak_db_1.MysqlStore { registerChecker(checker) { this.executor.registerChecker(checker); } + setOnVolatileTrigger(onVolatileTrigger) { + this.executor.setOnVolatileTrigger(onVolatileTrigger); + } + async execVolatileTrigger(entity, name, ids, context, option) { + return this.executor.execVolatileTrigger(entity, name, ids, context, option); + } checkpoint(ts) { return this.executor.checkpoint(ts); } diff --git a/lib/index.d.ts b/lib/index.d.ts index d055f14..f7829b6 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -1,2 +1,3 @@ export { AppLoader } from './AppLoader'; +export { ClusterAppLoader } from './ClusterAppLoader'; export * from './cluster/env'; diff --git a/lib/index.js b/lib/index.js index 491b075..50cc114 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,7 +1,9 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.AppLoader = void 0; +exports.ClusterAppLoader = exports.AppLoader = void 0; const tslib_1 = require("tslib"); var AppLoader_1 = require("./AppLoader"); Object.defineProperty(exports, "AppLoader", { enumerable: true, get: function () { return AppLoader_1.AppLoader; } }); +var ClusterAppLoader_1 = require("./ClusterAppLoader"); +Object.defineProperty(exports, "ClusterAppLoader", { enumerable: true, get: function () { return ClusterAppLoader_1.ClusterAppLoader; } }); tslib_1.__exportStar(require("./cluster/env"), exports); diff --git a/src/AppLoader.ts b/src/AppLoader.ts index cff1c5d..1d36513 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -6,7 +6,7 @@ import { makeIntrinsicCTWs } from "oak-domain/lib/store/actionDef"; import { intersection, omit } from 'oak-domain/lib/utils/lodash'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; -import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord } from "oak-domain/lib/types"; +import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema } from "oak-domain/lib/types"; import { DbStore } from "./DbStore"; import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index'; import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration'; @@ -17,16 +17,15 @@ import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { Server as SocketIoServer, Namespace } from 'socket.io'; import DataSubscriber from './cluster/DataSubscriber'; -import { ClusterInfo } from 'oak-domain/lib/types/Cluster'; import { getClusterInfo } from './cluster/env'; export class AppLoader> extends GeneralAppLoader { - private dbStore: DbStore; + protected dbStore: DbStore; private aspectDict: Record>; private externalDependencies: string[]; private dataSubscriber?: DataSubscriber; - private contextBuilder: (scene?: string) => (store: DbStore, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise; + protected contextBuilder: (scene?: string) => (store: DbStore) => Promise; private requireSth(filePath: string): any { const depFilePath = join(this.path, filePath); @@ -110,7 +109,15 @@ export class AppLoader (store: DbStore, header?: IncomingHttpHeaders, clusterInfo?: ClusterInfo) => Promise, ns?: Namespace) { + private async makeContext(cxtStr?: string, headers?: IncomingHttpHeaders) { + const context = await this.contextBuilder(cxtStr)(this.dbStore); + context.clusterInfo = getClusterInfo(); + context.headers = headers; + + return context; + } + + constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace) { super(path); const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json')); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); @@ -120,8 +127,8 @@ export class AppLoader(storageSchema, contextBuilder, dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (ns) { this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore)); - this.contextBuilder = (scene) => async (store, header, clusterInfo) => { - const context = await contextBuilder(scene)(store, header, clusterInfo); + this.contextBuilder = (scene) => async (store) => { + const context = await contextBuilder(scene)(store); // 注入在提交前向dataSubscribe const originCommit = context.commit; @@ -150,96 +157,34 @@ export class AppLoader) { + this.dbStore.registerTrigger(trigger); + } + initTriggers() { const triggers = this.requireSth('lib/triggers/index'); const checkers = this.requireSth('lib/checkers/index'); const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); const { triggers: adTriggers, checkers: adCheckers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict); + triggers.forEach( - (trigger: Trigger) => this.dbStore.registerTrigger(trigger) + (trigger: Trigger) => this.registerTrigger(trigger) ); + adTriggers.forEach( - (trigger) => this.dbStore.registerTrigger(trigger) + (trigger) => this.registerTrigger(trigger) ); + checkers.forEach( (checker: Checker) => this.dbStore.registerChecker(checker) ); + adCheckers.forEach( (checker) => this.dbStore.registerChecker(checker) ); } - startWatchers() { - const watchers = this.requireSth('lib/watchers/index'); - const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); - - const { watchers: adWatchers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict); - const totalWatchers = ([]>watchers).concat(adWatchers); - - let count = 0; - const doWatchers = async () => { - count++; - const start = Date.now(); - const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo()); - for (const w of totalWatchers) { - await context.begin(); - try { - if (w.hasOwnProperty('actionData')) { - const { entity, action, filter, actionData } = >w; - const filter2 = typeof filter === 'function' ? await filter() : filter; - const data = typeof actionData === 'function' ? await (actionData as any)() : actionData; // 这里有个奇怪的编译错误,不理解 by Xc - const result = await this.dbStore.operate(entity, { - id: await generateNewIdAsync(), - action, - data, - filter: filter2 - }, context, { - dontCollect: true, - }); - - console.log(`执行了watcher【${w.name}】,结果是:`, result); - } - else { - const { entity, projection, fn, filter } = >w; - const filter2 = typeof filter === 'function' ? await filter() : filter; - const projection2 = typeof projection === 'function' ? await (projection as Function)() : projection; - const rows = await this.dbStore.select(entity, { - data: projection2 as any, - filter: filter2, - }, context, { - dontCollect: true, - blockTrigger: true, - }); - - if (rows.length > 0) { - const result = await fn(context, rows); - console.log(`执行了watcher【${w.name}】,结果是:`, result); - } - } - await context.commit(); - } - catch (err) { - await context.rollback(); - console.error(`执行了watcher【${w.name}】,发生错误:`, err); - } - } - const duration = Date.now() - start; - console.log(`第${count}次执行watchers,共执行${watchers.length}个,耗时${duration}毫秒`); - - const now = Date.now(); - try { - await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000); - } - catch (err) { - console.error(`执行了checkpoint,发生错误:`, err); - } - - setTimeout(() => doWatchers(), 120000); - }; - doWatchers(); - } - async mount(initialize?: true) { const { path } = this; if (!initialize) { @@ -255,12 +200,13 @@ export class AppLoader[]; result: any; message?: string; }> { - const context = await this.contextBuilder(contextString)(this.dbStore, header, getClusterInfo()); + const context = await this.makeContext(contextString, headers); + const fn = this.aspectDict[name]; if (!fn) { throw new Error(`不存在的接口名称: ${name}`); @@ -340,7 +286,8 @@ export class AppLoader { - const context = await this.contextBuilder()(this.dbStore, headers, getClusterInfo()); + const context = await this.makeContext(undefined, headers); + await context.begin(); try { const result = await fn(context, params, headers, req, body); @@ -369,18 +316,106 @@ export class AppLoader(entity: T, operation: ED[T]['Update'], context: Cxt) { + return this.dbStore.operate(entity, operation, context, { + dontCollect: true, + }); + } + + protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt) { + return this.dbStore.select(entity, selection, context, { + dontCollect: true, + blockTrigger: true, + }) + } + + protected async execWatcher(watcher: Watcher) { + const context = await this.makeContext(); + await context.begin(); + try { + if (watcher.hasOwnProperty('actionData')) { + const { entity, action, filter, actionData } = >watcher; + const filter2 = typeof filter === 'function' ? await filter() : filter; + const data = typeof actionData === 'function' ? await (actionData)() : actionData; + const result = await this.operateInWatcher(entity, { + id: await generateNewIdAsync(), + action, + data, + filter: filter2 + }, context); + + console.log(`执行了watcher【${watcher.name}】,结果是:`, result); + } + else { + const { entity, projection, fn, filter } = >watcher; + const filter2 = typeof filter === 'function' ? await filter() : filter; + const projection2 = typeof projection === 'function' ? await projection () : projection; + const rows = await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, context); + + if (rows.length > 0) { + const result = await fn(context, rows); + console.log(`执行了watcher【${watcher.name}】,结果是:`, result); + } + } + await context.commit(); + } + catch (err) { + await context.rollback(); + console.error(`执行了watcher【${watcher.name}】,发生错误:`, err); + } + } + + startWatchers() { + const watchers = this.requireSth('lib/watchers/index'); + const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`); + + const { watchers: adWatchers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict); + const totalWatchers = ([]>watchers).concat(adWatchers); + + let count = 0; + const doWatchers = async () => { + count++; + const start = Date.now(); + for (const w of totalWatchers) { + await this.execWatcher(w); + } + const duration = Date.now() - start; + console.log(`第${count}次执行watchers,共执行${watchers.length}个,耗时${duration}毫秒`); + + const now = Date.now(); + try { + await this.dbStore.checkpoint(process.env.NODE_ENV === 'development' ? now - 30 * 1000 : now - 120 * 1000); + } + catch (err) { + console.error(`执行了checkpoint,发生错误:`, err); + } + + setTimeout(() => doWatchers(), 120000); + }; + doWatchers(); + } + startTimers() { - const timers = this.requireSth('lib/timers/index'); + const timers: Timer[] = this.requireSth('lib/timers/index'); for (const timer of timers) { - const { cron, fn, name } = timer; + const { cron, name } = timer; scheduleJob(name, cron, async (date) => { const start = Date.now(); - const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo()); + const context = await this.makeContext(); await context.begin(); console.log(`定时器【${name}】开始执行,时间是【${date.toLocaleTimeString()}】`); try { - const result = await fn(context); - console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + if (timer.hasOwnProperty('entity')) { + await this.execWatcher(timer as Watcher); + } + else { + const { timer: timerFn } = timer as FreeTimer; + const result = await timerFn(context); + console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + } await context.commit(); } catch (err) { @@ -392,26 +427,33 @@ export class AppLoader[] = this.requireSth('lib/routines/start'); for (const routine of routines) { - const { name, fn } = routine; - const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo()); - const start = Date.now(); - await context.begin(); - try { - const result = await fn(context); - console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); - await context.commit(); + if (routine.hasOwnProperty('entity')) { + this.execWatcher(routine as Watcher); } - catch (err) { - console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); - await context.rollback(); + else { + const { name, routine: routineFn } = routine as FreeRoutine; + const context = await this.makeContext(); + + const start = Date.now(); + await context.begin(); + try { + const result = await routineFn(context); + console.log(`例程【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}】`); + await context.commit(); + } + catch (err) { + console.warn(`例程【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); + await context.rollback(); + } } } } async execRoutine(routine: (context: Cxt) => Promise) { - const context = await this.contextBuilder()(this.dbStore, undefined, getClusterInfo()); + const context = await this.makeContext(); + await routine(context); } } \ No newline at end of file diff --git a/src/ClusterAppLoader.ts b/src/ClusterAppLoader.ts new file mode 100644 index 0000000..78e5760 --- /dev/null +++ b/src/ClusterAppLoader.ts @@ -0,0 +1,64 @@ +import { combineFilters } from 'oak-domain/lib/store/filter'; +import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; +import { EntityDict, OperationResult, VolatileTrigger, Trigger } from 'oak-domain/lib/types'; +import { BackendRuntimeContext } from 'oak-frontend-base'; +import { getClusterInfo } from './cluster/env'; + +import { AppLoader } from './AppLoader'; +import assert from 'assert'; +import { DbStore } from './DbStore'; +import { Namespace } from 'socket.io'; + +export class ClusterAppLoader> extends AppLoader { + constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace) { + super(path, contextBuilder, ns); + this.dbStore.setOnVolatileTrigger( + async (entity, trigger, ids, cxtStr, option) => { + if (trigger.cs) { + // 如果是cluster sensative的触发器,需要发送到相应的instance上被处理 + } + else { + const context = await this.contextBuilder(cxtStr)(this.dbStore); + await context.begin(); + try { + await this.dbStore.execVolatileTrigger(entity, trigger.name, ids, context, option); + await context.commit(); + } + catch (err) { + await context.rollback(); + console.error('execVolatileTrigger异常', entity, trigger.name, ids, option, err); + } + } + } + ) + } + protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt): Promise> { + const { instanceCount, instanceId } = getClusterInfo()!; + assert (instanceCount && typeof instanceId === 'number'); + const { filter } = operation; + const filter2 = combineFilters(entity, this.dbStore.getSchema(), [filter, { + $$seq$$: { + $mod: [instanceCount, instanceId] + } + }]); + return super.operateInWatcher(entity, { + ...operation, + filter: filter2, + }, context); + } + + protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise[]> { + const { instanceCount, instanceId } = getClusterInfo()!; + assert (instanceCount && typeof instanceId === 'number'); + const { filter } = selection; + const filter2 = combineFilters(entity, this.dbStore.getSchema(), [filter, { + $$seq$$: { + $mod: [instanceCount, instanceId] + } + }]); + return super.selectInWatcher(entity, { + ...selection, + filter: filter2, + }, context); + } +} \ No newline at end of file diff --git a/src/DbStore.ts b/src/DbStore.ts index 85f8039..428020e 100644 --- a/src/DbStore.ts +++ b/src/DbStore.ts @@ -1,5 +1,5 @@ import { MysqlStore, MySqlSelectOption, MysqlOperateOption } from 'oak-db'; -import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap } from 'oak-domain/lib/types'; +import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEntities, UpdateFreeDict, AuthDeduceRelationMap, VolatileTrigger, OperateOption } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor'; import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration'; @@ -18,9 +18,10 @@ export class DbStore, selectFreeEntities: SelectFreeEntities = [], - updateFreeDict: UpdateFreeDict = {}) { + updateFreeDict: UpdateFreeDict = {}, + onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) { super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor((scene) => contextBuilder(scene)(this)); + this.executor = new TriggerExecutor((scene) => contextBuilder(scene)(this), undefined, onVolatileTrigger); this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); } @@ -141,6 +142,27 @@ export class DbStore( + entity: T, + trigger: VolatileTrigger, + ids: string[], + cxtStr: string, + option: OperateOption) => Promise + ) { + this.executor.setOnVolatileTrigger(onVolatileTrigger); + } + + async execVolatileTrigger( + entity: T, + name: string, + ids: string[], + context: Cxt, + option: OperateOption + ) { + return this.executor.execVolatileTrigger(entity, name, ids, context, option); + } + checkpoint(ts: number) { return this.executor.checkpoint(ts); } diff --git a/src/index.ts b/src/index.ts index e9b40f2..8cc99a7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export { AppLoader } from './AppLoader'; +export { ClusterAppLoader } from './ClusterAppLoader'; export * from './cluster/env'; \ No newline at end of file