删除了原来编译多余的DataSubscriber

This commit is contained in:
Xu Chang 2024-05-12 23:45:17 +08:00
parent bf37cec7f7
commit f31d99c537
4 changed files with 4 additions and 180 deletions

View File

@ -75,7 +75,7 @@ class AppLoader extends types_1.AppLoader {
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
const loaderThis = this;
// 需要重载context上的构造和commit方法否则程序中执行context.restartToExecute这样的方法中new一个context出来是无法正确执行的
class NewBackendRuntimeContext extends BackendRuntimeContext {
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
constructor(store) {
super(store);
this.clusterInfo = (0, env_1.getClusterInfo)();
@ -95,7 +95,7 @@ class AppLoader extends types_1.AppLoader {
}
}
;
this.contextBuilder = (store) => new NewBackendRuntimeContext(store);
this.contextBuilder = (store) => new BackendRuntimeContextWrapper(store);
}
registerTrigger(trigger) {
this.dbStore.registerTrigger(trigger);

View File

@ -1,18 +0,0 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { Namespace } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends BackendRuntimeContext<ED>> {
private ns;
private contextBuilder;
private filterMap;
private idEntityMap;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>);
private formCreateRoomRoutine;
/**
* socket连接
*/
private startup;
private sendRecord;
onDataCommited(context: Context): void;
}

View File

@ -1,158 +0,0 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const lodash_1 = require("oak-domain/lib/utils/lodash");
const oak_domain_1 = require("oak-domain");
class DataSubscriber {
ns;
contextBuilder;
filterMap;
idEntityMap;
constructor(ns, contextBuilder) {
this.ns = ns;
this.contextBuilder = contextBuilder;
this.startup();
this.filterMap = {};
this.idEntityMap = {};
}
formCreateRoomRoutine(def) {
const { id, entity, filter } = def;
return (room) => {
if (room === id) {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'add filter', room);
// 本房间不存在说明这个filter是新出现的
if (this.filterMap[entity]) {
// id的唯一性由前台保证重复则无视
Object.assign(this.filterMap[entity], {
[id]: filter,
});
}
else {
Object.assign(this.filterMap, {
[entity]: {
[id]: filter,
}
});
}
this.idEntityMap[id] = entity;
}
};
}
/**
* 来自外部的socket连接监听数据变化
*/
startup() {
this.ns.on('connection', async (socket) => {
try {
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr);
socket.userId = context.getCurrentUserId();
socket.context = context;
socket.idMap = {};
socket.on('sub', async (data) => {
try {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'on sub', JSON.stringify(data));
await Promise.all(data.map(async (ele) => {
const { id, entity, filter } = ele;
// 尝试select此filter如果失败说明权限越界
await context.select(entity, {
data: {
id: 1,
},
filter,
}, {});
}));
}
catch (err) {
socket.emit('error', err.toString());
return;
}
data.forEach((ele) => {
const createRoomRoutine = this.formCreateRoomRoutine(ele);
this.ns.adapter.on('create-room', createRoomRoutine);
socket.join(ele.id);
this.ns.adapter.off('create-room', createRoomRoutine);
});
});
socket.on('unsub', (ids) => {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'on unsub', JSON.stringify(ids));
ids.forEach((id) => {
socket.leave(id);
});
});
}
catch (err) {
socket.emit('error', err.toString());
}
});
this.ns.adapter.on('delete-room', (room) => {
const entity = this.idEntityMap[room];
if (entity) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'remove filter', room);
(0, lodash_1.unset)(this.filterMap[entity], room);
(0, lodash_1.unset)(this.idEntityMap, room);
}
});
this.ns.on('sendRecord', (entity, filter, record, isCreate) => {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'get record from another', JSON.stringify(entity));
});
}
sendRecord(entity, filter, record, sid, isCreate) {
if (entity === 'spContractApplyment') {
console.log('instance:', process.env.NODE_APP_INSTANCE, 'sendRecord', JSON.stringify(entity));
}
this.ns.serverSideEmit('sendRecord', entity, filter, record, isCreate);
if (this.filterMap[entity]) {
Object.keys(this.filterMap[entity]).forEach(async (room) => {
const context = await this.contextBuilder();
const filter2 = this.filterMap[entity][room];
let needSend = false;
if (isCreate) {
// 如果是插入数据肯定是单行,使用相容性检测
const contained = await (0, oak_domain_1.checkFilterContains)(entity, context, filter2, filter, true);
needSend = contained;
}
else {
const repeled = await (0, oak_domain_1.checkFilterRepel)(entity, context, filter, filter2, true);
needSend = !repeled;
}
if (needSend) {
// console.log('instance:', process.env.NODE_APP_INSTANCE, 'needSend', JSON.stringify(room));
if (sid) {
this.ns.to(room).except(sid).emit('data', [record], [room]);
}
else {
this.ns.to(room).emit('data', [record], [room]);
}
}
});
}
}
onDataCommited(context) {
const sid = context.getSubscriberId();
const { opRecords } = context;
opRecords.forEach((record) => {
const { a } = record;
switch (a) {
case 'c': {
const { e, d } = record;
this.sendRecord(e, d, record, sid, true);
break;
}
case 'u': {
const { e, d, f } = record;
this.sendRecord(e, f, record, sid);
break;
}
case 'r': {
const { e, f } = record;
this.sendRecord(e, f, record, sid);
break;
}
default: {
break;
}
}
});
}
}
exports.default = DataSubscriber;

View File

@ -95,7 +95,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const loaderThis = this;
// 需要重载context上的构造和commit方法否则程序中执行context.restartToExecute这样的方法中new一个context出来是无法正确执行的
class NewBackendRuntimeContext extends BackendRuntimeContext {
class BackendRuntimeContextWrapper extends BackendRuntimeContext {
constructor(store: DbStore<ED, Cxt>) {
super(store);
this.clusterInfo = getClusterInfo();
@ -122,7 +122,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
};
this.contextBuilder = (store) => new NewBackendRuntimeContext(store) as Cxt;
this.contextBuilder = (store) => new BackendRuntimeContextWrapper(store) as Cxt;
}
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>) {