///
import './polyfill';
import { IncomingHttpHeaders, createServer } from "http";
import PathLib, { join } from 'path';
import Koa from 'koa';
import KoaRouter from 'koa-router';
import KoaBody from 'koa-body';
// import logger from 'koa-logger';
import { AppLoader, getClusterInfo, ClusterAppLoader } from 'oak-backend-base';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { OakException, Connector, EntityDict, ClusterInfo } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { createAdapter } from "@socket.io/cluster-adapter";
import { setupWorker } from "@socket.io/sticky";
import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter";
import Redis from "ioredis";
import { Server, ServerOptions } from "socket.io";
import { ServerConfiguration } from 'oak-domain/lib/types/Configuration';
import { instrument } from "@socket.io/admin-ui";
import serve from 'koa-static';
import mount from 'koa-mount';
import chalk from 'chalk';
import { checkNodeVersion, randomString } from '../utils';
import bcrypt from 'bcryptjs';
import { LogFormatter, polyfillConsole, removePolyfill } from './polyfill';
checkNodeVersion()
const socketAdminUI = join(__dirname, '../../ui/socket-admin');
const DATA_SUBSCRIBE_NAMESPACE = '/dsn';
const SOCKET_NAMESPACE = '/sn';
const SERVER_SUBSCRIBER_NAMESPACE = process.env.OAK_SSUB_NAMESPACE || '/ssub';
const ExceptionMask = '内部不可知错误';
function concat(...paths: string[]) {
return paths.reduce(
(prev, current) => {
if (current.startsWith('/')) {
return `${prev}${current}`;
}
return `${prev}/${current}`;
}
);
}
export async function startup, Cxt extends BackendRuntimeContext>(
path: string,
connector: Connector,
omitWatchers?: boolean,
omitTimers?: boolean,
routine?: (context: AsyncContext) => Promise,
): Promise<(() => Promise) | any> {
// let errorHandler: InternalErrorHandler | undefined = undefined;
// try {
// errorHandler = require(join(
// path,
// 'lib',
// 'configuration',
// 'exception'
// )).default;
// } catch (err) {
// // 不存在exception配置
// }
const serverConfiguration: ServerConfiguration = require(join(
path,
'lib',
'configuration',
'server'
)).default;
// 拿到package.json,用作项目的唯一标识,否则无法区分不同项目的Redis+socketIO连接
const packageJson = require(join(path, 'package.json'));
const corsHeaders = [
'Content-Type',
'Content-Length',
'Authorization',
'Accept',
'X-Requested-With',
];
const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS'];
const koa = new Koa();
// 使用 koa-logger 中间件打印日志
// koa.use(logger());
// 注册自定义中间件
if (serverConfiguration.middleware) {
if (Array.isArray(serverConfiguration.middleware)) {
serverConfiguration.middleware.forEach((mw) => {
koa.use(mw);
});
}
else if (typeof serverConfiguration.middleware === 'function') {
const mws = serverConfiguration.middleware(koa);
if (!Array.isArray(mws)) {
throw new Error('middleware 配置函数必须返回 Koa.Middleware 数组');
}
mws.forEach((mw) => {
koa.use(mw);
});
}
}
// socket
const httpServer = createServer(koa.callback());
const socketPath = connector.getSocketPath();
const socketOption: Partial = {
path: socketPath,
cors: ['development', 'staging'].includes(process.env.NODE_ENV!)
? {
origin: '*',
allowedHeaders: corsHeaders.concat(connector.getCorsHeader()),
}
: serverConfiguration.cors
? {
origin: serverConfiguration.cors.origin, //socket.io配置cors origin是支持数组和字符串
allowedHeaders: [
...corsHeaders.concat(connector.getCorsHeader()),
...(serverConfiguration.cors!.headers || []),
],
}
: undefined,
};
const io = new Server(httpServer, socketOption);
const clusterInfo = getClusterInfo();
if (clusterInfo.usingCluster) {
// 目前只支持单物理结点的pm2模式
// pm2环境下要接入clusterAdapter
// https://socket.io/zh-CN/docs/v4/pm2/
// 在单机的所有实例之间使用pm2的集群适配器
io.adapter(createAdapter());
setupWorker(io);
if (clusterInfo.enableRedis) {
// 在多机器之间使用redis适配器
const redisConfig = serverConfiguration.redis
if (!redisConfig) {
console.error('未配置Redis连接信息!');
process.exit(-1);
}
const isCluster = Array.isArray(redisConfig);
// 创建 Redis 客户端
const pubClient = isCluster ?
new Redis.Cluster(redisConfig.map((config) => ({
...config,
lazyConnect: true,
})))
: new Redis({
...redisConfig,
lazyConnect: true,
});
const subClient = pubClient.duplicate();
pubClient.on('connect', () => {
console.log('PUB已成功连接到Redis服务器');
});
pubClient.on('error', (err) => {
console.error('连接到Redis失败!', err);
// 如果连接到Redis失败,直接退出
process.exit(-1);
});
subClient.on('connect', () => {
console.log('SUB已成功连接到Redis服务器');
});
subClient.on('error', (err) => {
console.error('连接到Redis失败!', err);
// 如果连接到Redis失败,直接退出
process.exit(-1);
});
await Promise.all([
pubClient.connect(),
subClient.connect(),
]);
io.adapter(
// 为了使单台Redis可以在多个项目之间复用,需要为每个项目创建一个唯一的key
createRedisAdapter(pubClient, subClient, {
key: `${packageJson.name}-socket.io-${process.env.NODE_ENV || 'development'}`
.replace(/[^a-zA-Z0-9-_:.]/g, '') // 移除特殊字符(只保留字母、数字、-、_、:、.)
.replace(/\s+/g, '-') // 将空格替换为-
.toLowerCase(), // 转换为小写(统一格式)
})
);
console.log('已启用Redis适配器');
} else {
// 如果没有启用Redis,不应该出现instanceCount大于实际实例数的情况
console.warn('正处于单机集群环境,请确保实例数正确!');
}
console.log(
`以集群模式启动,实例总数『${clusterInfo.instanceCount}』,当前实例号『${clusterInfo.instanceId}』`
);
} else {
console.log('以单实例模式启动');
}
const { ui } = serverConfiguration;
const isPasswordSet = !!(process.env.SOCKET_ADMIN_PASSWORD || ui?.password)
// 密码使用随机字符串
const passwordForAdminUI = process.env.SOCKET_ADMIN_PASSWORD || ui?.password || randomString(16);
if (!ui?.disable) {
instrument(io, {
auth: {
type: "basic", // 使用基本认证,生产建议关闭或换成自定义 auth
username: ui?.username || "admin",
password: bcrypt.hashSync(passwordForAdminUI, 10), // 必须使用 bcrypt 加密之后的密码
},
mode: process.env.NODE_ENV === 'production' ? "production" : "development", // 根据环境设置模式
});
}
const appLoader = clusterInfo.usingCluster
? new ClusterAppLoader(
path,
io.of(DATA_SUBSCRIBE_NAMESPACE),
io.of(SOCKET_NAMESPACE),
io.of(SERVER_SUBSCRIBER_NAMESPACE),
connector.getSocketPath()
)
: new AppLoader(
path,
io.of(DATA_SUBSCRIBE_NAMESPACE),
io.of(SOCKET_NAMESPACE),
io.of(SERVER_SUBSCRIBER_NAMESPACE)
);
await appLoader.mount();
await appLoader.execStartRoutines();
if (routine) {
// 如果传入了routine,执行完成后就结束
const result = await appLoader.execRoutine(routine);
await appLoader.unmount();
return result;
}
// if (errorHandler && typeof errorHandler === 'function') {
// // polyfillConsole("startup", true, (props) => {
// // if (props.level === "error") {
// // appLoader.execRoutine(async (ctx) => {
// // await errorHandler(props.caller, props.args, ctx);
// // }).catch((err) => {
// // console.warn('执行全局错误处理失败:', err);
// // });
// // }
// // return props.args;
// // });
// // appLoader.registerInternalErrorHandler(async (ctx, type, msg, err) => {
// // await errorHandler(ctx, type, msg, err);
// // });
// }
appLoader.regAllExceptionHandler()
// 否则启动服务器模式
koa.use(async (ctx, next) => {
try {
await next();
} catch (err) {
console.error(err);
const { request } = ctx;
const exception =
err instanceof OakException
? err
: new OakException(
serverConfiguration?.internalExceptionMask ||
ExceptionMask
);
const { body } = connector.serializeException(
exception,
request.headers,
request.body
);
ctx.response.body = body;
return;
}
});
koa.use(
KoaBody(Object.assign({
multipart: true,
}, serverConfiguration.koaBody))
);
const router = new KoaRouter();
// 如果是开发环境,允许options
if (['development', 'staging'].includes(process.env.NODE_ENV!)) {
koa.use(async (ctx, next) => {
ctx.set('Access-Control-Allow-Origin', '*');
ctx.set('Access-Control-Allow-Headers', corsHeaders.concat(connector.getCorsHeader()));
ctx.set('Access-Control-Allow-Methods', corsMethods);
if (ctx.method == 'OPTIONS') {
ctx.body = 200;
} else {
await next();
}
});
} else if (serverConfiguration.cors) {
koa.use(async (ctx, next) => {
if (serverConfiguration.cors!.origin instanceof Array) {
// 获取到req.headers.origin格式:https://xxx.xx.com 加上端口号
const origin = ctx.req.headers.origin;
if (serverConfiguration.cors!.origin.includes(origin!)) {
ctx.set('Access-Control-Allow-Origin', origin!);
}
}
else {
ctx.set(
'Access-Control-Allow-Origin',
serverConfiguration.cors!.origin!
);
}
ctx.set('Access-Control-Allow-Headers', [
...corsHeaders.concat(connector.getCorsHeader()),
...(serverConfiguration.cors!.headers || []),
]);
ctx.set(
'Access-Control-Allow-Methods',
serverConfiguration.cors!.methods || corsMethods
);
if (ctx.method == 'OPTIONS') {
ctx.body = 200;
} else {
await next();
}
});
}
router.post(connector.getRouter(), async (ctx) => {
const { request } = ctx;
const { contextString, aspectName, data } = connector.parseRequest(
request.headers,
request.body,
request.files
);
const { result, opRecords, message } = await appLoader.execAspect(
aspectName,
request.headers,
contextString,
data
);
const { body, headers } = await connector.serializeResult(
result,
opRecords,
request.headers,
request.body,
message
);
ctx.response.body = body;
return;
});
// 桥接访问外部资源的入口
router.get(connector.getBridgeRouter(), async (ctx) => {
const {
request: { querystring },
response,
} = ctx;
const { url, headers } = connector.parseBridgeRequestQuery(querystring);
// headers待处理
const res = await fetch(url as string);
response.body = res.body;
return;
});
// 外部socket接口
/**
* 不用pm2 不用nginx: socket与http同端口
用pm2 不用nginx: socket监听8080
不用pm2 用nginx: nginx映射路径到server端口(手动配置),socket与http同端口(加路径)
用pm2 用nginx: nginx映射路径到8080(手动配置),socket与http同端口(加路径)
*/
const { nginx, hostname, port, socket: getSocketConfig } = serverConfiguration;
const protocol = nginx?.ssl ? 'https://' : 'http://';
let url = `${protocol}${hostname}`;
if (nginx) {
if (nginx.port) {
url += `:${nginx.port}`;
}
} else if (clusterInfo.usingCluster) {
url += `:${process.env.PM2_PORT || 8080}`;
} else {
url += `:${port}`;
}
// Example:
// import { io } from "socket.io-client";
// const socket = io('https://example.com/order', {
// path: '/my-custom-path/',
// });
// the Socket instance is attached to the "order" Namespace
// the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
// 文档 https://socket.io/docs/v4/client-options/
router.get(connector.getSocketPointRouter(), async (ctx) => {
const { response } = ctx;
let subscribeUrl = concat(url, DATA_SUBSCRIBE_NAMESPACE);
let socketUrl = concat(url, SOCKET_NAMESPACE);
if (typeof getSocketConfig === 'function') {
const socketConfig = getSocketConfig(ctx);
const url2 = socketConfig?.url;
if (url2) {
subscribeUrl = concat(url2, DATA_SUBSCRIBE_NAMESPACE);
socketUrl = concat(url2, SOCKET_NAMESPACE);
}
}
response.body = {
path: (nginx?.socketPath ? `/${nginx.socketPath}` : '') + connector.getSocketPath(),
socketUrl,
subscribeUrl,
};
return;
});
// 注入所有的endpoints
const endpoints = appLoader.getEndpoints(connector.getEndpointRouter());
endpoints.forEach(([name, method, url, fn]) => {
router[method](url, async (ctx) => {
const { req, request, params, response, res } = ctx;
const { body, headers, files } = request;
try {
const result = await fn(params, headers, req, files ? Object.assign({}, body, files) : body);
const { headers: headers2, data, statusCode } = result;
response.body = data;
if (headers2) {
Object.keys(headers2).forEach(
(k) => res.setHeader(k, headers2[k])
)
}
if (statusCode) {
res.statusCode = statusCode
}
return;
} catch (err) {
ctx.response.status = 500;
return;
}
});
});
router.get(connector.getEndpointRouter(), async (ctx) => {
ctx.response.body = endpoints;
});
const socketAdminMountRaw = ui?.path || '/socket-admin'
const socketAdminMount = socketAdminMountRaw.startsWith('/')
? socketAdminMountRaw : `/${socketAdminMountRaw}`;
// 注册静态资源
if (!ui?.disable) {
koa.use(mount(socketAdminMount, serve(socketAdminUI)));
}
koa.use(router.routes());
koa.on('error', (err) => {
console.error(err);
throw err;
});
httpServer.listen(serverConfiguration.port, () => {
const protocol = nginx?.ssl ? 'https' : 'http';
const host = hostname || 'localhost';
const port = nginx?.port || (clusterInfo.usingCluster ? (process.env.PM2_PORT || 8080) : serverConfiguration.port);
const baseUrl = `${protocol}://${host}:${port}`;
const adminUIUrl = `${baseUrl}${socketAdminMount}`;
console.log(chalk.greenBright.bold('\n🚀 Server started successfully!\n'));
console.log(`🔗 ${chalk.cyan('Server URL')}: ${chalk.underline(baseUrl)}`);
// socketio地址
console.log(`🔗 ${chalk.cyan('Socket URL')}: ${chalk.underline(concat(url, socketPath))}\n`);
if (!ui?.disable) {
console.log(`🛠️ ${chalk.magenta('Socket Admin UI')}: ${chalk.underline(adminUIUrl)}`);
// 账号密码
// 是否设置密码
if (isPasswordSet) {
console.log(`🔑 ${chalk.yellow('Socket Admin UI Password has been set, check the config file\n')}`)
} else {
console.log(chalk.yellow('Socket Admin UI Password Generated: ') + chalk.red(passwordForAdminUI));
console.log(chalk.yellow('Please set the password when running prod env.\n'));
}
}
});
if (!omitWatchers) {
appLoader.startWatchers();
}
if (!omitTimers) {
appLoader.startTimers();
}
process.on('SIGINT', async () => {
await appLoader.unmount();
process.exit(0);
});
const shutdown = async () => {
await httpServer.close();
await koa.removeAllListeners();
await appLoader.unmount();
removePolyfill("startup");
}
return shutdown
}