调整了synchronizer,更新了backendContext的生成函数注入

This commit is contained in:
Xu Chang 2024-03-29 18:26:45 +08:00
parent 386f67c5b8
commit 75e687c019
15 changed files with 116 additions and 76 deletions

9
lib/AppLoader.d.ts vendored
View File

@ -7,20 +7,21 @@ import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import Synchronizer from './Synchronizer';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
protected dbStore: DbStore<ED, Cxt>;
private aspectDict;
private externalDependencies;
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
private requireSth;
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
/**
* configuration
*/
private getConfiguration;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace);
constructor(path: string, ns?: Namespace, nsServer?: Namespace);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
initTriggers(): void;
mount(initialize?: true): Promise<void>;
@ -30,7 +31,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
result: any;
message?: string;
}>;
initialize(dropIfExists?: boolean): Promise<void>;
initialize(truncate?: boolean): Promise<void>;
getStore(): DbStore<ED, Cxt>;
getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<OperationResult<ED>>;
@ -39,5 +40,5 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
startWatchers(): void;
startTimers(): void;
execStartRoutines(): Promise<void>;
execRoutine(routine: (context: Cxt) => Promise<void>): Promise<void>;
execRoutine(routine: <Cxt extends AsyncContext<ED>>(context: Cxt) => Promise<void>): Promise<void>;
}

View File

@ -89,8 +89,11 @@ class AppLoader extends types_1.AppLoader {
return sthOut;
}
async makeContext(cxtStr, headers) {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
context.clusterInfo = (0, env_2.getClusterInfo)();
const context = this.contextBuilder(this.dbStore);
await context.begin();
if (cxtStr) {
await context.initialize(JSON.parse(cxtStr));
}
context.headers = headers;
return context;
}
@ -107,19 +110,21 @@ class AppLoader extends types_1.AppLoader {
syncConfig: syncConfigs,
};
}
constructor(path, contextBuilder, ns, nsServer) {
constructor(path, ns, nsServer) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`);
this.externalDependencies = require((0, env_1.OAK_EXTERNAL_LIBS_FILEPATH)((0, path_1.join)(path, 'lib')));
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore_1.DbStore(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer);
this.dataSubscriber = new DataSubscriber_1.default(ns, nsServer);
}
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
this.contextBuilder = (store) => {
const context = new BackendRuntimeContext(store);
context.clusterInfo = (0, env_2.getClusterInfo)();
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
@ -160,7 +165,7 @@ class AppLoader extends types_1.AppLoader {
if (!initialize) {
const { syncConfig: syncConfig } = this.getConfiguration();
if (syncConfig) {
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder()(this.dbStore));
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
}
this.initTriggers();
}
@ -179,7 +184,6 @@ class AppLoader extends types_1.AppLoader {
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
await context.begin();
try {
const result = await fn(params, context);
await context.refineOpRecords();
@ -197,17 +201,17 @@ class AppLoader extends types_1.AppLoader {
throw err;
}
}
async initialize(dropIfExists) {
await this.dbStore.initialize(dropIfExists);
async initialize(truncate) {
await this.dbStore.initialize({ ifExists: 'dropIfNotStatic' });
const data = this.requireSth('lib/data/index');
const context = await this.contextBuilder()(this.dbStore);
const context = this.contextBuilder(this.dbStore);
for (const entity in data) {
let rows = data[entity];
if (entity === 'area') {
// 对area暂时处理一下
rows = require('./data/area.json');
}
if (rows.length > 0) {
if (rows.length > 0 && (!truncate || !this.dbStore.getSchema()[entity].static)) {
await context.begin();
try {
await this.dbStore.operate(entity, {
@ -251,7 +255,6 @@ class AppLoader extends types_1.AppLoader {
}
endPointRouters.push([name, method, url, async (params, headers, req, body) => {
const context = await this.makeContext(undefined, headers);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
@ -292,7 +295,6 @@ class AppLoader extends types_1.AppLoader {
}
async execWatcher(watcher) {
const context = await this.makeContext();
await context.begin();
let result;
try {
if (watcher.hasOwnProperty('actionData')) {
@ -378,7 +380,6 @@ class AppLoader extends types_1.AppLoader {
}
else {
const context = await this.makeContext();
await context.begin();
try {
const { timer: timerFn } = timer;
const result = await timerFn(context);
@ -415,7 +416,6 @@ class AppLoader extends types_1.AppLoader {
const { name, routine: routineFn } = routine;
const context = await this.makeContext();
const start = Date.now();
await context.begin();
try {
const result = await routineFn(context);
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);

View File

@ -2,7 +2,6 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, Trigger } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AppLoader } from './AppLoader';
import { DbStore } from './DbStore';
import { Namespace } from 'socket.io';
import { Socket } from 'socket.io-client';
export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends AppLoader<ED, Cxt> {
@ -10,7 +9,7 @@ export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cx
private csTriggers;
private connect;
private sub;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, nsDs: Namespace, nsServer: Namespace, socketPath: string);
constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise<Partial<ED[T]['Schema']>[]>;

View File

@ -4,6 +4,7 @@ exports.ClusterAppLoader = void 0;
const tslib_1 = require("tslib");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const filter_1 = require("oak-domain/lib/store/filter");
const types_1 = require("oak-domain/lib/types");
const env_1 = require("./cluster/env");
const AppLoader_1 = require("./AppLoader");
const assert_1 = tslib_1.__importDefault(require("assert"));
@ -27,7 +28,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
});
this.socket.on('data', async (entity, name, ids, cxtStr, option) => {
const context = await this.makeContext(cxtStr);
await context.begin();
// await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, name, ids, context, option);
await context.commit();
@ -50,19 +51,21 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
this.socket.connect();
}
}
constructor(path, contextBuilder, nsDs, nsServer, socketPath) {
super(path, contextBuilder, nsDs, nsServer);
constructor(path, nsDs, nsServer, socketPath) {
super(path, nsDs, nsServer);
this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => {
const execLocal = async (ids2) => {
const context = await this.makeContext(cxtStr);
await context.begin();
// await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option);
await context.commit();
}
catch (err) {
await context.rollback();
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
if (!(err instanceof types_1.OakMakeSureByMySelfException)) {
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
}
}
};
if (trigger.cs) {

4
lib/DbStore.d.ts vendored
View File

@ -3,11 +3,11 @@ import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEn
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore';
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
export declare class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends MysqlStore<ED, Cxt> implements AsyncRowStore<ED, Cxt> {
private executor;
private relationAuth;
constructor(storageSchema: StorageSchema<ED>, contextBuilder: (scene?: string) => Promise<Cxt>, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>);
constructor(storageSchema: StorageSchema<ED>, contextBuilder: () => Cxt, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap<ED>, selectFreeEntities?: SelectFreeEntities<ED>, updateFreeDict?: UpdateFreeDict<ED>, onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>);
protected cascadeUpdateAsync<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: AsyncContext<ED>, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
operate<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise<import("oak-domain/lib/types").OperationResult<ED>>;
select<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise<Partial<ED[T]["Schema"]>[]>;

View File

@ -9,7 +9,7 @@ class DbStore extends oak_db_1.MysqlStore {
relationAuth;
constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) {
super(storageSchema, mysqlConfiguration);
this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene), undefined, onVolatileTrigger);
this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
}
async cascadeUpdateAsync(entity, operation, context, option) {

View File

@ -26,7 +26,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
*/
private trySynchronizeOpers;
private makeCreateOperTrigger;
constructor(config: SyncConfig<ED, Cxt>, schema: StorageSchema<ED>, contextBuilder: () => Promise<Cxt>);
constructor(config: SyncConfig<ED, Cxt>, schema: StorageSchema<ED>, contextBuilder: () => Cxt);
/**
* sync的定义 commit triggers
* @returns

View File

@ -30,7 +30,12 @@ class Synchronizer {
// todo 加密
const queue = channel.queue;
const opers = queue.map(ele => ele.oper);
console.log('向远端结点sync数据', api, JSON.stringify(opers));
if (process.env.NODE_ENV === 'development') {
console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({
id: ele.id,
seq: ele.$$seq$$,
}))), 'txnId:', context.getCurrentTxnId());
}
const finalApi = (0, path_1.join)(api, selfEncryptInfo.id);
const res = await fetch(finalApi, {
method: 'post',
@ -61,6 +66,9 @@ class Synchronizer {
/**
* 返回结构见this.getSelfEndpoint
*/
if (process.env.NODE_ENV === 'development') {
console.log('同步oper返回结果', JSON.stringify(json), 'txnId:', context.getCurrentTxnId());
}
const { successIds, failed, redundantIds } = json;
if (failed) {
const { id, error } = failed;
@ -112,7 +120,7 @@ class Synchronizer {
if (channel.queue.length > 0) {
// 最大延迟redo时间512秒
const retryDelay = Math.pow(2, Math.min(9, retry)) * 1000;
console.error(`${channel.queue.length}个oper同步失败将于${retryDelay}毫秒后重试`);
console.error(`${channel.queue.length}个oper同步失败id是「${channel.queue.map(ele => ele.oper.id).join(',')}」,将于${retryDelay}毫秒后重试`);
return new Promise((resolve) => {
setTimeout(async () => {
await this.startChannel(context, channel, retry + 1);
@ -149,6 +157,9 @@ class Synchronizer {
(0, assert_1.default)(channel.api === (0, path_1.join)(url, 'endpoint', endpoint));
(0, assert_1.default)(channel.entity === remoteEntity);
(0, assert_1.default)(channel.entityId === remoteEntityId);
if (channel.queue.find(ele => ele.oper.id === oper.id)) {
console.error('aaaaa');
}
channel.queue.push({
oper,
onSynchronized,
@ -235,8 +246,11 @@ class Synchronizer {
* @param context
*/
async trySynchronizeOpers() {
const context = await this.contextBuilder();
const context = this.contextBuilder();
await context.begin();
// 暂时全用root身份去执行未来不一定对)
await context.initialize();
context.openRootMode();
try {
let dirtyOpers = await context.select('oper', {
data: {
@ -270,6 +284,9 @@ class Synchronizer {
}, { dontCollect: true, forUpdate: true });
dirtyOpers = dirtyOpers.filter(ele => !!ele[types_1.TriggerDataAttribute]);
if (dirtyOpers.length > 0) {
for (const c in this.channelDict) {
(0, assert_1.default)(this.channelDict[c].queue.length === 0);
}
const pushedIds = [];
const unpushedIds = [];
await Promise.all(dirtyOpers.map(async (oper) => {
@ -465,6 +482,7 @@ class Synchronizer {
if (pullEntities) {
pullEntities.forEach((def) => pullEntityDict[def.entity] = def);
}
const closeFn = context.openRootMode();
this.remotePullInfoMap[entity][entityId] = {
pullInfo: await getPullInfo(context, {
selfId: meEntityId,
@ -472,6 +490,7 @@ class Synchronizer {
}),
pullEntityDict,
};
closeFn();
}
const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId];
const { userId, algorithm, publicKey, cxtInfo } = pullInfo;

View File

@ -12,8 +12,7 @@ import { Namespace } from 'socket.io';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends BackendRuntimeContext<ED>> {
private ns;
private nsServer?;
private contextBuilder;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>, nsServer?: Namespace);
constructor(ns: Namespace, nsServer?: Namespace);
/**
* socket连接
*/

View File

@ -12,11 +12,9 @@ const console_1 = require("console");
class DataSubscriber {
ns;
nsServer;
contextBuilder;
constructor(ns, contextBuilder, nsServer) {
constructor(ns, nsServer) {
this.ns = ns;
this.nsServer = nsServer;
this.contextBuilder = contextBuilder;
this.startup();
}
/**

View File

@ -19,6 +19,7 @@ import { Server as SocketIoServer, Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import { getClusterInfo } from './cluster/env';
import Synchronizer from './Synchronizer';
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
@ -27,7 +28,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
private externalDependencies: string[];
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
private requireSth(filePath: string): any {
const depFilePath = join(this.path, filePath);
@ -112,8 +113,11 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
protected async makeContext(cxtStr?: string, headers?: IncomingHttpHeaders) {
const context = await this.contextBuilder(cxtStr)(this.dbStore);
context.clusterInfo = getClusterInfo();
const context = this.contextBuilder(this.dbStore);
await context.begin();
if (cxtStr) {
await context.initialize(JSON.parse(cxtStr));
}
context.headers = headers;
return context;
@ -134,21 +138,23 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
};
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace) {
constructor(path: string, ns?: Namespace, nsServer?: Namespace) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`)
this.externalDependencies = require(OAK_EXTERNAL_LIBS_FILEPATH(join(path, 'lib')));
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore<ED, Cxt>(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
this.dbStore = new DbStore<ED, Cxt>(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer);
this.dataSubscriber = new DataSubscriber(ns, nsServer);
}
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
this.contextBuilder = (store) => {
const context = new BackendRuntimeContext(store);
context.clusterInfo = getClusterInfo();
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
@ -216,7 +222,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const { syncConfig: syncConfig } = this.getConfiguration();
if (syncConfig) {
this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder()(this.dbStore));
this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
}
this.initTriggers();
@ -243,7 +249,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
if (!fn) {
throw new Error(`不存在的接口名称: ${name}`);
}
await context.begin();
try {
const result = await fn(params, context);
await context.refineOpRecords();
@ -262,18 +267,18 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
}
async initialize(dropIfExists?: boolean) {
await this.dbStore.initialize(dropIfExists);
async initialize(truncate?: boolean) {
await this.dbStore.initialize({ ifExists: 'dropIfNotStatic' });
const data = this.requireSth('lib/data/index');
const context = await this.contextBuilder()(this.dbStore);
const context = this.contextBuilder(this.dbStore);
for (const entity in data) {
let rows = data[entity];
if (entity === 'area') {
// 对area暂时处理一下
rows = require('./data/area.json');
}
if (rows.length > 0) {
if (rows.length > 0 && (!truncate || !this.dbStore.getSchema()[entity].static)) {
await context.begin();
try {
await this.dbStore.operate(entity as keyof ED, {
@ -322,7 +327,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
[name, method, url, async (params, headers, req, body) => {
const context = await this.makeContext(undefined, headers);
await context.begin();
try {
const result = await fn(context, params, headers, req, body);
await context.commit();
@ -370,7 +374,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {
const context = await this.makeContext();
await context.begin();
let result: OperationResult<ED> | undefined;
try {
if (watcher.hasOwnProperty('actionData')) {
@ -464,7 +467,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
else {
const context = await this.makeContext();
await context.begin();
try {
const { timer: timerFn } = timer as FreeTimer<ED, Cxt>;
const result = await timerFn(context);
@ -503,7 +505,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const context = await this.makeContext();
const start = Date.now();
await context.begin();
try {
const result = await routineFn(context);
console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}`);
@ -518,7 +519,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
}
async execRoutine(routine: (context: Cxt) => Promise<void>) {
async execRoutine(routine: <Cxt extends AsyncContext<ED>>(context: Cxt) => Promise<void>) {
const context = await this.makeContext();
await routine(context);

View File

@ -1,7 +1,7 @@
import { groupBy } from 'oak-domain/lib/utils/lodash';
import { combineFilters } from 'oak-domain/lib/store/filter';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption } from 'oak-domain/lib/types';
import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakMakeSureByMySelfException } from 'oak-domain/lib/types';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { getClusterInfo } from './cluster/env';
@ -35,7 +35,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
});
this.socket.on('data', async (entity: keyof ED, name: string, ids: string[], cxtStr: string, option: OperateOption) => {
const context = await this.makeContext(cxtStr);
await context.begin();
// await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, name, ids, context, option);
await context.commit();
@ -61,20 +61,22 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
}
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, nsDs: Namespace, nsServer: Namespace, socketPath: string) {
super(path, contextBuilder, nsDs, nsServer);
constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string) {
super(path, nsDs, nsServer);
this.dbStore.setOnVolatileTrigger(
async (entity, trigger, ids, cxtStr, option) => {
const execLocal = async (ids2: string[]) => {
const context = await this.makeContext(cxtStr);
await context.begin();
// await context.begin();
try {
await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option);
await context.commit();
}
catch (err) {
await context.rollback();
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
if (!(err instanceof OakMakeSureByMySelfException)) {
console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err);
}
}
};
if (trigger.cs) {

View File

@ -4,7 +4,7 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore';
import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { RelationAuth } from 'oak-domain/lib/store/RelationAuth';
@ -14,14 +14,14 @@ export class DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends Backend
constructor(
storageSchema: StorageSchema<ED>,
contextBuilder: (scene?: string) => Promise<Cxt>,
contextBuilder: () => Cxt,
mysqlConfiguration: MySQLConfiguration,
authDeduceRelationMap: AuthDeduceRelationMap<ED>,
selectFreeEntities: SelectFreeEntities<ED> = [],
updateFreeDict: UpdateFreeDict<ED> = {},
onVolatileTrigger?: <T extends keyof ED>(entity: T, trigger: VolatileTrigger<ED, T, Cxt>, ids: string[], cxtStr: string, option: OperateOption) => Promise<void>) {
super(storageSchema, mysqlConfiguration);
this.executor = new TriggerExecutor((scene) => contextBuilder(scene), undefined, onVolatileTrigger);
this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger);
this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
}

View File

@ -37,7 +37,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
pullEntityDict: Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
}>> = {};
private channelDict: Record<string, Channel<ED, Cxt>> = {};
private contextBuilder: () => Promise<Cxt>;
private contextBuilder: () => Cxt;
private pushAccessMap: Record<string, Array<{
projection: ED[keyof ED]['Selection']['data']; // 从entity上取到相关user需要的projection
@ -74,7 +74,12 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
// todo 加密
const queue = channel.queue;
const opers = queue.map(ele => ele.oper);
console.log('向远端结点sync数据', api, JSON.stringify(opers));
if (process.env.NODE_ENV === 'development') {
console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({
id: ele.id,
seq: ele.$$seq$$,
}))), 'txnId:', context.getCurrentTxnId());
}
const finalApi = join(api, selfEncryptInfo.id);
const res = await fetch(finalApi, {
method: 'post',
@ -109,6 +114,9 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
/**
* this.getSelfEndpoint
*/
if (process.env.NODE_ENV === 'development') {
console.log('同步oper返回结果', JSON.stringify(json), 'txnId:', context.getCurrentTxnId());
}
const { successIds, failed, redundantIds } = json!;
if (failed) {
const {
@ -172,7 +180,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
if (channel.queue.length > 0) {
// 最大延迟redo时间512秒
const retryDelay = Math.pow(2, Math.min(9, retry)) * 1000;
console.error(`${channel.queue.length}个oper同步失败将于${retryDelay}毫秒后重试`);
console.error(`${channel.queue.length}个oper同步失败id是「${channel.queue.map(ele => ele.oper.id).join(',')}」,将于${retryDelay}毫秒后重试`);
return new Promise(
(resolve) => {
@ -231,6 +239,9 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
assert(channel.entity === remoteEntity);
assert(channel.entityId === remoteEntityId);
if(channel.queue.find(ele => ele.oper.id === oper.id)) {
console.error('aaaaa');
}
channel.queue.push({
oper,
onSynchronized,
@ -336,9 +347,13 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
* @param context
*/
private async trySynchronizeOpers() {
const context = await this.contextBuilder();
const context = this.contextBuilder();
await context.begin();
// 暂时全用root身份去执行未来不一定对)
await context.initialize();
context.openRootMode();
try {
let dirtyOpers = await context.select('oper', {
data: {
@ -376,6 +391,9 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
ele => !!(ele as any)[TriggerDataAttribute]
);
if (dirtyOpers.length > 0) {
for (const c in this.channelDict) {
assert(this.channelDict[c].queue.length === 0);
}
const pushedIds = [] as string[];
const unpushedIds = [] as string[];
await Promise.all(
@ -561,7 +579,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
constructor(config: SyncConfig<ED, Cxt>, schema: StorageSchema<ED>, contextBuilder: () => Promise<Cxt>) {
constructor(config: SyncConfig<ED, Cxt>, schema: StorageSchema<ED>, contextBuilder: () => Cxt) {
this.config = config;
this.schema = schema;
this.contextBuilder = contextBuilder;
@ -621,6 +639,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
(def) => pullEntityDict[def.entity as string] = def
);
}
const closeFn = context.openRootMode();
this.remotePullInfoMap[entity]![entityId] = {
pullInfo: await getPullInfo(context, {
selfId: meEntityId as string,
@ -628,6 +647,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
}),
pullEntityDict,
};
closeFn();
}
const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId]!;

View File

@ -16,12 +16,10 @@ import { assert } from 'console';
export default class DataSubscriber<ED extends EntityDict & BaseEntityDict, Context extends BackendRuntimeContext<ED>> {
private ns: Namespace;
private nsServer?: Namespace;
private contextBuilder: (scene?: string) => Promise<Context>;
constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise<Context>, nsServer?: Namespace) {
constructor(ns: Namespace, nsServer?: Namespace) {
this.ns = ns;
this.nsServer = nsServer;
this.contextBuilder = contextBuilder;
this.startup();
}