初始化例程修改

This commit is contained in:
Xu Chang 2023-09-04 11:14:13 +08:00
parent a6d8ff1de1
commit dbf3b75630
6 changed files with 131 additions and 115 deletions

View File

@ -1,13 +1,9 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.initialize = void 0;
const tslib_1 = require("tslib");
/// <reference path="../typings/polyfill.d.ts" />
const path_1 = tslib_1.__importDefault(require("path"));
const oak_backend_base_1 = require("oak-backend-base");
async function initialize(path, contextBuilder, dropIfExists) {
const dbConfig = require(path_1.default.join(path, '/configuration/mysql.json'));
const appLoader = new oak_backend_base_1.AppLoader(path, contextBuilder, dbConfig);
const appLoader = new oak_backend_base_1.AppLoader(path, contextBuilder);
await appLoader.mount(true);
await appLoader.initialize(dropIfExists);
await appLoader.unmount();

View File

@ -4,4 +4,4 @@ import { Connector, EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
export declare function startup<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>>(path: string, contextBuilder: (scene?: string) => (store: AsyncRowStore<ED, Cxt>) => Promise<Cxt>, connector: Connector<ED, Cxt, FrontCxt>, omitWatchers?: boolean, omitTimers?: boolean, routine?: (context: Cxt) => Promise<void>): Promise<void>;
export declare function startup<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>>(path: string, contextBuilder: (scene?: string) => (store: AsyncRowStore<ED, Cxt>) => Promise<Cxt>, connector: Connector<ED, FrontCxt>, omitWatchers?: boolean, omitTimers?: boolean, routine?: (context: Cxt) => Promise<void>): Promise<void>;

View File

@ -4,18 +4,39 @@ exports.startup = void 0;
const tslib_1 = require("tslib");
/// <reference path="../typings/polyfill.d.ts" />
require("./polyfill");
const http_1 = require("http");
const path_1 = tslib_1.__importDefault(require("path"));
const koa_1 = tslib_1.__importDefault(require("koa"));
const koa_router_1 = tslib_1.__importDefault(require("koa-router"));
const koa_body_1 = tslib_1.__importDefault(require("koa-body"));
const oak_backend_base_1 = require("oak-backend-base");
const types_1 = require("oak-domain/lib/types");
const socket_io_1 = require("socket.io");
const cluster_adapter_1 = require("@socket.io/cluster-adapter");
const sticky_1 = require("@socket.io/sticky");
async function startup(path, contextBuilder, connector, omitWatchers, omitTimers, routine) {
const dbConfig = require(path_1.default.join(path, '/configuration/mysql.json'));
const appLoader = new oak_backend_base_1.AppLoader(path, contextBuilder, dbConfig);
const koa = new koa_1.default();
// socket
const httpServer = (0, http_1.createServer)(koa.callback());
const socketOption = {
path: connector.getSubscribeRouter(),
};
if (process.env.NODE_ENV === 'development') {
socketOption.cors = {
origin: '*',
allowedHeaders: ["oak-cxt"],
};
}
const io = new socket_io_1.Server(httpServer, socketOption);
if (process.env.PM2_STATUS) {
// pm2环境下要接入clusterAdapter
// https://socket.io/zh-CN/docs/v4/pm2/
io.adapter((0, cluster_adapter_1.createAdapter)());
(0, sticky_1.setupWorker)(io);
}
const appLoader = new oak_backend_base_1.AppLoader(path, contextBuilder, io);
await appLoader.mount();
await appLoader.execStartRoutines();
const koa = new koa_1.default();
if (routine) {
// 如果传入了routine执行完成后就结束
await appLoader.execRoutine(routine);
@ -39,6 +60,7 @@ async function startup(path, contextBuilder, connector, omitWatchers, omitTimers
multipart: true,
}));
const router = new koa_router_1.default();
const serverConfig = require(path_1.default.join(path, '/configuration/server.json'));
// 如果是开发环境允许options
if (process.env.NODE_ENV === 'development') {
koa.use(async (ctx, next) => {
@ -56,18 +78,9 @@ async function startup(path, contextBuilder, connector, omitWatchers, omitTimers
router.post(connector.getRouter(), async (ctx) => {
const { request } = ctx;
const data = request.files ? Object.assign({}, request.body, request.files) : request.body; // 这里处理multiPart的文件不是太好
const { name, params, context } = await connector.parseRequest(request.headers, data, appLoader.getStore());
await context.begin();
let result;
try {
result = await appLoader.execAspect(name, context, params);
await context.commit();
}
catch (err) {
await context.rollback();
throw err;
}
const { body, headers } = await connector.serializeResult(result, context, request.headers, request.body);
const { contextString, aspectName } = connector.parseRequestHeaders(request.headers);
const { result, opRecords, message } = await appLoader.execAspect(aspectName, contextString, data);
const { body, headers } = await connector.serializeResult(result, opRecords, request.headers, request.body, message);
ctx.response.body = body;
return;
});
@ -80,59 +93,54 @@ async function startup(path, contextBuilder, connector, omitWatchers, omitTimers
response.body = res.body;
return;
});
// 注入所有的endpoints
const endpoints = appLoader.getEndpoints();
const endpointsArray = [];
for (const ep in endpoints) {
const useEndpointItem = (item) => {
const { method, fn, params, name } = item;
if (endpointsArray.find(ele => ele[0] === ep && ele[1] === method)) {
throw new Error(`endpoint中url为「${ep}」的方法「${method}」存在重复定义`);
}
let url = `/endpoint/${ep}`;
if (params) {
for (const p of params) {
url += `/:${p}`;
}
}
endpointsArray.push([name, method, url]);
router[method](name, url, async (ctx) => {
const { req, request, params } = ctx;
const { body, headers } = request;
const context = await contextBuilder()(appLoader.getStore());
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
ctx.response.body = result;
return;
}
catch (err) {
await context.rollback();
console.error(`endpoint「${ep}」「${method}」出错`, err);
ctx.response.status = 500;
return;
}
});
};
if (endpoints[ep] instanceof Array) {
endpoints[ep].forEach(epi => useEndpointItem(epi));
// 外部socket接口
router.get(connector.getSubscribePointRouter(), async (ctx) => {
const { response } = ctx;
if (process.env.PM2_STATUS) {
// 如果使用了pm2则返回 @socket.io/pm2所监听的PM2_PORT端口
response.body = {
path: connector.getSubscribeRouter(),
port: process.env.PM2_PORT || 8080,
};
// 开发环境socket直接连接
return;
}
else {
useEndpointItem(endpoints[ep]);
// 不使用pm2则监听在http服务器端口上
response.body = {
path: connector.getSubscribeRouter(),
port: serverConfig.port,
};
return;
}
}
});
// 注入所有的endpoints
const endpoints = appLoader.getEndpoints();
endpoints.forEach(([name, method, url, fn]) => {
router[method](name, url, async (ctx) => {
const { req, request, params } = ctx;
const { body, headers } = request;
try {
const result = await fn(params, headers, req, body);
ctx.response.body = result;
return;
}
catch (err) {
ctx.response.status = 500;
return;
}
});
});
router.get('/endpoint', async (ctx) => {
ctx.response.body = endpointsArray;
ctx.response.body = endpoints;
});
koa.use(router.routes());
const serverConfig = require(path_1.default.join(path, '/configuration/server.json'));
console.log(`server will listen on port ${serverConfig.port}`);
koa.on('error', (err) => {
console.error(err);
throw err;
});
koa.listen(serverConfig.port);
httpServer.listen(serverConfig.port);
if (!omitWatchers) {
appLoader.startWatchers();
}

View File

@ -36,6 +36,8 @@
},
"dependencies": {
"@pmmmwh/react-refresh-webpack-plugin": "^0.5.3",
"@socket.io/cluster-adapter": "^0.2.2",
"@socket.io/sticky": "^1.0.4",
"@svgr/webpack": "^5.5.0",
"@xmldom/xmldom": "0.8.2",
"axios": ">=0.21.1",
@ -106,6 +108,7 @@
"sass-loader": "^12.3.0",
"semver": "^7.3.5",
"shelljs": "^0.8.4",
"socket.io": "^4.7.2",
"source-map-loader": "^3.0.0",
"stream-browserify": "^3.0.0",
"style-loader": "^3.3.1",

View File

@ -9,8 +9,7 @@ export async function initialize<ED extends EntityDict & BaseEntityDict, Cxt ext
path: string,
contextBuilder: (scene?: string) => (store: AsyncRowStore<ED, Cxt>) => Promise<Cxt>,
dropIfExists?: boolean) {
const dbConfig = require(PathLib.join(path, '/configuration/mysql.json'));
const appLoader = new AppLoader(path, contextBuilder, dbConfig);
const appLoader = new AppLoader(path, contextBuilder);
await appLoader.mount(true);
await appLoader.initialize(dropIfExists);
await appLoader.unmount();

View File

@ -1,5 +1,6 @@
/// <reference path="../typings/polyfill.d.ts" />
import './polyfill';
import { createServer } from "http";
import PathLib from 'path';
import Koa from 'koa';
import KoaRouter from 'koa-router';
@ -9,26 +10,48 @@ import { OakException, Connector, EntityDict, EndpointItem, RowStore } from 'oak
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/cluster-adapter";
import { setupWorker } from "@socket.io/sticky";
export async function startup<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>>(
path: string,
contextBuilder: (scene?: string) => (store: AsyncRowStore<ED, Cxt>) => Promise<Cxt>,
connector: Connector<ED, Cxt, FrontCxt>,
connector: Connector<ED, FrontCxt>,
omitWatchers?: boolean,
omitTimers?: boolean,
routine?: (context: Cxt) => Promise<void>,
) {
const dbConfig = require(PathLib.join(path, '/configuration/mysql.json'));
const appLoader = new AppLoader(path, contextBuilder, dbConfig);
const koa = new Koa();
// socket
const httpServer = createServer(koa.callback());
const socketOption: any = {
path: connector.getSubscribeRouter(),
};
if (process.env.NODE_ENV === 'development') {
socketOption.cors = {
origin: '*', // 应该只有debug模式存在
allowedHeaders: ["oak-cxt"],
};
}
const io = new Server(httpServer, socketOption);
if (process.env.PM2_STATUS) {
// pm2环境下要接入clusterAdapter
// https://socket.io/zh-CN/docs/v4/pm2/
io.adapter(createAdapter());
setupWorker(io);
}
const appLoader = new AppLoader(path, contextBuilder, io);
await appLoader.mount();
await appLoader.execStartRoutines();
const koa = new Koa();
if (routine) {
// 如果传入了routine执行完成后就结束
await appLoader.execRoutine(routine);
return;
}
// 否则启动服务器模式
koa.use(async (ctx, next) => {
try {
@ -48,6 +71,7 @@ export async function startup<ED extends EntityDict & BaseEntityDict, Cxt extend
}));
const router = new KoaRouter();
const serverConfig = require(PathLib.join(path, '/configuration/server.json'));
// 如果是开发环境允许options
if (process.env.NODE_ENV === 'development') {
koa.use(async (ctx, next) => {
@ -65,18 +89,10 @@ export async function startup<ED extends EntityDict & BaseEntityDict, Cxt extend
router.post(connector.getRouter(), async (ctx) => {
const { request } = ctx;
const data = request.files ? Object.assign({}, request.body, request.files) : request.body; // 这里处理multiPart的文件不是太好
const { name, params, context } = await connector.parseRequest(request.headers, data, appLoader.getStore());
await context.begin();
let result: any;
try {
result = await appLoader.execAspect(name, context, params);
await context.commit();
}
catch (err: any) {
await context.rollback();
throw err;
}
const { body, headers } = await connector.serializeResult(result, context, request.headers, request.body);
const { contextString, aspectName } = connector.parseRequestHeaders(request.headers);
const { result, opRecords, message } = await appLoader.execAspect(aspectName, contextString, data);
const { body, headers } = await connector.serializeResult(result, opRecords, request.headers, request.body, message);
ctx.response.body = body;
return;
});
@ -92,65 +108,59 @@ export async function startup<ED extends EntityDict & BaseEntityDict, Cxt extend
return;
});
// 外部socket接口
router.get(connector.getSubscribePointRouter(), async (ctx) => {
const { response } = ctx;
if (process.env.PM2_STATUS) {
// 如果使用了pm2则返回 @socket.io/pm2所监听的PM2_PORT端口
response.body = {
path: connector.getSubscribeRouter(),
port: process.env.PM2_PORT || 8080,
};
// 开发环境socket直接连接
return;
}
else {
// 不使用pm2则监听在http服务器端口上
response.body = {
path: connector.getSubscribeRouter(),
port: serverConfig.port,
};
return;
}
});
// 注入所有的endpoints
const endpoints = appLoader.getEndpoints();
const endpointsArray: [string, string, string][] = [];
for (const ep in endpoints) {
const useEndpointItem = (item: EndpointItem<ED, Cxt>) => {
const { method, fn, params, name } = item;
if (endpointsArray.find(
ele => ele[0] === ep && ele[1] === method
)) {
throw new Error(`endpoint中url为「${ep}」的方法「${method}」存在重复定义`);
}
let url = `/endpoint/${ep}`;
if (params) {
for (const p of params) {
url += `/:${p}`;
}
}
endpointsArray.push([name, method, url]);
endpoints.forEach(
([name, method, url, fn]) => {
router[method](name, url, async (ctx) => {
const { req, request, params } = ctx;
const { body, headers } = request;
const context = await contextBuilder()(appLoader.getStore());
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
const result = await fn(params, headers, req, body);
ctx.response.body = result;
return;
}
catch(err) {
await context.rollback();
console.error(`endpoint「${ep}」「${method}」出错`, err);
ctx.response.status = 500;
return;
}
});
};
if (endpoints[ep] instanceof Array) {
(endpoints[ep] as EndpointItem<ED, Cxt>[]).forEach(
epi => useEndpointItem(epi)
);
}
else {
useEndpointItem(endpoints[ep] as EndpointItem<ED, Cxt>);
}
}
);
router.get('/endpoint', async (ctx) => {
ctx.response.body = endpointsArray;
ctx.response.body = endpoints;
});
koa.use(router.routes());
const serverConfig = require(PathLib.join(path, '/configuration/server.json'));
console.log(`server will listen on port ${serverConfig.port}`);
koa.on('error', (err) => {
console.error(err);
throw err;
});
koa.listen(serverConfig.port);
httpServer.listen(serverConfig.port);
if (!omitWatchers) {
appLoader.startWatchers();