增加了socketIo的初始化

This commit is contained in:
Xu Chang 2023-09-04 11:12:57 +08:00
parent cf0497bab4
commit d5c1cc9e71
8 changed files with 249 additions and 36 deletions

17
lib/AppLoader.d.ts vendored
View File

@ -1,24 +1,29 @@
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AppLoader as GeneralAppLoader, EntityDict } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, EntityDict, OpRecord } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
import { Endpoint } from 'oak-domain/lib/types/Endpoint';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Server as SocketIoServer } from 'socket.io';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends GeneralAppLoader<ED, Cxt> {
private dbStore;
private aspectDict;
private externalDependencies;
private dataSubscriber?;
private contextBuilder;
private requireSth;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, dbConfig: MySQLConfiguration);
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, io?: SocketIoServer);
initTriggers(): void;
startWatchers(): void;
mount(initialize?: true): Promise<void>;
unmount(): Promise<void>;
execAspect(name: string, context: Cxt, params?: any): Promise<any>;
execAspect(name: string, contextString?: string, params?: any): Promise<{
opRecords: OpRecord<ED>[];
result: any;
message?: string;
}>;
initialize(dropIfExists?: boolean): Promise<void>;
getStore(): DbStore<ED, Cxt>;
getEndpoints(): Record<string, Endpoint<ED, Cxt>>;
getEndpoints(): [string, "get" | "delete" | "post" | "put", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
startTimers(): void;
execStartRoutines(): Promise<void>;
execRoutine(routine: (context: Cxt) => Promise<void>): Promise<void>;

View File

@ -13,19 +13,33 @@ const types_1 = require("oak-domain/lib/types");
const DbStore_1 = require("./DbStore");
const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const DataSubscriber_1 = tslib_1.__importDefault(require("./DataSubscriber"));
class AppLoader extends types_1.AppLoader {
dbStore;
aspectDict;
externalDependencies;
dataSubscriber;
contextBuilder;
requireSth(filePath) {
const sth = require((0, path_1.join)(this.path, filePath)).default;
const depFilePath = (0, path_1.join)(this.path, filePath);
let sth;
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
sth = require((0, path_1.join)(this.path, filePath)).default;
}
const sthExternal = this.externalDependencies.map(ele => {
const depFilePath = (0, path_1.join)(this.path, 'node_modules', ele, filePath);
if ((0, fs_1.existsSync)(`${depFilePath}.js`)) {
return require(depFilePath).default;
}
}).filter(ele => !!ele);
if (!sth) {
if (sthExternal.length > 0 && sthExternal[0] instanceof Array) {
sth = [];
}
else {
sth = {};
}
}
if (sth instanceof Array) {
sthExternal.forEach((sth2, idx) => {
(0, assert_1.default)(sth2 instanceof Array, `${(0, path_1.join)(this.path, 'node_modules', this.externalDependencies[idx], filePath)}中的default输出对象不是数组与项目对应路径的输出不一致`);
@ -71,19 +85,22 @@ class AppLoader extends types_1.AppLoader {
Object.assign(sthOut, sth);
return sthOut;
}
constructor(path, contextBuilder, dbConfig) {
constructor(path, contextBuilder, io) {
super(path);
const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { ActionCascadePathGraph, RelationCascadePathGraph, selectFreeEntities, createFreeEntities, updateFreeEntities, deducedRelationMap } = require(`${path}/lib/oak-app-domain/Relation`);
this.externalDependencies = require((0, env_1.OAK_EXTERNAL_LIBS_FILEPATH)((0, path_1.join)(path, 'lib')));
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore_1.DbStore(storageSchema, contextBuilder, dbConfig, ActionCascadePathGraph, RelationCascadePathGraph, deducedRelationMap, selectFreeEntities, createFreeEntities, updateFreeEntities);
this.contextBuilder = contextBuilder;
if (io) {
this.dataSubscriber = new DataSubscriber_1.default(io, (scene) => this.contextBuilder(scene)(this.dbStore));
}
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const authDict = this.requireSth('lib/auth/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { triggers: adTriggers, checkers: adCheckers } = (0, actionDef_1.makeIntrinsicCTWs)(this.dbStore.getSchema(), ActionDefDict);
triggers.forEach((trigger) => this.dbStore.registerTrigger(trigger));
@ -158,12 +175,27 @@ class AppLoader extends types_1.AppLoader {
(0, index_1.clearPorts)();
this.dbStore.disconnect();
}
async execAspect(name, context, params) {
async execAspect(name, contextString, params) {
const context = await this.contextBuilder(contextString)(this.dbStore);
const fn = this.aspectDict[name];
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
return await fn(params, context);
await context.begin();
try {
const result = await fn(params, context);
await context.commit();
await context.refineOpRecords();
return {
opRecords: context.opRecords,
message: context.getMessage(),
result,
};
}
catch (err) {
await context.rollback();
throw err;
}
}
async initialize(dropIfExists) {
await this.dbStore.initialize(dropIfExists);
@ -186,7 +218,7 @@ class AppLoader extends types_1.AppLoader {
dontCreateOper: true,
});
await context.commit();
console.log(`data in ${entity} initialized!`);
console.log(`data in ${entity} initialized, ${rows.length} rows inserted`);
}
catch (err) {
await context.rollback();
@ -202,7 +234,40 @@ class AppLoader extends types_1.AppLoader {
}
getEndpoints() {
const endpoints = this.requireSth('lib/endpoints/index');
return endpoints;
const endPointRouters = [];
const endPointMap = {};
const transformEndpointItem = (key, item) => {
const { name, method, fn } = item;
const k = `${key}-${name}-${method}`;
if (endPointMap[k]) {
throw new Error(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义`);
}
endPointMap[k] = true;
endPointRouters.push([name, method, key, async (params, headers, req, body) => {
const context = await this.contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
console.error(`endpoint「${key}」方法「${method}」出错`, err);
throw err;
}
}]);
};
for (const router in endpoints) {
const item = endpoints[router];
if (item instanceof Array) {
item.forEach(ele => transformEndpointItem(router, ele));
}
else {
transformEndpointItem(router, item);
}
}
return endPointRouters;
}
startTimers() {
const timers = this.requireSth('lib/timers/index');

13
lib/DataSubscriber.d.ts vendored Normal file
View File

@ -0,0 +1,13 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io;
private contextBuilder;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>);
/**
* socket连接
*/
private startup;
}

23
lib/DataSubscriber.js Normal file
View File

@ -0,0 +1,23 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class DataSubscriber {
io;
contextBuilder;
constructor(io, contextBuilder) {
this.io = io;
this.contextBuilder = contextBuilder;
this.startup();
}
/**
* 来自外部的socket连接监听数据变化
*/
startup() {
this.io.on('connection', (socket) => {
console.log('connection', socket.id);
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});
});
}
}
exports.default = DataSubscriber;

View File

@ -80,6 +80,7 @@ class DbStore extends oak_db_1.MysqlStore {
await context.begin();
}
try {
// count不用检查权限因为检查权限中本身要用到count
// const selection2 = Object.assign({
// action: 'select',
// }, selection) as ED[T]['Operation'];

View File

@ -24,6 +24,7 @@
"oak-common-aspect": "file:../oak-common-aspect",
"oak-db": "file:../oak-db",
"oak-domain": "file:../oak-domain",
"socket.io": "^4.7.2",
"uuid": "^8.3.2"
},
"license": "ISC",

View File

@ -4,27 +4,34 @@ import { scheduleJob } from 'node-schedule';
import { OAK_EXTERNAL_LIBS_FILEPATH } from 'oak-domain/lib/compiler/env';
import { makeIntrinsicCTWs } from "oak-domain/lib/store/actionDef";
import { intersection } from 'oak-domain/lib/utils/lodash';
import { createDynamicCheckers } from 'oak-domain/lib/checkers';
import { createDynamicTriggers } from 'oak-domain/lib/triggers';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, RowStore, Context, EntityDict, Watcher, BBWatcher, WBWatcher } from "oak-domain/lib/types";
import { AppLoader as GeneralAppLoader, Trigger, Checker, Aspect, RowStore, Context, EntityDict, Watcher, BBWatcher, WBWatcher, OpRecord } from "oak-domain/lib/types";
import { DbStore } from "./DbStore";
import generalAspectDict, { clearPorts, registerPorts } from 'oak-common-aspect/lib/index';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
import { AsyncContext } from "oak-domain/lib/store/AsyncRowStore";
import { Endpoint } from 'oak-domain/lib/types/Endpoint';
import { Endpoint, EndpointItem } from 'oak-domain/lib/types/Endpoint';
import assert from 'assert';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Server as SocketIoServer } from 'socket.io';
import DataSubscriber from './DataSubscriber';
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends GeneralAppLoader<ED, Cxt> {
private dbStore: DbStore<ED, Cxt>;
private aspectDict: Record<string, Aspect<ED, Cxt>>;
private externalDependencies: string[];
private dataSubscriber?: DataSubscriber<ED, Cxt>;
private contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth(filePath: string): any {
const sth = require(join(this.path, filePath)).default;
const depFilePath = join(this.path, filePath);
let sth: any;
if (existsSync(`${depFilePath}.js`)) {
sth = require(join(this.path, filePath)).default;
}
const sthExternal = this.externalDependencies.map(
ele => {
const depFilePath = join(this.path, 'node_modules', ele, filePath);
@ -35,6 +42,16 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
).filter(
ele => !!ele
);
if (!sth) {
if (sthExternal.length > 0 && sthExternal[0] instanceof Array) {
sth = [];
}
else {
sth = {};
}
}
if (sth instanceof Array) {
sthExternal.forEach(
(sth2, idx) => {
@ -91,8 +108,9 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
return sthOut;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, dbConfig: MySQLConfiguration) {
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, io?: SocketIoServer) {
super(path);
const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { ActionCascadePathGraph, RelationCascadePathGraph, selectFreeEntities, createFreeEntities, updateFreeEntities, deducedRelationMap } = require(`${path}/lib/oak-app-domain/Relation`);
this.externalDependencies = require(OAK_EXTERNAL_LIBS_FILEPATH(join(path, 'lib')));
@ -100,14 +118,16 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
this.dbStore = new DbStore<ED, Cxt>(storageSchema, contextBuilder, dbConfig, ActionCascadePathGraph, RelationCascadePathGraph, deducedRelationMap,
selectFreeEntities, createFreeEntities, updateFreeEntities);
this.contextBuilder = contextBuilder;
if (io) {
this.dataSubscriber = new DataSubscriber(io, (scene) => this.contextBuilder(scene)(this.dbStore));
}
}
initTriggers() {
const triggers = this.requireSth('lib/triggers/index');
const checkers = this.requireSth('lib/checkers/index');
const authDict = this.requireSth('lib/auth/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { triggers: adTriggers, checkers: adCheckers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict);
triggers.forEach(
(trigger: Trigger<ED, keyof ED, Cxt>) => this.dbStore.registerTrigger(trigger)
@ -120,16 +140,16 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
);
adCheckers.forEach(
(checker) => this.dbStore.registerChecker(checker)
);
);
}
startWatchers() {
const watchers = this.requireSth('lib/watchers/index');
const { ActionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
const { watchers: adWatchers } = makeIntrinsicCTWs(this.dbStore.getSchema(), ActionDefDict);
const totalWatchers = (<Watcher<ED, keyof ED, Cxt>[]>watchers).concat(adWatchers);
let count = 0;
const doWatchers = async () => {
count++;
@ -150,7 +170,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
}, context, {
dontCollect: true,
});
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
else {
@ -164,7 +184,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
dontCollect: true,
blockTrigger: true,
});
const result = await fn(context, rows);
console.log(`执行了watcher【${w.name}】,结果是:`, result);
}
@ -177,7 +197,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
}
const duration = Date.now() - start;
console.log(`${count}次执行watchers共执行${watchers.length}个,耗时${duration}毫秒`);
setTimeout(() => doWatchers(), 120000);
};
doWatchers();
@ -198,12 +218,31 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
this.dbStore.disconnect();
}
async execAspect(name: string, context: Cxt, params?: any): Promise<any> {
async execAspect(name: string, contextString?: string, params?: any): Promise<{
opRecords: OpRecord<ED>[];
result: any;
message?: string;
}> {
const context = await this.contextBuilder(contextString)(this.dbStore);
const fn = this.aspectDict[name];
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
return await fn(params, context);
await context.begin();
try {
const result = await fn(params, context);
await context.commit();
await context.refineOpRecords();
return {
opRecords: context.opRecords,
message: context.getMessage(),
result,
};
}
catch (err) {
await context.rollback();
throw err;
}
}
async initialize(dropIfExists?: boolean) {
@ -228,7 +267,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
dontCreateOper: true,
});
await context.commit();
console.log(`data in ${entity} initialized!`);
console.log(`data in ${entity} initialized, ${rows.length} rows inserted`);
}
catch (err) {
await context.rollback();
@ -244,9 +283,47 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
return this.dbStore;
}
getEndpoints(): Record<string, Endpoint<ED, Cxt>> {
const endpoints = this.requireSth('lib/endpoints/index');
return endpoints;
getEndpoints() {
const endpoints: Record<string, Endpoint<ED, Cxt>> = this.requireSth('lib/endpoints/index');
const endPointRouters: Array<[EndpointItem<ED, Cxt>['name'], EndpointItem<ED, Cxt>['method'], string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>]> = [];
const endPointMap: Record<string, true> = {};
const transformEndpointItem = (key: string, item: EndpointItem<ED, Cxt>) => {
const { name, method, fn } = item;
const k = `${key}-${name}-${method}`;
if (endPointMap[k]) {
throw new Error(`endpoint中url为「${key}」、名为${name}的方法「${method}」存在重复定义`);
}
endPointMap[k] = true;
endPointRouters.push(
[name, method, key, async (params, headers, req, body) => {
const context = await this.contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
console.error(`endpoint「${key}」方法「${method}」出错`, err);
throw err;
}
}]
);
};
for (const router in endpoints) {
const item = endpoints[router];
if (item instanceof Array) {
item.forEach(
ele => transformEndpointItem(router, ele)
);
}
else {
transformEndpointItem(router, item);
}
}
return endPointRouters;
}
startTimers() {
@ -263,7 +340,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
console.log(`定时器【${name}】执行完成,耗时${Date.now() - start}毫秒,结果是【${result}`);
await context.commit();
}
catch(err) {
catch (err) {
console.warn(`定时器【${name}】执行失败,耗时${Date.now() - start}毫秒,错误是`, err);
await context.rollback();
}
@ -275,7 +352,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
const routines = this.requireSth('lib/routines/start');
for (const routine of routines) {
const { name, fn } = routine;
const context = await this.contextBuilder()(this.dbStore);
const context = await this.contextBuilder()(this.dbStore);
const start = Date.now();
await context.begin();
try {
@ -292,6 +369,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Async
async execRoutine(routine: (context: Cxt) => Promise<void>) {
const context = await this.contextBuilder()(this.dbStore);
await routine(context);
await routine(context);
}
}

28
src/DataSubscriber.ts Normal file
View File

@ -0,0 +1,28 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { Server } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends AsyncContext<ED>> {
private io: Server;
private contextBuilder: (scene?: string) => Promise<Context>;
constructor(io: Server, contextBuilder: (scene?: string) => Promise<Context>) {
this.io = io;
this.contextBuilder = contextBuilder;
this.startup();
}
/**
* socket连接
*/
private startup() {
this.io.on('connection', (socket) => {
console.log('connection', socket.id);
socket.on('disconnect', (reason) => {
console.log('disconnect', reason);
});
})
}
}