oak-cli/src/server/start.ts

488 lines
18 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/// <reference path="../typings/polyfill.d.ts" />
import './polyfill';
import { IncomingHttpHeaders, createServer } from "http";
import PathLib, { join } from 'path';
import Koa from 'koa';
import KoaRouter from 'koa-router';
import KoaBody from 'koa-body';
import logger from 'koa-logger';
import { AppLoader, getClusterInfo, ClusterAppLoader } from 'oak-backend-base';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { OakException, Connector, EntityDict, ClusterInfo } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { createAdapter } from "@socket.io/cluster-adapter";
import { setupWorker } from "@socket.io/sticky";
import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter";
import Redis from "ioredis";
import { Server, ServerOptions } from "socket.io";
import { ServerConfiguration } from 'oak-domain/lib/types/Configuration';
import { instrument } from "@socket.io/admin-ui";
import serve from 'koa-static';
import mount from 'koa-mount';
import chalk from 'chalk';
import { checkNodeVersion, randomString } from '../utils';
import bcrypt from 'bcryptjs';
import { LogFormatter, polyfillConsole, removePolyfill } from './polyfill';
import { ErrorHandler } from '../types';
// Run the Node.js version check (imported from ../utils) as soon as this module loads.
checkNodeVersion();

// Directory holding the static assets of the bundled socket.io admin UI.
const socketAdminUI = PathLib.join(__dirname, '../../ui/socket-admin');

// socket.io namespaces handed to the app loader.
const DATA_SUBSCRIBE_NAMESPACE = '/dsn';
const SOCKET_NAMESPACE = '/sn';
// Overridable through the OAK_SSUB_NAMESPACE environment variable.
const SERVER_SUBSCRIBER_NAMESPACE = process.env.OAK_SSUB_NAMESPACE || '/ssub';

// Fallback message used to mask non-Oak exceptions before they are serialized to the client.
const ExceptionMask = '内部不可知错误';
/**
 * Joins URL/path fragments so that exactly one `/` separates adjacent fragments.
 *
 * - A separator is inserted when neither side provides one.
 * - When the left side already ends with `/` no extra separator is added, and a
 *   leading `/` on the right side is dropped, so `concat('http://h/', '/dsn')`
 *   yields `http://h/dsn` rather than `http://h//dsn`.
 * - Only the seam between fragments is touched; slashes inside a fragment
 *   (such as the `//` after a protocol) are left alone.
 *
 * @param paths fragments to join, in order
 * @returns the joined string, or `''` when no fragment is given
 */
function concat(...paths: string[]) {
    // reduce() without an initial value throws on an empty array.
    if (paths.length === 0) {
        return '';
    }
    return paths.reduce((prev, current) => {
        const prevEndsWithSlash = prev.endsWith('/');
        const currentStartsWithSlash = current.startsWith('/');
        if (prevEndsWithSlash && currentStartsWithSlash) {
            // Both sides supply a separator: keep only one.
            return `${prev}${current.slice(1)}`;
        }
        if (prevEndsWithSlash || currentStartsWithSlash) {
            return `${prev}${current}`;
        }
        return `${prev}/${current}`;
    });
}
/**
 * Boots the Oak application located at `path`.
 *
 * The function always builds the Koa app, the HTTP server and the socket.io
 * server, mounts the app loader and runs its start routines. After that it
 * works in one of two modes:
 *
 * - Routine mode (`routine` given): the routine is executed through the app
 *   loader, the loader is unmounted (even when the routine throws) and the
 *   routine's result is returned. The HTTP server is never put into listening
 *   state in this mode.
 * - Server mode (no `routine`): middlewares and routes are registered, the HTTP
 *   server starts listening on `serverConfiguration.port`, watchers/timers are
 *   started unless omitted, and a `shutdown` function is returned.
 *
 * @param path         root directory of the project; `lib/configuration/*` and
 *                     `package.json` are loaded relative to it
 * @param connector    connector used to parse requests, serialize results and
 *                     exceptions, and to provide router/socket paths
 * @param omitWatchers when truthy, `appLoader.startWatchers()` is not called
 * @param omitTimers   when truthy, `appLoader.startTimers()` is not called
 * @param routine      optional one-off routine; switches to routine mode
 * @returns the routine result in routine mode, otherwise an async `shutdown`
 *          function that closes the server and unmounts the app loader
 */
export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt extends SyncContext<ED>, Cxt extends BackendRuntimeContext<ED>>(
    path: string,
    connector: Connector<ED, FrontCxt>,
    omitWatchers?: boolean,
    omitTimers?: boolean,
    routine?: (context: AsyncContext<ED>) => Promise<void>,
): Promise<(() => Promise<any>) | any> {
    // The global error handler is optional: a project without
    // lib/configuration/errors simply runs without one.
    let errorHandler: ErrorHandler<ED> | undefined = undefined;
    try {
        errorHandler = require(join(
            path,
            'lib',
            'configuration',
            'errors'
        )).default;
    } catch (err) {
        // No errors configuration present.
    }
    const serverConfiguration: ServerConfiguration = require(join(
        path,
        'lib',
        'configuration',
        'server'
    )).default;
    // package.json provides the project name, which is used below to keep the
    // Redis + socket.io channels of different projects apart.
    const packageJson = require(join(path, 'package.json'));
    const corsHeaders = [
        'Content-Type',
        'Content-Length',
        'Authorization',
        'Accept',
        'X-Requested-With',
    ];
    const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS'];
    const koa = new Koa();
    // Print request logs through the koa-logger middleware.
    koa.use(logger());

    // ---- socket.io server -------------------------------------------------
    const httpServer = createServer(koa.callback());
    const socketPath = connector.getSocketPath();
    const socketOption: Partial<ServerOptions> = {
        path: socketPath,
        cors: ['development', 'staging'].includes(process.env.NODE_ENV!)
            ? {
                origin: '*',
                allowedHeaders: corsHeaders.concat(connector.getCorsHeader()),
            }
            : serverConfiguration.cors
                ? {
                    // socket.io accepts both an array and a string as cors origin.
                    origin: serverConfiguration.cors.origin,
                    allowedHeaders: [
                        ...corsHeaders.concat(connector.getCorsHeader()),
                        ...(serverConfiguration.cors!.headers || []),
                    ],
                }
                : undefined,
    };
    const io = new Server(httpServer, socketOption);
    const clusterInfo = getClusterInfo();
    if (clusterInfo.usingCluster) {
        // Only pm2 on a single physical node is supported for now.
        // Under pm2 the cluster adapter has to be plugged in, see
        // https://socket.io/zh-CN/docs/v4/pm2/
        // The pm2 cluster adapter connects all instances on one machine.
        io.adapter(createAdapter());
        setupWorker(io);
        if (clusterInfo.enableRedis) {
            // The Redis adapter connects instances across machines.
            const redisConfig = serverConfiguration.redis
            if (!redisConfig) {
                console.error('未配置Redis连接信息');
                process.exit(-1);
            }
            const isCluster = Array.isArray(redisConfig);
            // Create the Redis clients (lazy: connected explicitly below).
            const pubClient = isCluster ?
                new Redis.Cluster(redisConfig.map((config) => ({
                    ...config,
                    lazyConnect: true,
                })))
                : new Redis({
                    ...redisConfig,
                    lazyConnect: true,
                });
            const subClient = pubClient.duplicate();
            pubClient.on('connect', () => {
                console.log('PUB已成功连接到Redis服务器');
            });
            pubClient.on('error', (err) => {
                console.error('连接到Redis失败', err);
                // A failed Redis connection is fatal: exit immediately.
                process.exit(-1);
            });
            subClient.on('connect', () => {
                console.log('SUB已成功连接到Redis服务器');
            });
            subClient.on('error', (err) => {
                console.error('连接到Redis失败', err);
                // A failed Redis connection is fatal: exit immediately.
                process.exit(-1);
            });
            await Promise.all([
                pubClient.connect(),
                subClient.connect(),
            ]);
            io.adapter(
                // Each project gets its own key so that a single Redis can be
                // shared by several projects.
                createRedisAdapter(pubClient, subClient, {
                    key: `${packageJson.name}-socket.io-${process.env.NODE_ENV || 'development'}`
                        // Whitespace must be turned into '-' BEFORE the strip below,
                        // otherwise the strip removes it first and this step never matches.
                        .replace(/\s+/g, '-')
                        // Drop everything except letters, digits, '-', '_', ':' and '.'.
                        .replace(/[^a-zA-Z0-9-_:.]/g, '')
                        // Lower-case for a uniform format.
                        .toLowerCase(),
                })
            );
            console.log('已启用Redis适配器');
        } else {
            // Without Redis, instanceCount must not exceed the number of
            // instances actually running on this machine.
            console.warn('正处于单机集群环境,请确保实例数正确!');
        }
        console.log(
            `以集群模式启动,实例总数『${clusterInfo.instanceCount}』,当前实例号『${clusterInfo.instanceId}』`
        );
    } else {
        console.log('以单实例模式启动');
    }

    // ---- socket.io admin UI ------------------------------------------------
    const { ui } = serverConfiguration;
    const isPasswordSet = !!(process.env.SOCKET_ADMIN_PASSWORD || ui?.password)
    // Fall back to a random password when none is configured.
    const passwordForAdminUI = process.env.SOCKET_ADMIN_PASSWORD || ui?.password || randomString(16);
    if (!ui?.disable) {
        instrument(io, {
            auth: {
                // Basic auth; for production consider disabling the UI or
                // switching to a custom auth.
                type: "basic",
                username: ui?.username || "admin",
                // The admin UI expects the bcrypt hash, not the plain password.
                password: bcrypt.hashSync(passwordForAdminUI, 10),
            },
            // Mode follows the environment.
            mode: process.env.NODE_ENV === 'production' ? "production" : "development",
        });
    }

    // ---- application loader ------------------------------------------------
    const appLoader = clusterInfo.usingCluster
        ? new ClusterAppLoader<ED, Cxt>(
            path,
            io.of(DATA_SUBSCRIBE_NAMESPACE),
            io.of(SOCKET_NAMESPACE),
            io.of(SERVER_SUBSCRIBER_NAMESPACE),
            connector.getSocketPath()
        )
        : new AppLoader<ED, Cxt>(
            path,
            io.of(DATA_SUBSCRIBE_NAMESPACE),
            io.of(SOCKET_NAMESPACE),
            io.of(SERVER_SUBSCRIBER_NAMESPACE)
        );
    await appLoader.mount();
    await appLoader.execStartRoutines();
    if (routine) {
        // Routine mode: run the routine and finish. The loader is unmounted in
        // `finally` so that a throwing routine does not leave it mounted.
        try {
            return await appLoader.execRoutine(routine);
        } finally {
            await appLoader.unmount();
        }
    }
    if (errorHandler && typeof errorHandler === 'function') {
        // Capture in a const so the closure below always sees a defined handler.
        const handleGlobalError = errorHandler;
        polyfillConsole("startup", true, (props) => {
            if (props.level === "error") {
                // Fire-and-forget: a failing handler must not break the logging call.
                appLoader.execRoutine(async (ctx) => {
                    await handleGlobalError(props.caller, props.args, ctx);
                }).catch((err) => {
                    console.warn('执行全局错误处理失败:', err);
                });
            }
            return props.args;
        });
    }

    // ---- server mode: middlewares ------------------------------------------
    // Outermost middleware: turns any error thrown downstream into a serialized
    // OakException; non-Oak errors are masked before being sent to the client.
    koa.use(async (ctx, next) => {
        try {
            await next();
        } catch (err) {
            console.error(err);
            const { request } = ctx;
            const exception =
                err instanceof OakException
                    ? err
                    : new OakException<ED>(
                        serverConfiguration?.internalExceptionMask ||
                        ExceptionMask
                    );
            const { body } = connector.serializeException(
                exception,
                request.headers,
                request.body
            );
            ctx.response.body = body;
            return;
        }
    });
    koa.use(
        KoaBody(Object.assign({
            multipart: true,
        }, serverConfiguration.koaBody))
    );
    const router = new KoaRouter();
    // In development/staging every origin is allowed and OPTIONS is answered directly.
    if (['development', 'staging'].includes(process.env.NODE_ENV!)) {
        koa.use(async (ctx, next) => {
            ctx.set('Access-Control-Allow-Origin', '*');
            ctx.set('Access-Control-Allow-Headers', corsHeaders.concat(connector.getCorsHeader()));
            ctx.set('Access-Control-Allow-Methods', corsMethods);
            if (ctx.method === 'OPTIONS') {
                // NOTE(review): this assigns the number 200 as the response body,
                // not as the status code — confirm this is intended.
                ctx.body = 200;
            } else {
                await next();
            }
        });
    } else if (serverConfiguration.cors) {
        koa.use(async (ctx, next) => {
            if (serverConfiguration.cors!.origin instanceof Array) {
                // req.headers.origin looks like https://xxx.xx.com plus an optional port.
                const origin = ctx.req.headers.origin;
                if (serverConfiguration.cors!.origin.includes(origin!)) {
                    ctx.set('Access-Control-Allow-Origin', origin!);
                }
            }
            else {
                ctx.set(
                    'Access-Control-Allow-Origin',
                    serverConfiguration.cors!.origin!
                );
            }
            ctx.set('Access-Control-Allow-Headers', [
                ...corsHeaders.concat(connector.getCorsHeader()),
                ...(serverConfiguration.cors!.headers || []),
            ]);
            ctx.set(
                'Access-Control-Allow-Methods',
                serverConfiguration.cors!.methods || corsMethods
            );
            if (ctx.method === 'OPTIONS') {
                // NOTE(review): this assigns the number 200 as the response body,
                // not as the status code — confirm this is intended.
                ctx.body = 200;
            } else {
                await next();
            }
        });
    }

    // ---- server mode: routes -----------------------------------------------
    // Main aspect entry point.
    router.post(connector.getRouter(), async (ctx) => {
        const { request } = ctx;
        const { contextString, aspectName, data } = connector.parseRequest(
            request.headers,
            request.body,
            request.files
        );
        const { result, opRecords, message } = await appLoader.execAspect(
            aspectName,
            request.headers,
            contextString,
            data
        );
        // NOTE(review): serializeResult also returns `headers`; they are not
        // applied to the response here.
        const { body } = await connector.serializeResult(
            result,
            opRecords,
            request.headers,
            request.body,
            message
        );
        ctx.response.body = body;
        return;
    });
    // Bridge entry point for accessing external resources.
    router.get(connector.getBridgeRouter(), async (ctx) => {
        const {
            request: { querystring },
            response,
        } = ctx;
        // NOTE(review): the target URL is taken from the request query string and
        // fetched as-is. TODO: headers from the parsed query are not forwarded yet.
        const { url: targetUrl } = connector.parseBridgeRequestQuery(querystring);
        const res = await fetch(targetUrl as string);
        response.body = res.body;
        return;
    });
    // Public socket address, depending on the deployment:
    //   no pm2, no nginx: socket shares the HTTP port
    //   pm2,    no nginx: socket listens on 8080
    //   no pm2, nginx:    nginx maps a path to the server port; socket must be
    //                     configured manually as "same port as HTTP + path"
    //   pm2,    nginx:    nginx maps a path to 8080; same manual configuration
    const { nginx, hostname, port, socket: getSocketConfig } = serverConfiguration;
    const protocol = nginx?.ssl ? 'https://' : 'http://';
    let url = `${protocol}${hostname}`;
    if (nginx) {
        if (nginx.port) {
            url += `:${nginx.port}`;
        }
    } else if (clusterInfo.usingCluster) {
        url += `:${process.env.PM2_PORT || 8080}`;
    } else {
        url += `:${port}`;
    }
    // Example:
    // import { io } from "socket.io-client";
    // const socket = io('https://example.com/order', {
    //     path: '/my-custom-path/',
    // });
    // the Socket instance is attached to the "order" Namespace
    // the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
    // Docs: https://socket.io/docs/v4/client-options/
    router.get(connector.getSocketPointRouter(), async (ctx) => {
        const { response } = ctx;
        let subscribeUrl = concat(url, DATA_SUBSCRIBE_NAMESPACE);
        let socketUrl = concat(url, SOCKET_NAMESPACE);
        if (typeof getSocketConfig === 'function') {
            // A per-request socket configuration may override the base URL.
            const socketConfig = getSocketConfig(ctx);
            const overrideUrl = socketConfig?.url;
            if (overrideUrl) {
                subscribeUrl = concat(overrideUrl, DATA_SUBSCRIBE_NAMESPACE);
                socketUrl = concat(overrideUrl, SOCKET_NAMESPACE);
            }
        }
        response.body = {
            path: (nginx?.socketPath ? `/${nginx.socketPath}` : '') + connector.getSocketPath(),
            socketUrl,
            subscribeUrl,
        };
        return;
    });
    // Register every endpoint exposed by the app loader.
    const endpoints = appLoader.getEndpoints(connector.getEndpointRouter());
    endpoints.forEach(([, method, endpointUrl, fn]) => {
        router[method](endpointUrl, async (ctx) => {
            const { req, request, params, response, res } = ctx;
            const { body, headers, files } = request;
            try {
                const result = await fn(params, headers, req, files ? Object.assign({}, body, files) : body);
                const { headers: headers2, data, statusCode } = result;
                response.body = data;
                if (headers2) {
                    Object.keys(headers2).forEach(
                        (k) => res.setHeader(k, headers2[k])
                    )
                }
                if (statusCode) {
                    res.statusCode = statusCode
                }
                return;
            } catch (err) {
                // Log instead of swallowing silently, consistent with the aspect
                // error middleware; the client still only receives a bare 500.
                console.error(err);
                ctx.response.status = 500;
                return;
            }
        });
    });
    // Lists the registered endpoints.
    router.get(connector.getEndpointRouter(), async (ctx) => {
        ctx.response.body = endpoints;
    });
    const socketAdminMountRaw = ui?.path || '/socket-admin'
    const socketAdminMount = socketAdminMountRaw.startsWith('/')
        ? socketAdminMountRaw : `/${socketAdminMountRaw}`;
    // Serve the static assets of the socket admin UI.
    if (!ui?.disable) {
        koa.use(mount(socketAdminMount, serve(socketAdminUI)));
    }
    koa.use(router.routes());
    // NOTE(review): rethrowing inside an 'error' listener surfaces the error as
    // an uncaught exception — confirm this is intended.
    koa.on('error', (err) => {
        console.error(err);
        throw err;
    });
    httpServer.listen(serverConfiguration.port, () => {
        const scheme = nginx?.ssl ? 'https' : 'http';
        const displayHost = hostname || 'localhost';
        const displayPort = nginx?.port || (clusterInfo.usingCluster ? (process.env.PM2_PORT || 8080) : serverConfiguration.port);
        const baseUrl = `${scheme}://${displayHost}:${displayPort}`;
        const adminUIUrl = `${baseUrl}${socketAdminMount}`;
        console.log(chalk.greenBright.bold('\n🚀 Server started successfully!\n'));
        console.log(`🔗 ${chalk.cyan('Server URL')}: ${chalk.underline(baseUrl)}`);
        // socket.io address
        console.log(`🔗 ${chalk.cyan('Socket URL')}: ${chalk.underline(concat(url, socketPath))}\n`);
        if (!ui?.disable) {
            console.log(`🛠️ ${chalk.magenta('Socket Admin UI')}: ${chalk.underline(adminUIUrl)}`);
            // Credentials: only print the password when it was generated here.
            if (isPasswordSet) {
                console.log(`🔑 ${chalk.yellow('Socket Admin UI Password has been set, check the config file\n')}`)
            } else {
                console.log(chalk.yellow('Socket Admin UI Password Generated: ') + chalk.red(passwordForAdminUI));
                console.log(chalk.yellow('Please set the password when running prod env.\n'));
            }
        }
    });
    if (!omitWatchers) {
        appLoader.startWatchers();
    }
    if (!omitTimers) {
        appLoader.startTimers();
    }
    // Named so that shutdown() can remove it again; otherwise every
    // startup()/shutdown() cycle would leave another SIGINT listener behind.
    const handleSigint = async () => {
        await appLoader.unmount();
        process.exit(0);
    };
    process.on('SIGINT', handleSigint);
    const shutdown = async () => {
        process.off('SIGINT', handleSigint);
        // NOTE(review): httpServer.close() does not return a promise, so this
        // `await` does not wait for open connections to finish.
        await httpServer.close();
        await koa.removeAllListeners();
        await appLoader.unmount();
        removePolyfill("startup");
    }
    return shutdown
}