connect修正

This commit is contained in:
Xu Chang 2023-08-30 19:17:07 +08:00
parent 26fd2734f0
commit e9c6655bf6
8 changed files with 145 additions and 49 deletions

View File

@ -1,4 +1,4 @@
import { EntityDict, SubDataDef } from "./Entity";
import { EntityDict } from "./Entity";
import { OpRecord } from "./Entity";
import { AsyncContext } from "../store/AsyncRowStore";
export interface Aspect<ED extends EntityDict, Cxt extends AsyncContext<ED>> {
@ -10,6 +10,4 @@ export interface AspectWrapper<ED extends EntityDict, Cxt extends AsyncContext<E
opRecords?: OpRecord<ED>[];
message?: string | null;
}>;
sub: (data: Array<SubDataDef<ED, keyof ED>>, callback: (records: OpRecord<ED>[], ids: string[]) => void) => Promise<void>;
unsub: (ids: string[]) => Promise<void>;
}

View File

@ -4,30 +4,34 @@ import { AsyncContext, AsyncRowStore } from "../store/AsyncRowStore";
import { SyncContext } from "../store/SyncRowStore";
import { EntityDict, OpRecord } from "./Entity";
import { OakException } from "./Exception";
export declare abstract class Connector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> {
abstract callAspect(name: string, params: any, context: FrontCxt): Promise<{
export interface Connector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> {
callAspect: (name: string, params: any, context: FrontCxt) => Promise<{
result: any;
opRecords?: OpRecord<ED>[];
message?: string | null;
}>;
abstract getRouter(): string;
abstract parseRequest(headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>): Promise<{
getRouter: () => string;
parseRequest: (headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>) => Promise<{
name: string;
params: any;
context: BackCxt;
}>;
abstract serializeResult(result: any, context: BackCxt, headers: IncomingHttpHeaders, body: any): Promise<{
serializeResult: (result: any, context: BackCxt, headers: IncomingHttpHeaders, body: any) => Promise<{
body: any;
headers?: Record<string, any>;
}>;
abstract serializeException(exception: OakException<ED>, headers: IncomingHttpHeaders, body: any): {
serializeException: (exception: OakException<ED>, headers: IncomingHttpHeaders, body: any) => {
body: any;
headers?: Record<string, any>;
};
abstract getSubscribeRouter(): string;
abstract getBridgeRouter(): string;
abstract makeBridgeUrl(url: string, headers?: Record<string, string>): string;
abstract parseBridgeRequestQuery(urlParams: string): {
getSubscribeRouter: () => string;
getSubscribePoint: () => Promise<{
url: string;
path: string;
}>;
getBridgeRouter: () => string;
makeBridgeUrl: (url: string, headers?: Record<string, string>) => string;
parseBridgeRequestQuery: (urlParams: string) => {
url: string;
headers?: Record<string, string>;
};

View File

@ -1,9 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Connector = void 0;
var Connector = /** @class */ (function () {
function Connector() {
}
return Connector;
}());
exports.Connector = Connector;

View File

@ -3,15 +3,23 @@ import { IncomingHttpHeaders } from "http";
import { AsyncContext, AsyncRowStore } from '../store/AsyncRowStore';
import { SyncContext } from '../store/SyncRowStore';
import { Connector, EntityDict, OakException } from "../types";
export declare class SimpleConnector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> extends Connector<ED, BackCxt, FrontCxt> {
declare type ServerOption = {
protocol: string;
hostname: string;
port?: number;
apiPath?: string;
};
export declare class SimpleConnector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> implements Connector<ED, BackCxt, FrontCxt> {
static ASPECT_ROUTER: string;
static BRIDGE_ROUTER: string;
static SUBSCRIBE_ROUTER: string;
private serverAspectUrl;
private serverBridgeUrl;
private serverSubscribeUrl;
private option;
private makeException;
private contextBuilder;
constructor(serverUrl: string, makeException: (exceptionData: any) => OakException<ED>, contextBuilder: (str: string | undefined) => (store: AsyncRowStore<ED, BackCxt>) => Promise<BackCxt>);
constructor(option: ServerOption, makeException: (exceptionData: any) => OakException<ED>, contextBuilder: (str: string | undefined) => (store: AsyncRowStore<ED, BackCxt>) => Promise<BackCxt>);
callAspect(name: string, params: any, context: FrontCxt): Promise<{
result: any;
opRecords: any;
@ -23,6 +31,10 @@ export declare class SimpleConnector<ED extends EntityDict, BackCxt extends Asyn
}>;
getRouter(): string;
getSubscribeRouter(): string;
getSubscribePoint(): Promise<{
url: any;
path: any;
}>;
parseRequest(headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>): Promise<{
name: string;
params: any;
@ -48,3 +60,4 @@ export declare class SimpleConnector<ED extends EntityDict, BackCxt extends Asyn
headers?: Record<string, string> | undefined;
};
}
export {};

View File

@ -21,15 +21,23 @@ function makeContentTypeAndBody(data) {
body: JSON.stringify(data),
};
}
var SimpleConnector = /** @class */ (function (_super) {
tslib_1.__extends(SimpleConnector, _super);
function SimpleConnector(serverUrl, makeException, contextBuilder) {
var _this = _super.call(this) || this;
_this.serverAspectUrl = "".concat(serverUrl).concat(SimpleConnector.ASPECT_ROUTER);
_this.serverBridgeUrl = "".concat(serverUrl).concat(SimpleConnector.BRIDGE_ROUTER);
_this.makeException = makeException;
_this.contextBuilder = contextBuilder;
return _this;
var SimpleConnector = /** @class */ (function () {
function SimpleConnector(option, makeException, contextBuilder) {
this.option = option;
var protocol = option.protocol, hostname = option.hostname, port = option.port, apiPath = option.apiPath;
var serverUrl = "".concat(protocol).concat(hostname);
if (typeof port === 'number') {
serverUrl += ":".concat(port);
}
if (apiPath) {
(0, assert_1.default)(apiPath.startsWith('/'));
serverUrl += apiPath;
}
this.serverAspectUrl = "".concat(serverUrl).concat(SimpleConnector.ASPECT_ROUTER);
this.serverBridgeUrl = "".concat(serverUrl).concat(SimpleConnector.BRIDGE_ROUTER);
this.serverSubscribeUrl = "".concat(serverUrl).concat(SimpleConnector.SUBSCRIBE_ROUTER);
this.makeException = makeException;
this.contextBuilder = contextBuilder;
}
SimpleConnector.prototype.callAspect = function (name, params, context) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
@ -89,6 +97,36 @@ var SimpleConnector = /** @class */ (function (_super) {
SimpleConnector.prototype.getSubscribeRouter = function () {
return SimpleConnector.SUBSCRIBE_ROUTER;
};
SimpleConnector.prototype.getSubscribePoint = function () {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var response, err, message, responseType, _a, url, path, port, url2;
return tslib_1.__generator(this, function (_b) {
switch (_b.label) {
case 0: return [4 /*yield*/, global.fetch(this.serverSubscribeUrl)];
case 1:
response = _b.sent();
if (response.status > 299) {
err = new types_1.OakExternalException("\u7F51\u7EDC\u8BF7\u6C42\u8FD4\u56DE\u5F02\u5E38\uFF0Cstatus\u662F".concat(response.status));
throw err;
}
message = response.headers.get('oak-message');
responseType = response.headers.get('Content-Type') || response.headers.get('content-type');
if (!(responseType === null || responseType === void 0 ? void 0 : responseType.toLocaleLowerCase().match(/application\/json/i))) return [3 /*break*/, 3];
return [4 /*yield*/, response.json()];
case 2:
_a = _b.sent(), url = _a.url, path = _a.path, port = _a.port;
url2 = url || "".concat(this.option.protocol).concat(this.option.hostname);
(0, assert_1.default)(port);
url2 += ':port';
return [2 /*return*/, {
url: url2,
path: path,
}];
case 3: throw new Error("\u5C1A\u4E0D\u652F\u6301\u7684content-type\u7C7B\u578B".concat(responseType));
}
});
});
};
SimpleConnector.prototype.parseRequest = function (headers, body, store) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var oakCxtStr, aspectName, context;
@ -180,5 +218,5 @@ var SimpleConnector = /** @class */ (function (_super) {
SimpleConnector.BRIDGE_ROUTER = '/bridge';
SimpleConnector.SUBSCRIBE_ROUTER = '/subscribe';
return SimpleConnector;
}(types_1.Connector));
}());
exports.SimpleConnector = SimpleConnector;

View File

@ -12,8 +12,4 @@ export interface AspectWrapper<ED extends EntityDict, Cxt extends AsyncContext<E
opRecords?: OpRecord<ED>[];
message?: string | null;
}>;
sub: (data: Array<SubDataDef<ED, keyof ED>>, callback: (records: OpRecord<ED>[], ids: string[]) => void) => Promise<void>;
unsub: (ids: string[]) => Promise<void>;
};

View File

@ -5,34 +5,39 @@ import { SyncContext } from "../store/SyncRowStore";
import { EntityDict, OpRecord } from "./Entity";
import { OakException } from "./Exception";
export abstract class Connector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> {
abstract callAspect(name: string, params: any, context: FrontCxt): Promise<{
export interface Connector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> {
callAspect: (name: string, params: any, context: FrontCxt) => Promise<{
result: any;
opRecords?: OpRecord<ED>[];
message?: string | null;
}>;
abstract getRouter(): string;
getRouter: () => string;
abstract parseRequest(headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>): Promise<{ name: string; params: any; context: BackCxt; }>;
parseRequest: (headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>) => Promise<{ name: string; params: any; context: BackCxt; }>;
abstract serializeResult(result: any, context: BackCxt, headers: IncomingHttpHeaders, body: any): Promise<{
serializeResult: (result: any, context: BackCxt, headers: IncomingHttpHeaders, body: any) => Promise<{
body: any;
headers?: Record<string, any>;
}>;
abstract serializeException(exception: OakException<ED>, headers: IncomingHttpHeaders, body: any): {
serializeException: (exception: OakException<ED>, headers: IncomingHttpHeaders, body: any) => {
body: any;
headers?: Record<string, any>;
};
abstract getSubscribeRouter(): string;
getSubscribeRouter: () => string;
abstract getBridgeRouter(): string;
getSubscribePoint: () => Promise<{
url: string;
path: string;
}>;
getBridgeRouter: () => string;
abstract makeBridgeUrl(url: string, headers?: Record<string, string>): string;
makeBridgeUrl: (url: string, headers?: Record<string, string>) => string;
abstract parseBridgeRequestQuery(urlParams: string): {
parseBridgeRequestQuery: (urlParams: string) => {
url: string;
headers?: Record<string, string>;
}

View File

@ -23,19 +23,38 @@ function makeContentTypeAndBody(data: any) {
};
}
export class SimpleConnector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> extends Connector<ED, BackCxt, FrontCxt> {
type ServerOption = {
protocol: string;
hostname: string;
port?: number;
apiPath?: string;
};
export class SimpleConnector<ED extends EntityDict, BackCxt extends AsyncContext<ED>, FrontCxt extends SyncContext<ED>> implements Connector<ED, BackCxt, FrontCxt> {
static ASPECT_ROUTER = '/aspect';
static BRIDGE_ROUTER = '/bridge';
static SUBSCRIBE_ROUTER = '/subscribe';
private serverAspectUrl: string;
private serverBridgeUrl: string;
private serverSubscribeUrl: string;
private option: ServerOption;
private makeException: (exceptionData: any) => OakException<ED>;
private contextBuilder: (str: string | undefined) => (store: AsyncRowStore<ED, BackCxt>) => Promise<BackCxt>;
constructor(serverUrl: string, makeException: (exceptionData: any) => OakException<ED>, contextBuilder: (str: string | undefined) => (store: AsyncRowStore<ED, BackCxt>) => Promise<BackCxt>) {
super();
constructor(option: ServerOption, makeException: (exceptionData: any) => OakException<ED>, contextBuilder: (str: string | undefined) => (store: AsyncRowStore<ED, BackCxt>) => Promise<BackCxt>) {
this.option = option;
const { protocol, hostname, port, apiPath } = option;
let serverUrl = `${protocol}${hostname}`;
if (typeof port === 'number') {
serverUrl += `:${port}`;
}
if (apiPath) {
assert(apiPath.startsWith('/'));
serverUrl += apiPath;
}
this.serverAspectUrl = `${serverUrl}${SimpleConnector.ASPECT_ROUTER}`;
this.serverBridgeUrl = `${serverUrl}${SimpleConnector.BRIDGE_ROUTER}`;
this.serverSubscribeUrl = `${serverUrl}${SimpleConnector.SUBSCRIBE_ROUTER}`;
this.makeException = makeException;
this.contextBuilder = contextBuilder;
}
@ -100,6 +119,36 @@ export class SimpleConnector<ED extends EntityDict, BackCxt extends AsyncContext
return SimpleConnector.SUBSCRIBE_ROUTER;
}
async getSubscribePoint() {
const response = await global.fetch(this.serverSubscribeUrl);
if (response.status > 299) {
const err = new OakExternalException(`网络请求返回异常status是${response.status}`);
throw err;
}
const message = response.headers.get('oak-message');
const responseType = response.headers.get('Content-Type') || response.headers.get('content-type');
if (responseType?.toLocaleLowerCase().match(/application\/json/i)) {
const {
url,
path,
port,
} = await response.json();
let url2 = url || `${this.option.protocol}${this.option.hostname}`;
assert(port);
url2 += ':port';
return {
url: url2,
path,
};
}
else {
throw new Error(`尚不支持的content-type类型${responseType}`);
}
}
async parseRequest(headers: IncomingHttpHeaders, body: any, store: AsyncRowStore<ED, BackCxt>): Promise<{ name: string; params: any; context: BackCxt; }> {
const { 'oak-cxt': oakCxtStr, 'oak-aspect': aspectName } = headers;
assert(typeof oakCxtStr === 'string' || oakCxtStr === undefined);