diff --git a/lib/ClusterAppLoader.d.ts b/lib/ClusterAppLoader.d.ts index ab67221..12a7b0c 100644 --- a/lib/ClusterAppLoader.d.ts +++ b/lib/ClusterAppLoader.d.ts @@ -5,7 +5,7 @@ import { AppLoader } from './AppLoader'; import { Namespace } from 'socket.io'; import { Socket } from 'socket.io-client'; export declare class ClusterAppLoader> extends AppLoader { - private csTriggers; + private commitTriggers; static VolatileTriggerEvent: string; protected socket: Socket; private connectServerSocket; diff --git a/lib/ClusterAppLoader.js b/lib/ClusterAppLoader.js index 378f353..bbb1f62 100644 --- a/lib/ClusterAppLoader.js +++ b/lib/ClusterAppLoader.js @@ -10,13 +10,13 @@ const AppLoader_1 = require("./AppLoader"); const assert_1 = tslib_1.__importDefault(require("assert")); const socket_io_client_1 = require("socket.io-client"); class ClusterAppLoader extends AppLoader_1.AppLoader { - csTriggers; + commitTriggers; static VolatileTriggerEvent = 'vtEvent'; socket; connectServerSocket() { const { instanceId } = (0, env_1.getClusterInfo)(); this.socket.on('connect', () => { - const csTriggerNames = Object.keys(this.csTriggers).map(ele => `${ele}-${instanceId}`); + const csTriggerNames = Object.keys(this.commitTriggers).filter(ele => this.commitTriggers[ele]).map(ele => `${ele}-${instanceId}`); if (csTriggerNames.length > 0) { this.socket.emit('sub', csTriggerNames); } @@ -37,10 +37,8 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { } }); } - sub(name, clusterSensative) { + sub(name) { const { instanceId } = (0, env_1.getClusterInfo)(); - (0, assert_1.default)(!this.csTriggers[name], `命名为${name}的trigger出现了多次,请检查`); - this.csTriggers[name] = !!clusterSensative; if (this.socket.connected) { this.socket.emit('sub', [`${name}-${instanceId}`]); } @@ -112,7 +110,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { await execLocal(ids); } }); - this.csTriggers = {}; + this.commitTriggers = {}; const { name } = nsServer; // 本机pm2的socketio连接,在cli中连接到adaptor之后,会被自然推到redis,这边继续保持pm2的socketio连接即可 const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`; @@ -124,11 +122,14 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { registerTrigger(trigger) { // 如果是cluster sensative的trigger,注册到socket事件上 // 如果是singletone的trigger,只有0号实例注册 - const { when, cs, singleton } = trigger; + const { when, cs, singleton, name } = trigger; (0, assert_1.default)(!(cs && singleton)); - if (when === 'commit' && (cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0)) { - const { name } = trigger; - this.sub(name, cs); + if (when === 'commit') { + (0, assert_1.default)(!this.commitTriggers[name], `命名为${name}的trigger出现了多次,请检查`); + this.commitTriggers[name] = !!cs; + if (cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0) { + this.sub(name); + } } this.dbStore.registerTrigger(trigger); } @@ -175,8 +176,8 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { async checkpoint() { const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); let count = 0; - for (const name in this.csTriggers) { - if (this.csTriggers[name]) { + for (const name in this.commitTriggers) { + if (this.commitTriggers[name]) { count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId); } else { diff --git a/src/ClusterAppLoader.ts b/src/ClusterAppLoader.ts index 0766a6a..4cd427f 100644 --- a/src/ClusterAppLoader.ts +++ b/src/ClusterAppLoader.ts @@ -11,14 +11,16 @@ import { Namespace } from 'socket.io'; import { io, Socket } from 'socket.io-client'; export class ClusterAppLoader> extends AppLoader { - private csTriggers: Record; + private commitTriggers: Record; static VolatileTriggerEvent = 'vtEvent'; protected socket: Socket; private connectServerSocket() { const { instanceId } = getClusterInfo()!; this.socket.on('connect', () => { - const csTriggerNames = Object.keys(this.csTriggers).map( + const csTriggerNames = Object.keys(this.commitTriggers).filter( + ele => this.commitTriggers[ele] + ).map( ele => `${ele}-${instanceId}` ); if (csTriggerNames.length > 0) { @@ -42,10 +44,8 @@ export class ClusterAppLoader): void { // 如果是cluster sensative的trigger,注册到socket事件上 // 如果是singletone的trigger,只有0号实例注册 - const { when, cs, singleton } = trigger as VolatileTrigger; + const { when, cs, singleton, name } = trigger as VolatileTrigger; assert(!(cs && singleton)); - if (when === 'commit' && (cs || singleton && getClusterInfo().instanceId === 0)) { - const { name } = trigger; - this.sub(name, cs); + if (when === 'commit') { + assert(!this.commitTriggers[name], `命名为${name}的trigger出现了多次,请检查`); + this.commitTriggers[name] = !!cs; + if (cs || singleton && getClusterInfo().instanceId === 0) { + this.sub(name); + } } this.dbStore.registerTrigger(trigger); } @@ -190,8 +193,8 @@ export class ClusterAppLoader