oak-backend-base/lib/ClusterAppLoader.js

192 lines
8.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.ClusterAppLoader = void 0;
const tslib_1 = require("tslib");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const filter_1 = require("oak-domain/lib/store/filter");
const types_1 = require("oak-domain/lib/types");
const env_1 = require("./cluster/env");
const AppLoader_1 = require("./AppLoader");
const assert_1 = tslib_1.__importDefault(require("assert"));
const socket_io_client_1 = require("socket.io-client");
/**
 * AppLoader specialisation for running several instances of the same
 * application side by side.
 *
 * Work is split between instances using the values returned by
 * `getClusterInfo()` (`instanceId`, `instanceCount`):
 *  - "cs" (cluster sensitive) volatile triggers are routed to the instance
 *    whose id equals `row.$$seq$$ % instanceCount`;
 *  - "singleton" triggers, watchers and timers are only executed on instance 0;
 *  - non-singleton watcher operations/selections are restricted to the rows
 *    whose `$$seq$$` matches this instance.
 *
 * Work destined for another instance is forwarded with
 * `dataSubscriber.publishServerEvent` and received back through a
 * socket.io client connection (`this.socket`).
 */
class ClusterAppLoader extends AppLoader_1.AppLoader {
    /**
     * Commit-time triggers registered through `registerTrigger`, keyed by
     * trigger name. The value is `true` when this instance subscribed to the
     * trigger's channel (`<name>-<instanceId>`), `false` otherwise.
     */
    commitTriggers;
    /** Socket event name used to hand volatile-trigger work to another instance. */
    static VolatileTriggerEvent = 'vtEvent';
    /** socket.io client created in the constructor. */
    socket;
    /**
     * Installs the socket listeners:
     *  - on every (re)connect, re-subscribes to the channels of all commit
     *    triggers flagged `true` in `commitTriggers`;
     *  - on `VolatileTriggerEvent`, executes the forwarded trigger locally.
     */
    connectServerSocket() {
        const { instanceId } = (0, env_1.getClusterInfo)();
        this.socket.on('connect', () => {
            const csTriggerNames = Object.keys(this.commitTriggers)
                .filter(ele => this.commitTriggers[ele])
                .map(ele => `${ele}-${instanceId}`);
            if (csTriggerNames.length > 0) {
                this.socket.emit('sub', csTriggerNames);
            }
        });
        this.socket.on(ClusterAppLoader.VolatileTriggerEvent, async (entity, name, ids, cxtStr, option) => {
            // Nobody awaits the promise returned by an event listener, so every
            // failure (including one from makeContext or rollback) has to be
            // handled here, otherwise it surfaces as an unhandled rejection.
            let context;
            try {
                context = await this.makeContext(cxtStr);
                if (process.env.NODE_ENV === 'development') {
                    console.log(`${(0, env_1.getClusterInfo)().instanceId}」号实例接收到来自其它进程的volatileTrigger请求 name是「${name}」, ids是「${ids.join(',')}`);
                }
                // NOTE(review): there is no explicit context.begin() here; presumably
                // makeContext already opens the transaction. Confirm.
                await this.dbStore.execVolatileTrigger(entity, name, ids, context, option);
                await context.commit();
            }
            catch (err) {
                // NOTE(review): unlike execLocal in the constructor, OakPartialSuccess
                // is rolled back here rather than committed. Confirm this is intended.
                if (context) {
                    try {
                        await context.rollback();
                    }
                    catch (rollbackErr) {
                        console.error('rollback failed while handling a volatile trigger forwarded by another instance', rollbackErr);
                    }
                }
                console.error('在集群环境下处理来自其它实例的trigger数据execVolatileTrigger异常', entity, name, ids, option, err);
            }
        });
    }
    /**
     * Subscribes this instance to the channel `<name>-<instanceId>`.
     * When the socket is not connected yet, a connection is started instead;
     * the 'connect' listener then subscribes to every flagged trigger.
     *
     * @param {string} name trigger name
     */
    sub(name) {
        const { instanceId } = (0, env_1.getClusterInfo)();
        if (this.socket.connected) {
            this.socket.emit('sub', [`${name}-${instanceId}`]);
        }
        else {
            this.socket.connect();
        }
    }
    /**
     * @param path       forwarded to the AppLoader constructor
     * @param nsDs       forwarded to the AppLoader constructor
     * @param nsSocket   forwarded to the AppLoader constructor
     * @param nsServer   forwarded to the AppLoader constructor; its `name` is
     *                   appended to the socket URL
     * @param socketPath `path` option handed to the socket.io client
     */
    constructor(path, nsDs, nsSocket, nsServer, socketPath) {
        super(path, nsDs, nsSocket, nsServer);
        this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => {
            // Runs the trigger on this instance inside its own context.
            const execLocal = async (ids2) => {
                const context = await this.makeContext(cxtStr);
                // NOTE(review): there is no explicit context.begin() here; presumably
                // makeContext already opens the transaction. Confirm.
                try {
                    await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option);
                    await context.commit();
                }
                catch (err) {
                    if (err instanceof types_1.OakPartialSuccess) {
                        // A partial success is committed; the error is only logged.
                        await context.commit();
                        console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
                    }
                    else {
                        await context.rollback();
                        console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
                    }
                }
            };
            if (trigger.cs) {
                // A cluster-sensitive trigger has to be handled by the instance
                // that owns each row, so look up $$seq$$ for every id first.
                const context = await this.makeContext();
                let rows = [];
                if (ids.length) {
                    try {
                        rows = await context.select(entity, {
                            data: {
                                id: 1,
                                $$seq$$: 1,
                            },
                            filter: {
                                id: { $in: ids },
                            }
                        }, { dontCollect: true });
                    }
                    catch (err) {
                        // Do not leave the lookup context dangling when the select
                        // fails; the original error is still propagated.
                        await context.rollback();
                        throw err;
                    }
                }
                await context.commit();
                const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
                const grouped = (0, lodash_1.groupBy)(rows, (ele) => ele.$$seq$$ % instanceCount);
                for (const seqMod in grouped) {
                    const ids2 = grouped[seqMod].map(ele => ele.id);
                    // groupBy keys are strings; parse them as decimal instance ids.
                    if (Number.parseInt(seqMod, 10) === instanceId) {
                        await execLocal(ids2);
                    }
                    else {
                        if (process.env.NODE_ENV === 'development') {
                            console.log(`在trigger「${trigger.name}」上因为cs原因数据「${ids2.join(',')}」将被推送到「${seqMod}」号进程操作(当前进程号是「${instanceId}」)。`);
                        }
                        this.dataSubscriber.publishServerEvent(`${trigger.name}-${seqMod}`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids2, cxtStr, option);
                    }
                }
            }
            else if (trigger.singleton) {
                // Singleton triggers run on instance 0 only; other instances forward.
                if ((0, env_1.getClusterInfo)().instanceId === 0) {
                    await execLocal(ids);
                }
                else if (ids.length) {
                    if (process.env.NODE_ENV === 'development') {
                        console.log(`在trigger「${trigger.name}」上因为singleton原因数据「${ids.join(',')}」将被推送到「0」号进程操作当前进程号是「${(0, env_1.getClusterInfo)().instanceId}」)`);
                    }
                    this.dataSubscriber.publishServerEvent(`${trigger.name}-0`, ClusterAppLoader.VolatileTriggerEvent, entity, trigger.name, ids, cxtStr, option);
                }
            }
            else {
                await execLocal(ids);
            }
        });
        this.commitTriggers = {};
        const { name } = nsServer;
        // Once the local pm2 socket.io connection has been attached to the adaptor
        // in the cli it is relayed to redis, so it is enough to keep using the
        // pm2 socket.io connection here.
        const socketUrl = `http://localhost:${process.env.PM2_PORT || 8080}${name}`;
        this.socket = (0, socket_io_client_1.io)(socketUrl, {
            path: socketPath,
        });
        this.connectServerSocket();
    }
    /**
     * Registers a trigger with the store. Commit-time triggers are also
     * recorded in `commitTriggers`:
     *  - a cluster-sensitive trigger is subscribed on the socket;
     *  - a singleton trigger is subscribed by instance 0 only.
     *
     * @param trigger trigger definition; `cs` and `singleton` are mutually exclusive
     * @throws AssertionError when both `cs` and `singleton` are set, or when a
     *         commit trigger with the same name was already registered here
     */
    registerTrigger(trigger) {
        const { when, cs, singleton, name } = trigger;
        (0, assert_1.default)(!(cs && singleton));
        if (when === 'commit') {
            // Test for the key itself: triggers that do not subscribe are stored
            // as `false`, so a truthiness test would let their duplicates through.
            const alreadyRegistered = Object.prototype.hasOwnProperty.call(this.commitTriggers, name);
            (0, assert_1.default)(!alreadyRegistered, `命名为${name}的trigger出现了多次请检查`);
            const needSub = !!(cs || singleton && (0, env_1.getClusterInfo)().instanceId === 0);
            this.commitTriggers[name] = needSub;
            if (needSub) {
                this.sub(name);
            }
        }
        this.dbStore.registerTrigger(trigger);
    }
    /**
     * Watcher operation limited to this instance's share of the rows.
     * Unless `singleton` is truthy, the operation filter is combined with
     * `$$seq$$ $mod [instanceCount, instanceId]`.
     */
    operateInWatcher(entity, operation, context, singleton) {
        const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
        (0, assert_1.default)(instanceCount && typeof instanceId === 'number');
        const { filter } = operation;
        const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
                $$seq$$: {
                    $mod: [instanceCount, instanceId]
                }
            }]);
        return super.operateInWatcher(entity, {
            ...operation,
            filter: filter2,
        }, context);
    }
    /**
     * Watcher selection limited to this instance's share of the rows.
     * Unless `singleton` is truthy, the selection filter is combined with
     * `$$seq$$ $mod [instanceCount, instanceId]`.
     */
    selectInWatcher(entity, selection, context, forUpdate, singleton) {
        const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
        (0, assert_1.default)(instanceCount && typeof instanceId === 'number');
        const { filter } = selection;
        const filter2 = singleton ? filter : (0, filter_1.combineFilters)(entity, this.dbStore.getSchema(), [filter, {
                $$seq$$: {
                    $mod: [instanceCount, instanceId]
                }
            }]);
        return super.selectInWatcher(entity, {
            ...selection,
            filter: filter2,
        }, context, forUpdate);
    }
    /** Runs the watcher, except that singleton watchers are skipped on every instance but 0. */
    async execWatcher(watcher) {
        if (watcher.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
            return;
        }
        return super.execWatcher(watcher);
    }
    /** Runs the timer, except that singleton timers are skipped on every instance but 0. */
    execFreeTimer(timer, context) {
        if (timer.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {
            return;
        }
        return super.execFreeTimer(timer, context);
    }
    /**
     * Runs `dbStore.independentCheckPoint` for every recorded commit trigger
     * and sums the returned counts. Triggers this instance subscribed to are
     * checkpointed with `instanceCount`/`instanceId`; the others without.
     *
     * @returns {Promise<number>} sum of the values returned by the store
     */
    async checkpoint() {
        const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
        let count = 0;
        for (const name in this.commitTriggers) {
            if (this.commitTriggers[name]) {
                count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs(), instanceCount, instanceId);
            }
            else {
                count += await this.dbStore.independentCheckPoint(name, this.getCheckpointTs());
            }
        }
        return count;
    }
}
exports.ClusterAppLoader = ClusterAppLoader;