dataScriber中的部分实现

This commit is contained in:
Xu Chang 2023-09-13 17:36:01 +08:00
parent d5c1cc9e71
commit 4e86d6a229
3 changed files with 84 additions and 4 deletions

View File

@ -5,7 +5,9 @@ import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io;
private contextBuilder;
private hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>);
private calcEntityFilterID;
/**
* socket连接
*/

View File

@ -1,19 +1,56 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const crypto_1 = require("crypto");
class DataSubscriber {
io;
contextBuilder;
hash;
constructor(io, contextBuilder) {
this.io = io;
this.contextBuilder = contextBuilder;
this.startup();
this.hash = (0, crypto_1.createHash)('sha256');
}
calcEntityFilterID(entity, filter) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
}
/**
* 来自外部的socket连接监听数据变化
*/
startup() {
this.io.on('connection', (socket) => {
this.io.on('connection', async (socket) => {
console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr);
socket.userId = context.getCurrentUserId();
socket.context = context;
socket.idMap = {};
socket.on('sub', (data, callback) => {
console.log(data);
data.forEach((ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
const globalId = this.calcEntityFilterID(entity, filter);
const rooms = this.io.of("/").adapter.rooms;
console.log(rooms);
socket.idMap[id] = globalId;
socket.join(globalId);
});
});
socket.on('unsub', (ids) => {
console.log('unsub', ids);
ids.forEach((id) => {
const globalId = socket.idMap[id];
const rooms = this.io.of("/").adapter.rooms;
console.log(rooms);
socket.leave(globalId);
console.log(rooms);
});
});
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});

View File

@ -1,28 +1,69 @@
import { EntityDict } from 'oak-domain/lib/types';
import { Hash, createHash } from 'crypto';
import assert from 'assert';
import { EntityDict, SubDataDef } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io: Server;
private contextBuilder: (scene?: string) => Promise<Context>;
private hash: Hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>) {
this.io = io;
this.contextBuilder = contextBuilder;
this.startup();
this.hash = createHash('sha256');
}
private calcEntityFilterID(entity: keyof ED, filter: ED[keyof ED]['Selection']['filter']) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity as string}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
}
/**
* socket连接
*/
private startup() {
this.io.on('connection', (socket) => {
this.io.on('connection', async (socket) => {
console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr as string);
(socket as any).userId = context.getCurrentUserId();
(socket as any).context = context;
(socket as any).idMap = {};
socket.on('sub', (data: SubDataDef<ED, keyof ED>[], callback) => {
console.log(data);
data.forEach(
(ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
const globalId = this.calcEntityFilterID(entity, filter);
(socket as any).idMap[id] = globalId;
socket.join(globalId);
}
);
});
socket.on('unsub', (ids: string[]) => {
console.log('unsub', ids);
ids.forEach(
(id) => {
const globalId = (socket as any).idMap[id];
socket.leave(globalId);
}
)
});
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});
})
});
}
}