clusterApploader原来处理commitTriggers的逻辑是错误的,漏掉了非cs类型的

This commit is contained in:
Xu Chang 2025-04-18 16:14:11 +08:00
parent 51c93a9402
commit b60bddd66e
3 changed files with 29 additions and 25 deletions

View File

@ -5,7 +5,7 @@ import { AppLoader } from './AppLoader';
import { Namespace } from 'socket.io';
import { Socket } from 'socket.io-client';
export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends AppLoader<ED, Cxt> {
private csTriggers;
private commitTriggers;
static VolatileTriggerEvent: string;
protected socket: Socket;
private connectServerSocket;

View File

@ -10,13 +10,13 @@ const AppLoader_1 = require("./AppLoader");
const assert_1 = tslib_1.__importDefault(require("assert"));
const socket_io_client_1 = require("socket.io-client");
class ClusterAppLoader extends AppLoader_1.AppLoader {
csTriggers;
commitTriggers;
static VolatileTriggerEvent = 'vtEvent';
socket;
connectServerSocket() {
const { instanceId } = (0, env_1.getClusterInfo)();
this.socket.on('connect', () => {
const csTriggerNames = Object.keys(this.csTriggers).map(ele => `${ele}-${instanceId}`);
const csTriggerNames = Object.keys(this.commitTriggers).filter(ele => this.commitTriggers[ele]).map(ele => `${ele}-${instanceId}`);
if (csTriggerNames.length > 0) {
this.socket.emit('sub', csTriggerNames);
}
@ -37,10 +37,8 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
}
});
}
sub(name, clusterSensative) {
sub(name) {
const { instanceId } = (0, env_1.getClusterInfo)();
(0, assert_1.default)(!this.csTriggers[name], `命名为${name}的trigger出现了多次请检查`);
this.csTriggers[name] = !!clusterSensative;
if (this.socket.connected) {
this.socket.emit('sub', [`${name}-${instanceId}`]);
}
@ -112,7 +110,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
await execLocal(ids);
}
});
this.csTriggers = {};
this.commitTriggers = {};
const { name } = nsServer;
// 本机pm2的socketio连接在cli中连接到adaptor之后会被自然推到redis这边继续保持pm2的socketio连接即可
const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`;
@ -124,11 +122,14 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
registerTrigger(trigger) {
// 如果是cluster sensative的trigger注册到socket事件上
// 如果是singletone的trigger只有0号实例注册
const { when, cs, singleton } = trigger;
const { when, cs, singleton, name } = trigger;
(0, assert_1.default)(!(cs && singleton));
if (when === 'commit' && (cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0)) {
const { name } = trigger;
this.sub(name, cs);
if (when === 'commit') {
(0, assert_1.default)(!this.commitTriggers[name], `命名为${name}的trigger出现了多次请检查`);
this.commitTriggers[name] = !!cs;
if (cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0) {
this.sub(name);
}
}
this.dbStore.registerTrigger(trigger);
}
@ -175,8 +176,8 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
async checkpoint() {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
let count = 0;
for (const name in this.csTriggers) {
if (this.csTriggers[name]) {
for (const name in this.commitTriggers) {
if (this.commitTriggers[name]) {
count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId);
}
else {

View File

@ -11,14 +11,16 @@ import { Namespace } from 'socket.io';
import { io, Socket } from 'socket.io-client';
export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends AppLoader<ED, Cxt> {
private csTriggers: Record<string, boolean>;
private commitTriggers: Record<string, boolean>;
static VolatileTriggerEvent = 'vtEvent';
protected socket: Socket;
private connectServerSocket() {
const { instanceId } = getClusterInfo()!;
this.socket.on('connect', () => {
const csTriggerNames = Object.keys(this.csTriggers).map(
const csTriggerNames = Object.keys(this.commitTriggers).filter(
ele => this.commitTriggers[ele]
).map(
ele => `${ele}-${instanceId}`
);
if (csTriggerNames.length > 0) {
@ -42,10 +44,8 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
});
}
private sub(name: string, clusterSensative?: true) {
private sub(name: string) {
const { instanceId } = getClusterInfo()!;
assert(!this.csTriggers[name], `命名为${name}的trigger出现了多次请检查`);
this.csTriggers[name] = !!clusterSensative;
if (this.socket!.connected) {
this.socket!.emit('sub', [`${name}-${instanceId}`]);
}
@ -121,7 +121,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
}
}
);
this.csTriggers = {};
this.commitTriggers = {};
const { name } = nsServer;
// 本机pm2的socketio连接在cli中连接到adaptor之后会被自然推到redis这边继续保持pm2的socketio连接即可
const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`;
@ -134,11 +134,14 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void {
// 如果是cluster sensative的trigger注册到socket事件上
// 如果是singletone的trigger只有0号实例注册
const { when, cs, singleton } = trigger as VolatileTrigger<ED, keyof ED, Cxt>;
const { when, cs, singleton, name } = trigger as VolatileTrigger<ED, keyof ED, Cxt>;
assert(!(cs && singleton));
if (when === 'commit' && (cs || singleton && getClusterInfo().instanceId === 0)) {
const { name } = trigger;
this.sub(name, cs);
if (when === 'commit') {
assert(!this.commitTriggers[name], `命名为${name}的trigger出现了多次请检查`);
this.commitTriggers[name] = !!cs;
if (cs || singleton && getClusterInfo().instanceId === 0) {
this.sub(name);
}
}
this.dbStore.registerTrigger(trigger);
}
@ -190,8 +193,8 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
protected async checkpoint() {
const { instanceCount, instanceId } = getClusterInfo();
let count = 0;
for (const name in this.csTriggers) {
if (this.csTriggers[name]) {
for (const name in this.commitTriggers) {
if (this.commitTriggers[name]) {
count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId);
}
else {