socket拼接url时, 增加demo注释说明

This commit is contained in:
wkj 2024-04-25 20:03:18 +08:00
parent dfe86166c8
commit a8d4e25e9c
2 changed files with 155 additions and 73 deletions

View File

@ -27,20 +27,32 @@ function concat(...paths) {
} }
async function startup(path, connector, omitWatchers, omitTimers, routine) { async function startup(path, connector, omitWatchers, omitTimers, routine) {
const serverConfiguration = require((0, path_1.join)(path, 'lib', 'configuration', 'server')).default; const serverConfiguration = require((0, path_1.join)(path, 'lib', 'configuration', 'server')).default;
const corsHeaders = ['Content-Type', 'Content-Length', 'Authorization', 'Accept', 'X-Requested-With', 'oak-cxt', 'oak-aspect']; const corsHeaders = [
'Content-Type',
'Content-Length',
'Authorization',
'Accept',
'X-Requested-With',
'oak-cxt',
'oak-aspect',
];
const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS']; const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS'];
const koa = new koa_1.default(); const koa = new koa_1.default();
// socket // socket
const httpServer = (0, http_1.createServer)(koa.callback()); const httpServer = (0, http_1.createServer)(koa.callback());
const socketOption = { const socketOption = {
path: connector.getSubscribeRouter(), path: connector.getSubscribeRouter(),
cors: process.env.NODE_ENV === 'development' ? { cors: process.env.NODE_ENV === 'development'
origin: '*', ? {
allowedHeaders: corsHeaders, origin: '*',
} : (serverConfiguration.cors ? { allowedHeaders: corsHeaders,
origin: serverConfiguration.cors.origin, }
allowedHeaders: serverConfiguration.cors.headers, : serverConfiguration.cors
} : undefined), ? {
origin: serverConfiguration.cors.origin,
allowedHeaders: serverConfiguration.cors.headers,
}
: undefined,
}; };
const io = new socket_io_1.Server(httpServer, socketOption); const io = new socket_io_1.Server(httpServer, socketOption);
const clusterInfo = (0, oak_backend_base_1.getClusterInfo)(); const clusterInfo = (0, oak_backend_base_1.getClusterInfo)();
@ -73,7 +85,10 @@ async function startup(path, connector, omitWatchers, omitTimers, routine) {
catch (err) { catch (err) {
console.error(err); console.error(err);
const { request } = ctx; const { request } = ctx;
const exception = err instanceof types_1.OakException ? err : new types_1.OakException(serverConfiguration?.internalExceptionMask || ExceptionMask); const exception = err instanceof types_1.OakException
? err
: new types_1.OakException(serverConfiguration?.internalExceptionMask ||
ExceptionMask);
const { body } = connector.serializeException(exception, request.headers, request.body); const { body } = connector.serializeException(exception, request.headers, request.body);
ctx.response.body = body; ctx.response.body = body;
return; return;
@ -123,7 +138,7 @@ async function startup(path, connector, omitWatchers, omitTimers, routine) {
}); });
// 桥接访问外部资源的入口 // 桥接访问外部资源的入口
router.get(connector.getBridgeRouter(), async (ctx) => { router.get(connector.getBridgeRouter(), async (ctx) => {
const { request: { querystring }, response } = ctx; const { request: { querystring }, response, } = ctx;
const { url, headers } = connector.parseBridgeRequestQuery(querystring); const { url, headers } = connector.parseBridgeRequestQuery(querystring);
// headers待处理 // headers待处理
const res = await fetch(url); const res = await fetch(url);
@ -144,7 +159,6 @@ async function startup(path, connector, omitWatchers, omitTimers, routine) {
if (nginx.port) { if (nginx.port) {
url += `:${nginx.port}`; url += `:${nginx.port}`;
} }
// url = concat(url, `/${nginx.socketPath}`);
} }
else if (clusterInfo.usingCluster) { else if (clusterInfo.usingCluster) {
url += `:${process.env.PM2_PORT || 8080}`; url += `:${process.env.PM2_PORT || 8080}`;
@ -153,12 +167,21 @@ async function startup(path, connector, omitWatchers, omitTimers, routine) {
url += `:${port}`; url += `:${port}`;
} }
url = concat(url, DATA_SUBSCRIBER_NAMESPACE); url = concat(url, DATA_SUBSCRIBER_NAMESPACE);
// 配置nginx的socketPath 需加在path上 // Example:
// import { io } from "socket.io-client";
// const socket = io('https://example.com/order', {
// path: '/my-custom-path/',
// });
// the Socket instance is attached to the "order" Namespace
// the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
// 文档 https://socket.io/docs/v4/client-options/
router.get(connector.getSubscribePointRouter(), async (ctx) => { router.get(connector.getSubscribePointRouter(), async (ctx) => {
const { response } = ctx; const { response } = ctx;
response.body = { response.body = {
url, url,
path: nginx?.socketPath ? `/${nginx.socketPath}` : '' + connector.getSubscribeRouter(), path: nginx?.socketPath
? `/${nginx.socketPath}`
: '' + connector.getSubscribeRouter(),
}; };
return; return;
}); });

View File

@ -37,10 +37,21 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
omitTimers?: boolean, omitTimers?: boolean,
routine?: (context: AsyncContext<EntityDict & BaseEntityDict>) => Promise<void>, routine?: (context: AsyncContext<EntityDict & BaseEntityDict>) => Promise<void>,
) { ) {
const serverConfiguration: ServerConfiguration = require( const serverConfiguration: ServerConfiguration = require(join(
join(path, 'lib', 'configuration', 'server') path,
).default; 'lib',
const corsHeaders = ['Content-Type', 'Content-Length', 'Authorization', 'Accept', 'X-Requested-With', 'oak-cxt', 'oak-aspect']; 'configuration',
'server'
)).default;
const corsHeaders = [
'Content-Type',
'Content-Length',
'Authorization',
'Accept',
'X-Requested-With',
'oak-cxt',
'oak-aspect',
];
const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS']; const corsMethods = ['PUT', 'POST', 'GET', 'DELETE', 'OPTIONS'];
const koa = new Koa(); const koa = new Koa();
@ -48,13 +59,18 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
const httpServer = createServer(koa.callback()); const httpServer = createServer(koa.callback());
const socketOption: Partial<ServerOptions> = { const socketOption: Partial<ServerOptions> = {
path: connector.getSubscribeRouter(), path: connector.getSubscribeRouter(),
cors: process.env.NODE_ENV === 'development' ? { cors:
origin: '*', process.env.NODE_ENV === 'development'
allowedHeaders: corsHeaders, ? {
} : (serverConfiguration.cors ? { origin: '*',
origin: serverConfiguration.cors.origin, allowedHeaders: corsHeaders,
allowedHeaders: serverConfiguration.cors.headers, }
} : undefined), : serverConfiguration.cors
? {
origin: serverConfiguration.cors.origin,
allowedHeaders: serverConfiguration.cors.headers,
}
: undefined,
}; };
const io = new Server(httpServer, socketOption); const io = new Server(httpServer, socketOption);
const clusterInfo = getClusterInfo(); const clusterInfo = getClusterInfo();
@ -64,14 +80,20 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
// https://socket.io/zh-CN/docs/v4/pm2/ // https://socket.io/zh-CN/docs/v4/pm2/
io.adapter(createAdapter()); io.adapter(createAdapter());
setupWorker(io); setupWorker(io);
console.log(`以集群模式启动,实例总数『${clusterInfo.instanceCount}』,当前实例号『${clusterInfo.instanceId}`); console.log(
} `以集群模式启动,实例总数『${clusterInfo.instanceCount}』,当前实例号『${clusterInfo.instanceId}`
else { );
} else {
console.log('以单实例模式启动'); console.log('以单实例模式启动');
} }
const appLoader = clusterInfo.usingCluster const appLoader = clusterInfo.usingCluster
? new ClusterAppLoader(path, io.of(DATA_SUBSCRIBER_NAMESPACE), io.of(SERVER_SUBSCRIBER_NAMESPACE), connector.getSubscribeRouter()) ? new ClusterAppLoader(
path,
io.of(DATA_SUBSCRIBER_NAMESPACE),
io.of(SERVER_SUBSCRIBER_NAMESPACE),
connector.getSubscribeRouter()
)
: new AppLoader(path, io.of(DATA_SUBSCRIBER_NAMESPACE)); : new AppLoader(path, io.of(DATA_SUBSCRIBER_NAMESPACE));
await appLoader.mount(); await appLoader.mount();
await appLoader.execStartRoutines(); await appLoader.execStartRoutines();
@ -85,19 +107,30 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
koa.use(async (ctx, next) => { koa.use(async (ctx, next) => {
try { try {
await next(); await next();
} } catch (err) {
catch (err) {
console.error(err); console.error(err);
const { request } = ctx; const { request } = ctx;
const exception = err instanceof OakException ? err : new OakException(serverConfiguration?.internalExceptionMask || ExceptionMask); const exception =
const { body } = connector.serializeException(exception, request.headers, request.body); err instanceof OakException
? err
: new OakException(
serverConfiguration?.internalExceptionMask ||
ExceptionMask
);
const { body } = connector.serializeException(
exception,
request.headers,
request.body
);
ctx.response.body = body; ctx.response.body = body;
return; return;
} }
}) });
koa.use(KoaBody({ koa.use(
multipart: true, KoaBody({
})); multipart: true,
})
);
const router = new KoaRouter(); const router = new KoaRouter();
// 如果是开发环境允许options // 如果是开发环境允许options
@ -112,15 +145,20 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
await next(); await next();
} }
}); });
} } else if (serverConfiguration.cors) {
else if (serverConfiguration.cors) {
koa.use(async (ctx, next) => { koa.use(async (ctx, next) => {
ctx.set('Access-Control-Allow-Origin', serverConfiguration.cors!.origin!); ctx.set(
'Access-Control-Allow-Origin',
serverConfiguration.cors!.origin!
);
ctx.set('Access-Control-Allow-Headers', [ ctx.set('Access-Control-Allow-Headers', [
...corsHeaders, ...corsHeaders,
...(serverConfiguration.cors!.headers || []), ...(serverConfiguration.cors!.headers || []),
]); ]);
ctx.set('Access-Control-Allow-Methods', serverConfiguration.cors!.methods || corsMethods); ctx.set(
'Access-Control-Allow-Methods',
serverConfiguration.cors!.methods || corsMethods
);
if (ctx.method == 'OPTIONS') { if (ctx.method == 'OPTIONS') {
ctx.body = 200; ctx.body = 200;
} else { } else {
@ -131,17 +169,35 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
router.post(connector.getRouter(), async (ctx) => { router.post(connector.getRouter(), async (ctx) => {
const { request } = ctx; const { request } = ctx;
const { contextString, aspectName, data } = connector.parseRequest(request.headers, request.body, request.files); const { contextString, aspectName, data } = connector.parseRequest(
request.headers,
const { result, opRecords, message } = await appLoader.execAspect(aspectName, request.headers, contextString, data); request.body,
const { body, headers } = await connector.serializeResult(result, opRecords, request.headers, request.body, message); request.files
);
const { result, opRecords, message } = await appLoader.execAspect(
aspectName,
request.headers,
contextString,
data
);
const { body, headers } = await connector.serializeResult(
result,
opRecords,
request.headers,
request.body,
message
);
ctx.response.body = body; ctx.response.body = body;
return; return;
}); });
// 桥接访问外部资源的入口 // 桥接访问外部资源的入口
router.get(connector.getBridgeRouter(), async (ctx) => { router.get(connector.getBridgeRouter(), async (ctx) => {
const { request: { querystring }, response } = ctx; const {
request: { querystring },
response,
} = ctx;
const { url, headers } = connector.parseBridgeRequestQuery(querystring); const { url, headers } = connector.parseBridgeRequestQuery(querystring);
// headers待处理 // headers待处理
@ -149,7 +205,7 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
response.body = res.body; response.body = res.body;
return; return;
}); });
// 外部socket接口 // 外部socket接口
/** /**
* pm2 nginx socket与http同端口 * pm2 nginx socket与http同端口
@ -164,45 +220,48 @@ export async function startup<ED extends EntityDict & BaseEntityDict, FrontCxt e
if (nginx.port) { if (nginx.port) {
url += `:${nginx.port}`; url += `:${nginx.port}`;
} }
// url = concat(url, `/${nginx.socketPath}`); } else if (clusterInfo.usingCluster) {
}
else if (clusterInfo.usingCluster){
url += `:${process.env.PM2_PORT || 8080}`; url += `:${process.env.PM2_PORT || 8080}`;
} } else {
else {
url += `:${port}`; url += `:${port}`;
} }
url = concat(url, DATA_SUBSCRIBER_NAMESPACE); url = concat(url, DATA_SUBSCRIBER_NAMESPACE);
// 配置nginx的socketPath 需加在path上 // Example:
// import { io } from "socket.io-client";
// const socket = io('https://example.com/order', {
// path: '/my-custom-path/',
// });
// the Socket instance is attached to the "order" Namespace
// the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
// 文档 https://socket.io/docs/v4/client-options/
router.get(connector.getSubscribePointRouter(), async (ctx) => { router.get(connector.getSubscribePointRouter(), async (ctx) => {
const { response } = ctx; const { response } = ctx;
response.body = { response.body = {
url, url,
path: nginx?.socketPath ? `/${nginx.socketPath}` : '' + connector.getSubscribeRouter(), path: nginx?.socketPath
? `/${nginx.socketPath}`
: '' + connector.getSubscribeRouter(),
}; };
return; return;
}); });
// 注入所有的endpoints // 注入所有的endpoints
const endpoints = appLoader.getEndpoints(connector.getEndpointRouter()); const endpoints = appLoader.getEndpoints(connector.getEndpointRouter());
endpoints.forEach( endpoints.forEach(([name, method, url, fn]) => {
([name, method, url, fn]) => { router[method](url, async (ctx) => {
router[method](url, async (ctx) => { const { req, request, params } = ctx;
const { req, request, params } = ctx; const { body, headers } = request;
const { body, headers } = request; try {
try { const result = await fn(params, headers, req, body);
const result = await fn(params, headers, req, body); ctx.response.body = result;
ctx.response.body = result; return;
return; } catch (err) {
} ctx.response.status = 500;
catch(err) { return;
ctx.response.status = 500; }
return; });
} });
});
}
);
router.get(connector.getEndpointRouter(), async (ctx) => { router.get(connector.getEndpointRouter(), async (ctx) => {
ctx.response.body = endpoints; ctx.response.body = endpoints;
}); });