重新设计实现了cluster的配置和DataSubscribe

This commit is contained in:
Xu Chang 2023-12-08 11:59:31 +08:00
parent ae3a32c8e4
commit ac26505aba
7 changed files with 116 additions and 292 deletions

View File

@ -101,8 +101,14 @@ class AppLoader extends types_1.AppLoader {
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
context.commit = async () => {
this.dataSubscriber.onDataCommited(context);
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
Object.keys(eventOperationMap).forEach((event) => {
const ids = eventOperationMap[event];
const opRecordsToPublish = opRecords.filter((ele) => !!ele.id && ids.includes(ele.id));
(0, assert_1.default)(opRecordsToPublish.length === ids.length, '要推送的事件的operation数量不足请检查确保');
this.dataSubscriber.publishEvent(event, opRecordsToPublish);
});
};
return context;
};
@ -209,7 +215,7 @@ class AppLoader extends types_1.AppLoader {
await context.commit();
await context.refineOpRecords();
return {
opRecords: context.opRecords,
opRecords: context.opRecords.map(ele => (0, lodash_1.omit)(ele, 'id')),
message: context.getMessage(),
result,
};

View File

@ -1,18 +1,21 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict, OpRecord } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { Namespace } from 'socket.io';
/**
*
* socket.io通过adapter在集群间通信时pm2 + cluster-adapteradpater启用时需要再测一次
* 1client连接到node1并join room1时node1上会有create room事件room结构本身在结点间并不共享
* 2node执行 .adapter.to('room1').emit()client均能收到消息使room可以实现跨结点推包
* 3) serverSideEmit执行时如果有callbackcallback的话
*/
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends BackendRuntimeContext<ED>> {
private ns;
private contextBuilder;
private filterMap;
private idEntityMap;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>);
private formCreateRoomRoutine;
/**
* socket连接
*/
private startup;
private sendRecord;
onDataCommited(context: Context): void;
publishEvent(event: string, records: OpRecord<ED>[], sid?: string): void;
}

View File

@ -1,41 +1,20 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const lodash_1 = require("oak-domain/lib/utils/lodash");
const oak_domain_1 = require("oak-domain");
const env_1 = require("./env");
/**
* 集群行为备忘
* 当socket.io通过adapter在集群间通信时测试行为如下测试环境为pm2 + cluster-adapter其它adpater启用时需要再测一次
* 1当client连接到node1并join room1时只有node1上会有create room事件room结构本身在结点间并不共享
* 2当某一个node执行 .adapter.to('room1').emit()连接到任一结点的client均能收到消息但使用room可以实现跨结点推包
* 3) serverSideEmit执行时如果有callback而不是所有的接收者都执行callback的话会抛出一个异常意味着不需要本结点来判定是否收到全部的返回值了
*/
class DataSubscriber {
ns;
contextBuilder;
filterMap;
idEntityMap;
constructor(ns, contextBuilder) {
this.ns = ns;
this.contextBuilder = contextBuilder;
this.startup();
this.filterMap = {};
this.idEntityMap = {};
}
formCreateRoomRoutine(def) {
const { id, entity, filter } = def;
return (room) => {
if (room === id) {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'add filter', room);
// 本房间不存在说明这个filter是新出现的
if (this.filterMap[entity]) {
// id的唯一性由前台保证重复则无视
Object.assign(this.filterMap[entity], {
[id]: filter,
});
}
else {
Object.assign(this.filterMap, {
[entity]: {
[id]: filter,
}
});
}
this.idEntityMap[id] = entity;
}
};
}
/**
* 来自外部的socket连接监听数据变化
@ -43,39 +22,14 @@ class DataSubscriber {
startup() {
this.ns.on('connection', async (socket) => {
try {
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr);
socket.userId = context.getCurrentUserId();
socket.context = context;
socket.idMap = {};
socket.on('sub', async (data) => {
try {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'on sub', JSON.stringify(data));
await Promise.all(data.map(async (ele) => {
const { id, entity, filter } = ele;
// 尝试select此filter如果失败说明权限越界
await context.select(entity, {
data: {
id: 1,
},
filter,
}, {});
}));
}
catch (err) {
socket.emit('error', err.toString());
return;
}
data.forEach((ele) => {
const createRoomRoutine = this.formCreateRoomRoutine(ele);
this.ns.adapter.on('create-room', createRoomRoutine);
socket.join(ele.id);
this.ns.adapter.off('create-room', createRoomRoutine);
});
const { instanceId } = (0, env_1.getClusterInfo)();
console.log('on connection', instanceId);
socket.on('sub', async (events) => {
events.forEach((event) => socket.join(event));
});
socket.on('unsub', (ids) => {
socket.on('unsub', (events) => {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'on unsub', JSON.stringify(ids));
ids.forEach((id) => {
events.forEach((id) => {
socket.leave(id);
});
});
@ -84,75 +38,16 @@ class DataSubscriber {
socket.emit('error', err.toString());
}
});
this.ns.adapter.on('delete-room', (room) => {
const entity = this.idEntityMap[room];
if (entity) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'remove filter', room);
(0, lodash_1.unset)(this.filterMap[entity], room);
(0, lodash_1.unset)(this.idEntityMap, room);
}
});
this.ns.on('sendRecord', (entity, filter, record, isCreate) => {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'get record from another', JSON.stringify(entity));
});
}
sendRecord(entity, filter, record, sid, isCreate) {
if (entity === 'spContractApplyment') {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'sendRecord', JSON.stringify(entity));
publishEvent(event, records, sid) {
const { instanceId } = (0, env_1.getClusterInfo)();
console.log('publishEvent', instanceId);
if (sid) {
this.ns.to(event).except(sid).emit('data', records);
}
this.ns.serverSideEmit('sendRecord', entity, filter, record, isCreate);
if (this.filterMap[entity]) {
Object.keys(this.filterMap[entity]).forEach(async (room) => {
const context = await this.contextBuilder();
const filter2 = this.filterMap[entity][room];
let needSend = false;
if (isCreate) {
// 如果是插入数据肯定是单行,使用相容性检测
const contained = await (0, oak_domain_1.checkFilterContains)(entity, context, filter2, filter, true);
needSend = contained;
}
else {
const repeled = await (0, oak_domain_1.checkFilterRepel)(entity, context, filter, filter2, true);
needSend = !repeled;
}
if (needSend) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'needSend', JSON.stringify(room));
if (sid) {
this.ns.to(room).except(sid).emit('data', [record], [room]);
}
else {
this.ns.to(room).emit('data', [record], [room]);
}
}
});
else {
this.ns.to(event).emit('data', records);
}
}
onDataCommited(context) {
const sid = context.getSubscriberId();
const { opRecords } = context;
opRecords.forEach((record) => {
const { a } = record;
switch (a) {
case 'c': {
const { e, d } = record;
this.sendRecord(e, d, record, sid, true);
break;
}
case 'u': {
const { e, d, f } = record;
this.sendRecord(e, f, record, sid);
break;
}
case 'r': {
const { e, f } = record;
this.sendRecord(e, f, record, sid);
break;
}
default: {
break;
}
}
});
}
}
exports.default = DataSubscriber;

View File

@ -14,13 +14,27 @@ function getProcessEnvOption(option) {
return process.env[upperCase];
}
}
// 初始化判定集群状态目前支持pm2的集群信息
// 初始化判定集群状态,需要在环境变量中注入两个值
/** pm2注入方法https://pm2.fenxianglu.cn/docs/general/environment-variables
* apps: [
{
name: 'xxx',
script: "xxxjs",
instances: "2",
increment_var: "OAK_INSTANCE_ID",
env: {
OAK_INSTANCE_CNT: 9,
OAK_INSTANCE_ID: 8,
}
},
],
**/
function initialize() {
const pmId = getProcessEnvOption('NODE_APP_INSTANCE');
if (pmId) {
const instanceIdStr = getProcessEnvOption('OAK_INSTANCE_ID');
if (instanceIdStr) {
const usingCluster = true;
const instanceId = parseInt(pmId);
const instanceCount = parseInt(getProcessEnvOption('instances'));
const instanceId = parseInt(instanceIdStr);
const instanceCount = parseInt(getProcessEnvOption('OAK_INSTANCE_CNT'));
return {
usingCluster,
instanceCount,

View File

@ -3,10 +3,10 @@ import { join } from 'path';
import { scheduleJob } from 'node-schedule';
import { OAK_EXTERNAL_LIBS_FILEPATH } from 'oak-domain/lib/compiler/env';
import { makeIntrinsicCTWs } from "oak-domain/lib/store/actionDef";
import { intersection } from 'oak-domain/lib/utils/lodash';
import { intersection, omit } from 'oak-domain/lib/utils/lodash';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, RowStore, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, CreateOpResult, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
@ -126,8 +126,20 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
context.commit = async () => {
this.dataSubscriber!.onDataCommited(context);
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
Object.keys(eventOperationMap).forEach(
(event) => {
const ids = eventOperationMap[event];
const opRecordsToPublish = (opRecords as CreateOpResult<ED, keyof ED>[]).filter(
(ele) => !!ele.id && ids.includes(ele.id)
);
assert(opRecordsToPublish.length === ids.length, '要推送的事件的operation数量不足请检查确保');
this.dataSubscriber!.publishEvent(event, opRecordsToPublish);
}
)
};
return context;
@ -259,7 +271,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
await context.commit();
await context.refineOpRecords();
return {
opRecords: context.opRecords,
opRecords: (context.opRecords as CreateOpResult<ED, keyof ED>[]).map(ele => omit(ele, 'id')),
message: context.getMessage(),
result,
};

View File

@ -1,50 +1,25 @@
import assert from 'assert';
import { unset } from 'oak-domain/lib/utils/lodash';
import { EntityDict, SubDataDef, OpRecord, CreateOpResult, UpdateOpResult, RemoveOpResult } from 'oak-domain/lib/types';
import { EntityDict, OpRecord } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { Namespace } from 'socket.io';
import { checkFilterContains, checkFilterRepel } from 'oak-domain';
import { getClusterInfo } from './env';
/**
*
* socket.io通过adapter在集群间通信时pm2 + cluster-adapteradpater启用时需要再测一次
* 1client连接到node1并join room1时node1上会有create room事件room结构本身在结点间并不共享
* 2node执行 .adapter.to('room1').emit()client均能收到消息使room可以实现跨结点推包
* 3) serverSideEmit执行时如果有callbackcallback的话
*/
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends BackendRuntimeContext<ED>> {
private ns: Namespace;
private contextBuilder: (scene?: string) => Promise<Context>;
private filterMap: {
[k in keyof ED]?: Record<string, ED[keyof ED]['Selection']['filter']>;
}
private idEntityMap: Record<string, keyof ED>;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>) {
this.ns = ns;
this.contextBuilder = contextBuilder;
this.startup();
this.filterMap = {};
this.idEntityMap = {};
}
private formCreateRoomRoutine(def: SubDataDef<ED, keyof ED>) {
const { id, entity, filter } = def;
return (room: string) => {
if (room === id) {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'add filter', room);
// 本房间不存在说明这个filter是新出现的
if (this.filterMap[entity]) {
// id的唯一性由前台保证重复则无视
Object.assign(this.filterMap[entity]!, {
[id]: filter,
});
}
else {
Object.assign(this.filterMap, {
[entity]: {
[id]: filter,
}
});
}
this.idEntityMap[id] = entity;
}
};
}
/**
@ -53,47 +28,17 @@ export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Cont
private startup() {
this.ns.on('connection', async (socket) => {
try {
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr as string);
(socket as any).userId = context.getCurrentUserId();
(socket as any).context = context;
(socket as any).idMap = {};
socket.on('sub', async (data: SubDataDef<ED, keyof ED>[]) => {
try {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'on sub', JSON.stringify(data));
await Promise.all(
data.map(
async (ele) => {
const { id, entity, filter } = ele;
// 尝试select此filter如果失败说明权限越界
await context.select(entity, {
data: {
id: 1,
},
filter,
}, {});
}
)
);
}
catch (err: any) {
socket.emit('error', err.toString());
return;
}
data.forEach(
(ele) => {
const createRoomRoutine = this.formCreateRoomRoutine(ele);
this.ns.adapter.on('create-room', createRoomRoutine);
socket.join(ele.id);
this.ns.adapter.off('create-room', createRoomRoutine);
}
const { instanceId } = getClusterInfo();
console.log('on connection', instanceId);
socket.on('sub', async (events: string[]) => {
events.forEach(
(event) => socket.join(event)
);
});
socket.on('unsub', (ids: string[]) => {
socket.on('unsub', (events: string[]) => {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'on unsub', JSON.stringify(ids));
ids.forEach(
events.forEach(
(id) => {
socket.leave(id);
}
@ -104,82 +49,16 @@ export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Cont
socket.emit('error', err.toString());
}
});
this.ns.adapter.on('delete-room', (room) => {
const entity = this.idEntityMap[room];
if (entity) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'remove filter', room);
unset(this.filterMap[entity], room);
unset(this.idEntityMap, room);
}
});
this.ns.on('sendRecord', (entity, filter, record, isCreate) => {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'get record from another', JSON.stringify(entity));
});
}
private sendRecord(entity: keyof ED, filter: ED[keyof ED]['Selection']['filter'], record: OpRecord<ED>, sid?: string, isCreate?: boolean) {
if (entity === 'spContractApplyment') {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'sendRecord', JSON.stringify(entity));
publishEvent(event: string, records: OpRecord<ED>[], sid?: string) {
const { instanceId } = getClusterInfo();
console.log('publishEvent', instanceId);
if (sid) {
this.ns.to(event).except(sid).emit('data', records);
}
this.ns.serverSideEmit('sendRecord', entity, filter, record, isCreate);
if (this.filterMap[entity]) {
Object.keys(this.filterMap[entity]!).forEach(
async (room) => {
const context = await this.contextBuilder();
const filter2 = this.filterMap[entity]![room];
let needSend = false;
if (isCreate) {
// 如果是插入数据肯定是单行,使用相容性检测
const contained = await checkFilterContains(entity, context, filter2, filter, true);
needSend = contained;
}
else {
const repeled = await checkFilterRepel(entity, context, filter, filter2, true);
needSend = !repeled;
}
if (needSend) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'needSend', JSON.stringify(room));
if (sid) {
this.ns.to(room).except(sid).emit('data', [record], [room]);
}
else {
this.ns.to(room).emit('data', [record], [room]);
}
}
}
);
else {
this.ns.to(event).emit('data', records);
}
}
onDataCommited(context: Context) {
const sid = context.getSubscriberId();
const { opRecords } = context;
opRecords.forEach(
(record) => {
const { a } = record;
switch (a) {
case 'c': {
const { e, d } = record as CreateOpResult<ED, keyof ED>;
this.sendRecord(e, d, record, sid, true);
break;
}
case 'u': {
const { e, d, f } = record as UpdateOpResult<ED, keyof ED>;
this.sendRecord(e, f, record, sid);
break;
}
case 'r': {
const { e, f } = record as RemoveOpResult<ED, keyof ED>;
this.sendRecord(e, f, record, sid);
break;
}
default: {
break;
}
}
}
);
}
}

View File

@ -14,13 +14,28 @@ function getProcessEnvOption(option: string) {
}
}
// 初始化判定集群状态目前支持pm2的集群信息
// 初始化判定集群状态,需要在环境变量中注入两个值
/** pm2注入方法https://pm2.fenxianglu.cn/docs/general/environment-variables
* apps: [
{
name: 'xxx',
script: "xxxjs",
instances: "2",
increment_var: "OAK_INSTANCE_ID",
env: {
OAK_INSTANCE_CNT: 9,
OAK_INSTANCE_ID: 8,
}
},
],
**/
function initialize() {
const pmId = getProcessEnvOption('NODE_APP_INSTANCE');
if (pmId) {
const instanceIdStr = getProcessEnvOption('OAK_INSTANCE_ID');
if (instanceIdStr) {
const usingCluster = true;
const instanceId = parseInt(pmId);
const instanceCount = parseInt(getProcessEnvOption('instances')!);
const instanceId = parseInt(instanceIdStr);
const instanceCount = parseInt(getProcessEnvOption('OAK_INSTANCE_CNT')!);
return {
usingCluster,
instanceCount,