This commit is contained in:
Xu Chang 2023-09-14 10:07:50 +08:00
commit 9032783680
4 changed files with 121 additions and 41 deletions

View File

@ -5,7 +5,9 @@ import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> { export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io; private io;
private contextBuilder; private contextBuilder;
private hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>); constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>);
private calcEntityFilterID;
/** /**
* socket连接 * socket连接
*/ */

View File

@ -1,19 +1,56 @@
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const crypto_1 = require("crypto");
class DataSubscriber { class DataSubscriber {
io; io;
contextBuilder; contextBuilder;
hash;
constructor(io, contextBuilder) { constructor(io, contextBuilder) {
this.io = io; this.io = io;
this.contextBuilder = contextBuilder; this.contextBuilder = contextBuilder;
this.startup(); this.startup();
this.hash = (0, crypto_1.createHash)('sha256');
}
calcEntityFilterID(entity, filter) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
} }
/** /**
* 来自外部的socket连接监听数据变化 * 来自外部的socket连接监听数据变化
*/ */
startup() { startup() {
this.io.on('connection', (socket) => { this.io.on('connection', async (socket) => {
console.log('connection', socket.id); console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr);
socket.userId = context.getCurrentUserId();
socket.context = context;
socket.idMap = {};
socket.on('sub', (data, callback) => {
console.log(data);
data.forEach((ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
const globalId = this.calcEntityFilterID(entity, filter);
const rooms = this.io.of("/").adapter.rooms;
console.log(rooms);
socket.idMap[id] = globalId;
socket.join(globalId);
});
});
socket.on('unsub', (ids) => {
console.log('unsub', ids);
ids.forEach((id) => {
const globalId = socket.idMap[id];
const rooms = this.io.of("/").adapter.rooms;
console.log(rooms);
socket.leave(globalId);
console.log(rooms);
});
});
socket.on('disconnect', (reason) => { socket.on('disconnect', (reason) => {
console.log('disconnect', reason); console.log('disconnect', reason);
}); });

View File

@ -1,39 +1,39 @@
{ {
"name": "oak-backend-base", "name": "oak-backend-base",
"version": "3.0.1", "version": "3.0.1",
"description": "oak-backend-base", "description": "oak-backend-base",
"main": "lib/index", "main": "lib/index",
"author": { "author": {
"name": "XuChang" "name": "XuChang"
}, },
"files": [ "files": [
"lib/**/*" "lib/**/*"
], ],
"scripts": { "scripts": {
"copy-files": "copyfiles -u 1 src/**/*.json lib/", "copy-files": "copyfiles -u 1 src/**/*.json lib/",
"test": "ts-node test/test.ts", "test": "ts-node test/test.ts",
"test2": "ts-node test/testDbStore.ts", "test2": "ts-node test/testDbStore.ts",
"build": "tsc && npm run copy-files" "build": "tsc && npm run copy-files"
}, },
"dependencies": { "dependencies": {
"@types/node-schedule": "^2.1.0", "lodash": "^4.17.21",
"lodash": "^4.17.21", "mysql": "^2.18.1",
"mysql": "^2.18.1", "mysql2": "^2.3.3",
"mysql2": "^2.3.3", "node-schedule": "^2.1.0",
"node-schedule": "^2.1.0", "oak-common-aspect": "file:../oak-common-aspect",
"oak-common-aspect": "file:../oak-common-aspect", "oak-db": "file:../oak-db",
"oak-db": "file:../oak-db", "oak-domain": "file:../oak-domain",
"oak-domain": "file:../oak-domain", "socket.io": "^4.7.2",
"socket.io": "^4.7.2", "uuid": "^8.3.2"
"uuid": "^8.3.2" },
}, "license": "ISC",
"license": "ISC", "devDependencies": {
"devDependencies": { "@types/node": "^17.0.40",
"@types/node": "^17.0.40", "@types/node-schedule": "^2.1.0",
"@types/uuid": "^8.3.4", "@types/uuid": "^8.3.4",
"copyfiles": "^2.4.1", "copyfiles": "^2.4.1",
"ts-node": "~10.9.1", "ts-node": "~10.9.1",
"tslib": "^2.4.0", "tslib": "^2.4.0",
"typescript": "^4.7.4" "typescript": "^4.7.4"
} }
} }

View File

@ -1,29 +1,70 @@
import { EntityDict, OpRecord } from 'oak-domain/lib/types'; import { Hash, createHash } from 'crypto';
import assert from 'assert';
import { EntityDict, SubDataDef, OpRecord } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io'; import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> { export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io: Server; private io: Server;
private contextBuilder: (scene?: string) => Promise<Context>; private contextBuilder: (scene?: string) => Promise<Context>;
private hash: Hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>) { constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>) {
this.io = io; this.io = io;
this.contextBuilder = contextBuilder; this.contextBuilder = contextBuilder;
this.startup(); this.startup();
this.hash = createHash('sha256');
}
private calcEntityFilterID(entity: keyof ED, filter: ED[keyof ED]['Selection']['filter']) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity as string}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
} }
/** /**
* socket连接 * socket连接
*/ */
private startup() { private startup() {
this.io.on('connection', (socket) => { this.io.on('connection', async (socket) => {
console.log('connection', socket.id); console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr as string);
(socket as any).userId = context.getCurrentUserId();
(socket as any).context = context;
(socket as any).idMap = {};
socket.on('sub', (data: SubDataDef<ED, keyof ED>[], callback) => {
console.log(data);
data.forEach(
(ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
const globalId = this.calcEntityFilterID(entity, filter);
(socket as any).idMap[id] = globalId;
socket.join(globalId);
}
);
});
socket.on('unsub', (ids: string[]) => {
console.log('unsub', ids);
ids.forEach(
(id) => {
const globalId = (socket as any).idMap[id];
socket.leave(globalId);
}
)
});
socket.on('disconnect', (reason) => { socket.on('disconnect', (reason) => {
console.log('disconnect', reason); console.log('disconnect', reason);
}); });
}) });
} }
onDataCommited(records: OpRecord<ED>[], userId?: string) { onDataCommited(records: OpRecord<ED>[], userId?: string) {