"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.ClusterAppLoader = void 0; const tslib_1 = require("tslib"); const lodash_1 = require("oak-domain/lib/utils/lodash"); const filter_1 = require("oak-domain/lib/store/filter"); const types_1 = require("oak-domain/lib/types"); const env_1 = require("./cluster/env"); const AppLoader_1 = require("./AppLoader"); const assert_1 = tslib_1.__importDefault(require("assert")); const socket_io_client_1 = require("socket.io-client"); class ClusterAppLoader extends AppLoader_1.AppLoader { commitTriggers; static VolatileTriggerEvent = 'vtEvent'; socket; connectServerSocket() { const { instanceId } = (0, env_1.getClusterInfo)(); this.socket.on('connect', () => { const csTriggerNames = Object.keys(this.commitTriggers).filter(ele => this.commitTriggers[ele]).map(ele => `${ele}-${instanceId}`); if (csTriggerNames.length > 0) { this.socket.emit('sub', csTriggerNames); } }); this.socket.on(ClusterAppLoader.VolatileTriggerEvent, async (entity, name, ids, cxtStr, option) => { const context = await this.makeContext(cxtStr); if (process.env.NODE_ENV === 'development') { console.log(`「${(0, env_1.getClusterInfo)().instanceId}」号实例接收到来自其它进程的volatileTrigger请求, name是「${name}」, ids是「${ids.join(',')}」`); } // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, name, ids, context, option); await context.commit(); } catch (err) { await context.rollback(); console.error('在集群环境下,处理来自其它实例的trigger数据,execVolatileTrigger异常', entity, name, ids, option, err); } }); } sub(name) { const { instanceId } = (0, env_1.getClusterInfo)(); if (this.socket.connected) { this.socket.emit('sub', [`${name}-${instanceId}`]); } else { this.socket.connect(); } } constructor(path, nsDs, nsSocket, nsServer, socketPath) { super(path, nsDs, nsSocket, nsServer); this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => { const execLocal = async (ids2) => { const context = await this.makeContext(cxtStr); // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option); await context.commit(); } catch (err) { if (err instanceof types_1.OakPartialSuccess) { await context.commit(); console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); } else { await context.rollback(); console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); } } }; if (trigger.cs) { // 如果是cluster sensative的触发器,需要发送到相应的instance上被处理 const context = await this.makeContext(); const rows = !!(ids.length) ? await context.select(entity, { data: { id: 1, $$seq$$: 1, }, filter: { id: { $in: ids }, } }, { dontCollect: true }) : []; await context.commit(); const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); const grouped = (0, lodash_1.groupBy)(rows, (ele) => ele.$$seq$$ % instanceCount); for (const seqMod in grouped) { const ids2 = grouped[seqMod].map(ele => ele.id); if (parseInt(seqMod) === instanceId) { await execLocal(ids2); } else { if (process.env.NODE_ENV === 'development') { console.log(`在trigger「${trigger.name}」上,因为cs原因,数据「${ids2.join(',')}」将被推送到「${seqMod}」号进程操作(当前进程号是「${instanceId}」)。`); } this.dataSubscriber.publishServerEvent(`${trigger.name}-${seqMod}`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids2, cxtStr, option); } } } else if (trigger.singleton) { if ((0, env_1.getClusterInfo)().instanceId === 0) { await execLocal(ids); } else if (ids.length) { if (process.env.NODE_ENV === 'development') { console.log(`在trigger「${trigger.name}」上,因为singleton原因,数据「${ids.join(',')}」将被推送到「0」号进程操作(当前进程号是「${(0, env_1.getClusterInfo)().instanceId}」)`); } this.dataSubscriber.publishServerEvent(`${trigger.name}-0`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids, cxtStr, option); } } else { await execLocal(ids); } }); this.commitTriggers = {}; const { name } = nsServer; // 本机pm2的socketio连接,在cli中连接到adaptor之后,会被自然推到redis,这边继续保持pm2的socketio连接即可 const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`; this.socket = (0, socket_io_client_1.io)(socketUrl, { path: socketPath, }); this.connectServerSocket(); } registerTrigger(trigger) { // 如果是cluster sensative的trigger,注册到socket事件上 // 如果是singletone的trigger,只有0号实例注册 const { when, cs, singleton, name } = trigger; (0, assert_1.default)(!(cs && singleton)); if (when === 'commit') { (0, assert_1.default)(!this.commitTriggers[name], `命名为${name}的trigger出现了多次,请检查`); const needSub = !!(cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0); this.commitTriggers[name] = needSub; if (needSub) { this.sub(name); } } this.dbStore.registerTrigger(trigger); } operateInWatcher(entity, operation, context, singleton) { const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); (0, assert_1.default)(instanceCount && typeof instanceId === 'number'); const { filter } = operation; const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, { $$seq$$: { $mod: [instanceCount, instanceId] } }]); return super.operateInWatcher(entity, { ...operation, filter: filter2, }, context); } selectInWatcher(entity, selection, context, forUpdate, singleton) { const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); (0, assert_1.default)(instanceCount && typeof instanceId === 'number'); const { filter } = selection; const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, { $$seq$$: { $mod: [instanceCount, instanceId] } }]); return super.selectInWatcher(entity, { ...selection, filter: filter2, }, context, forUpdate); } async execWatcher(watcher) { if (watcher.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { return; } return super.execWatcher(watcher); } execBaseTimer(timer, context) { if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { return; } return super.execBaseTimer(timer, context); } execFreeTimer(timer, contextBuilder) { if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) { return; } return super.execFreeTimer(timer, contextBuilder); } async checkpoint() { const { instanceCount, instanceId } = (0, env_1.getClusterInfo)(); let count = 0; for (const name in this.commitTriggers) { if (this.commitTriggers[name]) { count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId); } else { count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs()); } } return count; } } exports.ClusterAppLoader = ClusterAppLoader;