feat: 支持了watcher的lazy以及timer和watcher的free形式

This commit is contained in:
Pan Qiancheng 2026-01-19 14:46:16 +08:00
parent f3e579fa35
commit f48a6dc85b
6 changed files with 118 additions and 18 deletions

5
lib/AppLoader.d.ts vendored
View File

@ -1,5 +1,5 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, EntityDict, Watcher, OpRecord, FreeTimer, OperationResult, BaseTimer } from "oak-domain/lib/types";
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
@ -59,7 +59,8 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
protected getCheckpointTs(): number;
protected checkpoint(): Promise<number>;
startWatchers(): void;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
startTimers(): void;
execStartRoutines(): Promise<void>;
execStopRoutines(): Promise<void>;

View File

@ -369,8 +369,22 @@ class AppLoader extends types_1.AppLoader {
});
}
async execWatcher(watcher) {
const context = await this.makeContext();
let result;
if (watcher.hasOwnProperty('type') && watcher.type === 'free') {
const selectContext = await this.makeContext();
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, selectContext, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(() => this.makeContext(), rows);
}
return result;
}
const context = await this.makeContext();
try {
if (watcher.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData, singleton } = watcher;
@ -422,7 +436,13 @@ class AppLoader extends types_1.AppLoader {
const { watchers: adWatchers } = (0, IntrinsicLogics_1.makeIntrinsicLogics)(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (watchers || []).concat(adWatchers);
let count = 0;
const skipOnceSet = new Set();
const execOne = async (watcher, start) => {
if (skipOnceSet.has(watcher.name)) {
skipOnceSet.delete(watcher.name);
console.log(`跳过本次执行watcher【${watcher.name}`);
return;
}
try {
const result = await this.execWatcher(watcher);
if (result) {
@ -454,12 +474,22 @@ class AppLoader extends types_1.AppLoader {
}
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
};
// 首次执行时跳过所有lazy的watcher
for (const w of totalWatchers) {
if (w.lazy) {
skipOnceSet.add(w.name);
}
}
doWatchers();
}
execFreeTimer(timer, context) {
execBaseTimer(timer, context) {
const { timer: timerFn } = timer;
return timerFn(context);
}
execFreeTimer(timer, contextBuilder) {
const { timer: timerFn } = timer;
return timerFn(contextBuilder);
}
startTimers() {
const timers = this.requireSth('lib/timers/index');
if (timers) {
@ -479,9 +509,20 @@ class AppLoader extends types_1.AppLoader {
}
}
else {
if (timer.hasOwnProperty('type') && timer.type === 'free') {
try {
const result = await this.execFreeTimer(timer, () => this.makeContext());
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}
catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
return;
}
const context = await this.makeContext();
try {
const result = await this.execFreeTimer(timer, context);
const result = await this.execBaseTimer(timer, context);
if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}

View File

@ -1,5 +1,5 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer } from 'oak-domain/lib/types';
import { EntityDict, OperationResult, Trigger, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AppLoader } from './AppLoader';
import { Namespace } from 'socket.io';
@ -15,6 +15,7 @@ export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cx
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined;
protected checkpoint(): Promise<number>;
}

View File

@ -168,11 +168,17 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
}
return super.execWatcher(watcher);
}
execFreeTimer(timer, context) {
execBaseTimer(timer, context) {
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, context);
return super.execBaseTimer(timer, context);
}
execFreeTimer(timer, contextBuilder) {
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, contextBuilder);
}
async checkpoint() {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();

View File

@ -5,7 +5,7 @@ import { makeIntrinsicLogics } from "oak-domain/lib/store/IntrinsicLogics";
import { cloneDeep, mergeConcatMany, omit } from 'oak-domain/lib/utils/lodash';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, SyncConfig, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord, Routine, FreeRoutine, Timer, FreeTimer, StorageSchema, OperationResult, OakPartialSuccess, OakException, BaseTimer, WBFreeWatcher } from "oak-domain/lib/types";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
@ -448,8 +448,23 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
const context = await this.makeContext();
let result: OperationResult<ED> | undefined;
if (watcher.hasOwnProperty('type') && (watcher as WBFreeWatcher<ED, keyof ED, Cxt>).type === 'free') {
const selectContext = await this.makeContext();
const { entity, projection, fn, filter, singleton, forUpdate } = <WBFreeWatcher<ED, keyof ED, Cxt>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, selectContext, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(() => this.makeContext(), rows);
}
return result;
}
const context = await this.makeContext();
try {
if (watcher.hasOwnProperty('actionData')) {
const { entity, action, filter, actionData, singleton } = <BBWatcher<ED, keyof ED>>watcher;
@ -507,7 +522,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers || []).concat(adWatchers);
let count = 0;
const skipOnceSet = new Set<string>();
const execOne = async (watcher: Watcher<ED, keyof ED, Cxt>, start: number) => {
if (skipOnceSet.has(watcher.name)) {
skipOnceSet.delete(watcher.name);
console.log(`跳过本次执行watcher【${watcher.name}`);
return;
}
try {
const result = await this.execWatcher(watcher);
if (result) {
@ -541,14 +562,27 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.watcherTimerId = setTimeout(() => doWatchers(), 120000);
};
// 首次执行时跳过所有lazy的watcher
for (const w of totalWatchers) {
if (w.lazy) {
skipOnceSet.add(w.name);
}
}
doWatchers();
}
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined {
const { timer: timerFn } = timer as BaseTimer<ED, Cxt>;
return timerFn(context);
}
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>): Promise<OperationResult<ED>> | undefined {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
return timerFn(contextBuilder);
}
startTimers() {
const timers: Timer<ED, keyof ED, Cxt>[] = this.requireSth('lib/timers/index');
if (timers) {
@ -569,9 +603,19 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
}
else {
if (timer.hasOwnProperty('type') && (timer as FreeTimer<ED, Cxt>).type === 'free') {
try {
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, () => this.makeContext());
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
} catch (err) {
console.error(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
this.publishInternalError(`timer`, `定时器【${name}】执行失败`, err);
}
return;
}
const context = await this.makeContext();
try {
const result = await this.execFreeTimer(timer as FreeTimer<ED, Cxt>, context);
const result = await this.execBaseTimer(timer as BaseTimer<ED, Cxt>, context);
if (result) {
console.log(`定时器【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是`, result);
}

View File

@ -1,7 +1,7 @@
import { groupBy } from 'oak-domain/lib/utils/lodash';
import { combineFilters } from 'oak-domain/lib/store/filter';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer } from 'oak-domain/lib/types';
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakPartialSuccess, Watcher, FreeTimer, BaseTimer } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { getClusterInfo } from './cluster/env';
@ -27,7 +27,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
this.socket!.emit('sub', csTriggerNames);
}
});
this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
this.socket!.on(ClusterAppLoader.VolatileTriggerEvent, async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
const context = await this.makeContext(cxtStr);
if (process.env.NODE_ENV === 'development') {
console.log(`${getClusterInfo().instanceId}」号实例接收到来自其它进程的volatileTrigger请求 name是「${name}」, ids是「${ids.join(',')}`);
@ -184,11 +184,18 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
return super.execWatcher(watcher);
}
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt) {
protected execBaseTimer(timer: BaseTimer<ED, Cxt>, context: Cxt) {
if (timer.singleton && getClusterInfo().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, context);
return super.execBaseTimer(timer, context);
}
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, contextBuilder: () => Promise<Cxt>) {
if (timer.singleton && getClusterInfo().instanceId !== 0) {
return;
}
return super.execFreeTimer(timer, contextBuilder);
}
protected async checkpoint() {