From f48a6dc85b6c4068b26f7b9227e2d8f3d760cdb5 Mon Sep 17 00:00:00 2001 From: qcqcqc <1220204124@zust.edu.cn> Date: Mon, 19 Jan 2026 14:46:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E4=BA=86watcher?= =?UTF-8?q?=E7=9A=84lazy=E4=BB=A5=E5=8F=8Atimer=E5=92=8Cwatcher=E7=9A=84fr?= =?UTF-8?q?ee=E5=BD=A2=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/AppLoader.d.ts | 5 ++-- lib/AppLoader.js | 47 +++++++++++++++++++++++++++++++--- lib/ClusterAppLoader.d.ts | 5 ++-- lib/ClusterAppLoader.js | 10 ++++++-- src/AppLoader.ts | 54 +++++++++++++++++++++++++++++++++++---- src/ClusterAppLoader.ts | 15 ++++++++--- 6 files changed, 118 insertions(+), 18 deletions(-) diff --git a/lib/AppLoader.d.ts b/lib/AppLoader.d.ts index 384f8cd..5295c75 100644 --- a/lib/AppLoader.d.ts +++ b/lib/AppLoader.d.ts @@ -1,5 +1,5 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult } from "oak-domain/lib/types"; +import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult, BaseTimer } from "oak-domain/lib/types"; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { Namespace } from 'socket.io'; @@ -59,7 +59,8 @@ export declare class AppLoader; startWatchers(): void; - protected execFreeTimer(timer: FreeTimer, context: Cxt): Promise> | undefined; + protected execBaseTimer(timer: BaseTimer, context: Cxt): Promise> | undefined; + protected execFreeTimer(timer: FreeTimer, contextBuilder: () => Promise): Promise> | undefined; startTimers(): void; execStartRoutines(): Promise; execStopRoutines(): Promise; diff --git a/lib/AppLoader.js b/lib/AppLoader.js index a71e09d..c4dfa05 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -369,8 +369,22 @@ class AppLoader extends types_1.AppLoader { }); } async execWatcher(watcher) { - const context = await this.makeContext(); let result; + if (watcher.hasOwnProperty('type') && watcher.type === 'free') { + const selectContext = await this.makeContext(); + const { entity, projection, fn, filter, singleton, forUpdate } = watcher; + const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter); + const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection); + const rows = await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, selectContext, forUpdate, singleton); + if (rows.length > 0) { + result = await fn(() => this.makeContext(), rows); + } + return result; + } + const context = await this.makeContext(); try { if (watcher.hasOwnProperty('actionData')) { const { entity, action, filter, actionData, singleton } = watcher; @@ -422,7 +436,13 @@ class AppLoader extends types_1.AppLoader { const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict); const totalWatchers = (watchers || []).concat(adWatchers); let count = 0; + const skipOnceSet = new Set(); const execOne = async (watcher, start) => { + if (skipOnceSet.has(watcher.name)) { + skipOnceSet.delete(watcher.name); + console.log(`跳过本次执行watcher【${watcher.name}】`); + return; + } try { const result = await this.execWatcher(watcher); if (result) { @@ -454,12 +474,22 @@ class AppLoader extends types_1.AppLoader { } this.watcherTimerId = setTimeout(() => doWatchers(), 120000); }; + // 首次执行时,跳过所有lazy的watcher + for (const w of totalWatchers) { + if (w.lazy) { + skipOnceSet.add(w.name); + } + } doWatchers(); } - execFreeTimer(timer, context) { + execBaseTimer(timer, context) { const { timer: timerFn } = timer; return timerFn(context); } + execFreeTimer(timer, contextBuilder) { + const { timer: timerFn } = timer; + return timerFn(contextBuilder); + } startTimers() { const timers = this.requireSth('lib/timers/index'); if (timers) { @@ -479,9 +509,20 @@ class AppLoader extends types_1.AppLoader { } } else { + if (timer.hasOwnProperty('type') && timer.type === 'free') { + try { + const result = await this.execFreeTimer(timer, () => this.makeContext()); + console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); + } + catch (err) { + console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); + this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err); + } + return; + } const context = await this.makeContext(); try { - const result = await this.execFreeTimer(timer, context); + const result = await this.execBaseTimer(timer, context); if (result) { console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); } diff --git a/lib/ClusterAppLoader.d.ts b/lib/ClusterAppLoader.d.ts index 12a7b0c..3f6ad79 100644 --- a/lib/ClusterAppLoader.d.ts +++ b/lib/ClusterAppLoader.d.ts @@ -1,5 +1,5 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer } from 'oak-domain/lib/types'; +import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { AppLoader } from './AppLoader'; import { Namespace } from 'socket.io'; @@ -15,6 +15,7 @@ export declare class ClusterAppLoader(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise>; protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise[]>; protected execWatcher(watcher: Watcher): Promise | undefined>; - protected execFreeTimer(timer: FreeTimer, context: Cxt): Promise> | undefined; + protected execBaseTimer(timer: BaseTimer, context: Cxt): Promise> | undefined; + protected execFreeTimer(timer: FreeTimer, contextBuilder: () => Promise): Promise> | undefined; protected checkpoint(): Promise; } diff --git a/lib/ClusterAppLoader.js b/lib/ClusterAppLoader.js index 707839b..9f52611 100644 --- a/lib/ClusterAppLoader.js +++ b/lib/ClusterAppLoader.js @@ -168,11 +168,17 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { } return super.execWatcher(watcher); } - execFreeTimer(timer, context) { + execBaseTimer(timer, context) { if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { return; } - return super.execFreeTimer(timer, context); + return super.execBaseTimer(timer, context); + } + execFreeTimer(timer, contextBuilder) { + if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { + return; + } + return super.execFreeTimer(timer, contextBuilder); } async checkpoint() { const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); diff --git a/src/AppLoader.ts b/src/AppLoader.ts index 4b15d4f..a1801d2 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -5,7 +5,7 @@ import { makeIntrinsicLogics } from "oak-domain/lib/store/IntrinsicLogics"; import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; -import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types"; +import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException, BaseTimer, WBFreeWatcher } from "oak-domain/lib/types"; import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint'; @@ -448,8 +448,23 @@ export class AppLoader) { - const context = await this.makeContext(); let result: OperationResult | undefined; + if (watcher.hasOwnProperty('type') && (watcher as WBFreeWatcher).type === 'free') { + const selectContext = await this.makeContext(); + const { entity, projection, fn, filter, singleton, forUpdate } = >watcher; + const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter); + const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection); + const rows = await this.selectInWatcher(entity, { + data: projection2, + filter: filter2, + }, selectContext, forUpdate, singleton); + + if (rows.length > 0) { + result = await fn(() => this.makeContext(), rows); + } + return result; + } + const context = await this.makeContext(); try { if (watcher.hasOwnProperty('actionData')) { const { entity, action, filter, actionData, singleton } = >watcher; @@ -507,7 +522,13 @@ export class AppLoader[]>watchers || []).concat(adWatchers); let count = 0; + const skipOnceSet = new Set(); const execOne = async (watcher: Watcher, start: number) => { + if (skipOnceSet.has(watcher.name)) { + skipOnceSet.delete(watcher.name); + console.log(`跳过本次执行watcher【${watcher.name}】`); + return; + } try { const result = await this.execWatcher(watcher); if (result) { @@ -541,14 +562,27 @@ export class AppLoader doWatchers(), 120000); }; + + // 首次执行时,跳过所有lazy的watcher + for (const w of totalWatchers) { + if (w.lazy) { + skipOnceSet.add(w.name); + } + } + doWatchers(); } - protected execFreeTimer(timer: FreeTimer, context: Cxt): Promise> | undefined { - const { timer: timerFn } = timer as FreeTimer; + protected execBaseTimer(timer: BaseTimer, context: Cxt): Promise> | undefined { + const { timer: timerFn } = timer as BaseTimer; return timerFn(context); } + protected execFreeTimer(timer: FreeTimer, contextBuilder: () => Promise): Promise> | undefined { + const { timer: timerFn } = timer as FreeTimer; + return timerFn(contextBuilder); + } + startTimers() { const timers: Timer[] = this.requireSth('lib/timers/index'); if (timers) { @@ -569,9 +603,19 @@ export class AppLoader).type === 'free') { + try { + const result = await this.execFreeTimer(timer as FreeTimer, () => this.makeContext()); + console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); + } catch (err) { + console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err); + this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err); + } + return; + } const context = await this.makeContext(); try { - const result = await this.execFreeTimer(timer as FreeTimer, context); + const result = await this.execBaseTimer(timer as BaseTimer, context); if (result) { console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result); } diff --git a/src/ClusterAppLoader.ts b/src/ClusterAppLoader.ts index 3b38112..760cad5 100644 --- a/src/ClusterAppLoader.ts +++ b/src/ClusterAppLoader.ts @@ -1,7 +1,7 @@ import { groupBy } from 'oak-domain/lib/utils/lodash'; import { combineFilters } from 'oak-domain/lib/store/filter'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer } from 'oak-domain/lib/types'; +import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { getClusterInfo } from './cluster/env'; @@ -27,7 +27,7 @@ export class ClusterAppLoader { + this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => { const context = await this.makeContext(cxtStr); if (process.env.NODE_ENV === 'development') { console.log(`「${getClusterInfo().instanceId}」号实例接收到来自其它进程的volatileTrigger请求, name是「${name}」, ids是「${ids.join(',')}」`); @@ -184,11 +184,18 @@ export class ClusterAppLoader, context: Cxt) { + protected execBaseTimer(timer: BaseTimer, context: Cxt) { if (timer.singleton && getClusterInfo().instanceId !== 0) { return; } - return super.execFreeTimer(timer, context); + return super.execBaseTimer(timer, context); + } + + protected execFreeTimer(timer: FreeTimer, contextBuilder: () => Promise) { + if (timer.singleton && getClusterInfo().instanceId !== 0) { + return; + } + return super.execFreeTimer(timer, contextBuilder); } protected async checkpoint() {