提交了apploader的一些改动

This commit is contained in:
Xu Chang 2023-09-15 12:27:01 +08:00
parent 518242e22d
commit ae5353242c
6 changed files with 119 additions and 68 deletions

4
lib/AppLoader.d.ts vendored
View File

@ -3,7 +3,7 @@ import { AppLoader as GeneralAppLoader, EntityDict, OpRecord } from "oak-domain/
import { DbStore } from "./DbStore";
import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Server as SocketIoServer } from 'socket.io';
import { Namespace } from 'socket.io';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends GeneralAppLoader<ED, Cxt> {
private dbStore;
private aspectDict;
@ -11,7 +11,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
private dataSubscriber?;
private contextBuilder;
private requireSth;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, io?: SocketIoServer);
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace);
initTriggers(): void;
startWatchers(): void;
mount(initialize?: true): Promise<void>;

View File

@ -85,7 +85,7 @@ class AppLoader extends types_1.AppLoader {
Object.assign(sthOut, sth);
return sthOut;
}
constructor(path, contextBuilder, io) {
constructor(path, contextBuilder, ns) {
super(path);
const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -93,8 +93,8 @@ class AppLoader extends types_1.AppLoader {
this.externalDependencies = require((0, env_1.OAK_EXTERNAL_LIBS_FILEPATH)((0, path_1.join)(path, 'lib')));
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore_1.DbStore(storageSchema, contextBuilder, dbConfig, ActionCascadePathGraph, RelationCascadePathGraph, deducedRelationMap, selectFreeEntities, createFreeEntities, updateFreeEntities);
if (io) {
this.dataSubscriber = new DataSubscriber_1.default(io, (scene) => this.contextBuilder(scene)(this.dbStore));
if (ns) {
this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore));
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
// 注入在提交前向dataSubscribe
@ -102,7 +102,7 @@ class AppLoader extends types_1.AppLoader {
context.commit = async () => {
const { opRecords } = context;
const userId = context.getCurrentUserId();
originCommit.call(context);
await originCommit.call(context);
};
return context;
};

View File

@ -1,13 +1,12 @@
import { EntityDict, OpRecord } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io';
import { Namespace } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io;
private ns;
private contextBuilder;
private hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>);
private calcEntityFilterID;
private filterMap;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>);
/**
* socket连接
*/

View File

@ -1,57 +1,80 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const crypto_1 = require("crypto");
class DataSubscriber {
io;
ns;
contextBuilder;
hash;
constructor(io, contextBuilder) {
this.io = io;
filterMap;
constructor(ns, contextBuilder) {
this.ns = ns;
this.contextBuilder = contextBuilder;
this.startup();
this.hash = (0, crypto_1.createHash)('sha256');
}
calcEntityFilterID(entity, filter) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
this.filterMap = {};
}
/**
* 来自外部的socket连接监听数据变化
*/
startup() {
this.io.on('connection', async (socket) => {
this.ns.on('connection', async (socket) => {
console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr);
socket.userId = context.getCurrentUserId();
socket.context = context;
socket.idMap = {};
socket.on('sub', (data, callback) => {
socket.on('sub', async (data, callback) => {
console.log(data);
try {
await Promise.all(data.map(async (ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
// 尝试select此filter如果失败说明权限越界
await context.select(entity, {
data: {
id: 1,
},
filter,
}, {});
}));
}
catch (err) {
callback(err.toString());
return;
}
const { rooms } = this.ns.adapter;
data.forEach((ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
// 尝试select此filter如果失败说明权限越界
// todo
const globalId = this.calcEntityFilterID(entity, filter);
socket.idMap[id] = globalId;
socket.join(globalId);
if (!rooms.get(id)) {
// 本房间不存在说明这个filter是新出现的
if (this.filterMap[entity]) {
// id的唯一性由前台保证重复则无视
Object.assign(this.filterMap[entity], {
[id]: filter,
});
}
else {
Object.assign(this.filterMap, {
[entity]: {
id: filter,
}
});
}
}
socket.join(id);
});
});
socket.on('unsub', (ids) => {
console.log('unsub', ids);
ids.forEach((id) => {
const globalId = socket.idMap[id];
socket.leave(globalId);
socket.leave(id);
});
});
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});
});
this.ns.on('delete-room', (room, id) => {
console.log(room, id);
});
}
onDataCommited(records, userId) {
}

View File

@ -14,7 +14,7 @@ import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
import assert from 'assert';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Server as SocketIoServer } from 'socket.io';
import { Server as SocketIoServer, Namespace } from 'socket.io';
import DataSubscriber from './DataSubscriber';
@ -108,7 +108,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
return sthOut;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, io?: SocketIoServer) {
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace) {
super(path);
const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -117,8 +117,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore<ED, Cxt>(storageSchema, contextBuilder, dbConfig, ActionCascadePathGraph, RelationCascadePathGraph, deducedRelationMap,
selectFreeEntities, createFreeEntities, updateFreeEntities);
if (io) {
this.dataSubscriber = new DataSubscriber(io, (scene) => this.contextBuilder(scene)(this.dbStore));
if (ns) {
this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore));
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
@ -128,7 +128,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
const { opRecords } = context;
const userId = context.getCurrentUserId();
originCommit.call(context);
await originCommit.call(context);
};
return context;

View File

@ -1,36 +1,29 @@
import { Hash, createHash } from 'crypto';
import assert from 'assert';
import { EntityDict, SubDataDef, OpRecord } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io';
import { Server, Namespace } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io: Server;
private ns: Namespace;
private contextBuilder: (scene?: string) => Promise<Context>;
private hash: Hash;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>) {
this.io = io;
this.contextBuilder = contextBuilder;
this.startup();
this.hash = createHash('sha256');
private filterMap: {
[k in keyof ED]?: Record<string, ED[keyof ED]['Selection']['filter']>;
}
private calcEntityFilterID(entity: keyof ED, filter: ED[keyof ED]['Selection']['filter']) {
// 用哈希计算来保证id唯一性
const h = this.hash.copy();
h.update(`${entity as string}-${JSON.stringify(filter)}`);
const id = h.digest('hex');
return id;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>) {
this.ns = ns;
this.contextBuilder = contextBuilder;
this.startup();
this.filterMap = {};
}
/**
* socket连接
*/
private startup() {
this.io.on('connection', async (socket) => {
this.ns.on('connection', async (socket) => {
console.log('connection', socket.id);
const { 'oak-cxt': cxtStr } = socket.handshake.headers;
const context = await this.contextBuilder(cxtStr as string);
@ -38,18 +31,51 @@ export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Cont
(socket as any).context = context;
(socket as any).idMap = {};
socket.on('sub', (data: SubDataDef<ED, keyof ED>[], callback) => {
socket.on('sub', async (data: SubDataDef<ED, keyof ED>[], callback) => {
console.log(data);
try {
await Promise.all(
data.map(
async (ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
// 尝试select此filter如果失败说明权限越界
await context.select(entity, {
data: {
id: 1,
},
filter,
}, {});
}
)
);
}
catch (err: any) {
callback(err.toString());
return;
}
const { rooms } = this.ns.adapter;
data.forEach(
(ele) => {
const { id, entity, filter } = ele;
console.log('sub', id, entity, filter);
// 尝试select此filter如果失败说明权限越界
// todo
const globalId = this.calcEntityFilterID(entity, filter);
(socket as any).idMap[id] = globalId;
socket.join(globalId);
if (!rooms.get(id)) {
// 本房间不存在说明这个filter是新出现的
if (this.filterMap[entity]) {
// id的唯一性由前台保证重复则无视
Object.assign(this.filterMap[entity]!, {
[id]: filter,
});
}
else {
Object.assign(this.filterMap, {
[entity]: {
id: filter,
}
});
}
}
socket.join(id);
}
);
});
@ -58,19 +84,22 @@ export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Cont
console.log('unsub', ids);
ids.forEach(
(id) => {
const globalId = (socket as any).idMap[id];
socket.leave(globalId);
socket.leave(id)
}
)
);
});
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});
});
this.ns.on('delete-room', (room, id) => {
console.log(room, id);
})
}
onDataCommited(records: OpRecord<ED>[], userId?: string) {
}
}