191 lines
8.8 KiB
JavaScript
191 lines
8.8 KiB
JavaScript
"use strict";
|
||
Object.defineProperty(exports, "__esModule", { value: true });
|
||
exports.ClusterAppLoader = void 0;
|
||
const tslib_1 = require("tslib");
|
||
const lodash_1 = require("oak-domain/lib/utils/lodash");
|
||
const filter_1 = require("oak-domain/lib/store/filter");
|
||
const types_1 = require("oak-domain/lib/types");
|
||
const env_1 = require("./cluster/env");
|
||
const AppLoader_1 = require("./AppLoader");
|
||
const assert_1 = tslib_1.__importDefault(require("assert"));
|
||
const socket_io_client_1 = require("socket.io-client");
|
||
class ClusterAppLoader extends AppLoader_1.AppLoader {
|
||
commitTriggers;
|
||
static VolatileTriggerEvent = 'vtEvent';
|
||
socket;
|
||
connectServerSocket() {
|
||
const { instanceId } = (0, env_1.getClusterInfo)();
|
||
this.socket.on('connect', () => {
|
||
const csTriggerNames = Object.keys(this.commitTriggers).filter(ele => this.commitTriggers[ele]).map(ele => `${ele}-${instanceId}`);
|
||
if (csTriggerNames.length > 0) {
|
||
this.socket.emit('sub', csTriggerNames);
|
||
}
|
||
});
|
||
this.socket.on(ClusterAppLoader.VolatileTriggerEvent, async (entity, name, ids, cxtStr, option) => {
|
||
const context = await this.makeContext(cxtStr);
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log(`「${(0, env_1.getClusterInfo)().instanceId}」号实例接收到来自其它进程的volatileTrigger请求, name是「${name}」, ids是「${ids.join(',')}」`);
|
||
}
|
||
// await context.begin();
|
||
try {
|
||
await this.dbStore.execVolatileTrigger(entity, name, ids, context, option);
|
||
await context.commit();
|
||
}
|
||
catch (err) {
|
||
await context.rollback();
|
||
console.error('在集群环境下,处理来自其它实例的trigger数据,execVolatileTrigger异常', entity, name, ids, option, err);
|
||
}
|
||
});
|
||
}
|
||
sub(name) {
|
||
const { instanceId } = (0, env_1.getClusterInfo)();
|
||
if (this.socket.connected) {
|
||
this.socket.emit('sub', [`${name}-${instanceId}`]);
|
||
}
|
||
else {
|
||
this.socket.connect();
|
||
}
|
||
}
|
||
constructor(path, nsDs, nsSocket, nsServer, socketPath) {
|
||
super(path, nsDs, nsSocket, nsServer);
|
||
this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => {
|
||
const execLocal = async (ids2) => {
|
||
const context = await this.makeContext(cxtStr);
|
||
// await context.begin();
|
||
try {
|
||
await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option);
|
||
await context.commit();
|
||
}
|
||
catch (err) {
|
||
if (err instanceof types_1.OakPartialSuccess) {
|
||
await context.commit();
|
||
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
|
||
}
|
||
else {
|
||
await context.rollback();
|
||
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
|
||
}
|
||
}
|
||
};
|
||
if (trigger.cs) {
|
||
// 如果是cluster sensative的触发器,需要发送到相应的instance上被处理
|
||
const context = await this.makeContext();
|
||
const rows = !!(ids.length) ? await context.select(entity, {
|
||
data: {
|
||
id: 1,
|
||
$$seq$$: 1,
|
||
},
|
||
filter: {
|
||
id: { $in: ids },
|
||
}
|
||
}, { dontCollect: true }) : [];
|
||
await context.commit();
|
||
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
|
||
const grouped = (0, lodash_1.groupBy)(rows, (ele) => ele.$$seq$$ % instanceCount);
|
||
for (const seqMod in grouped) {
|
||
const ids2 = grouped[seqMod].map(ele => ele.id);
|
||
if (parseInt(seqMod) === instanceId) {
|
||
await execLocal(ids2);
|
||
}
|
||
else {
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log(`在trigger「${trigger.name}」上,因为cs原因,数据「${ids2.join(',')}」将被推送到「${seqMod}」号进程操作(当前进程号是「${instanceId}」)。`);
|
||
}
|
||
this.dataSubscriber.publishServerEvent(`${trigger.name}-${seqMod}`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids2, cxtStr, option);
|
||
}
|
||
}
|
||
}
|
||
else if (trigger.singleton) {
|
||
if ((0, env_1.getClusterInfo)().instanceId === 0) {
|
||
await execLocal(ids);
|
||
}
|
||
else if (ids.length) {
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log(`在trigger「${trigger.name}」上,因为singleton原因,数据「${ids.join(',')}」将被推送到「0」号进程操作(当前进程号是「${(0, env_1.getClusterInfo)().instanceId}」)`);
|
||
}
|
||
this.dataSubscriber.publishServerEvent(`${trigger.name}-0`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids, cxtStr, option);
|
||
}
|
||
}
|
||
else {
|
||
await execLocal(ids);
|
||
}
|
||
});
|
||
this.commitTriggers = {};
|
||
const { name } = nsServer;
|
||
// 本机pm2的socketio连接,在cli中连接到adaptor之后,会被自然推到redis,这边继续保持pm2的socketio连接即可
|
||
const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`;
|
||
this.socket = (0, socket_io_client_1.io)(socketUrl, {
|
||
path: socketPath,
|
||
});
|
||
this.connectServerSocket();
|
||
}
|
||
registerTrigger(trigger) {
|
||
// 如果是cluster sensative的trigger,注册到socket事件上
|
||
// 如果是singletone的trigger,只有0号实例注册
|
||
const { when, cs, singleton, name } = trigger;
|
||
(0, assert_1.default)(!(cs && singleton));
|
||
if (when === 'commit') {
|
||
(0, assert_1.default)(!this.commitTriggers[name], `命名为${name}的trigger出现了多次,请检查`);
|
||
this.commitTriggers[name] = !!cs;
|
||
if (cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0) {
|
||
this.sub(name);
|
||
}
|
||
}
|
||
this.dbStore.registerTrigger(trigger);
|
||
}
|
||
operateInWatcher(entity, operation, context, singleton) {
|
||
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
|
||
(0, assert_1.default)(instanceCount && typeof instanceId === 'number');
|
||
const { filter } = operation;
|
||
const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
|
||
$$seq$$: {
|
||
$mod: [instanceCount, instanceId]
|
||
}
|
||
}]);
|
||
return super.operateInWatcher(entity, {
|
||
...operation,
|
||
filter: filter2,
|
||
}, context);
|
||
}
|
||
selectInWatcher(entity, selection, context, forUpdate, singleton) {
|
||
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
|
||
(0, assert_1.default)(instanceCount && typeof instanceId === 'number');
|
||
const { filter } = selection;
|
||
const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
|
||
$$seq$$: {
|
||
$mod: [instanceCount, instanceId]
|
||
}
|
||
}]);
|
||
return super.selectInWatcher(entity, {
|
||
...selection,
|
||
filter: filter2,
|
||
}, context, forUpdate);
|
||
}
|
||
async execWatcher(watcher) {
|
||
if (watcher.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
|
||
return;
|
||
}
|
||
return super.execWatcher(watcher);
|
||
}
|
||
execFreeTimer(timer, context) {
|
||
if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
|
||
return;
|
||
}
|
||
return super.execFreeTimer(timer, context);
|
||
}
|
||
async checkpoint() {
|
||
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
|
||
let count = 0;
|
||
for (const name in this.commitTriggers) {
|
||
if (this.commitTriggers[name]) {
|
||
count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId);
|
||
}
|
||
else {
|
||
count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs());
|
||
}
|
||
}
|
||
return count;
|
||
}
|
||
}
|
||
exports.ClusterAppLoader = ClusterAppLoader;
|