From 75e687c019f8a74854a88d053d96cf08d9462dfc Mon Sep 17 00:00:00 2001 From: "Xc@centOs" Date: Fri, 29 Mar 2024 18:26:45 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BA=86synchronizer,?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86backendContext=E7=9A=84=E7=94=9F?= =?UTF-8?q?=E6=88=90=E5=87=BD=E6=95=B0=E6=B3=A8=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/AppLoader.d.ts | 9 ++++---- lib/AppLoader.js | 34 ++++++++++++++-------------- lib/ClusterAppLoader.d.ts | 3 +-- lib/ClusterAppLoader.js | 13 ++++++----- lib/DbStore.d.ts | 4 ++-- lib/DbStore.js | 2 +- lib/Synchronizer.d.ts | 2 +- lib/Synchronizer.js | 25 ++++++++++++++++++--- lib/cluster/DataSubscriber.d.ts | 3 +-- lib/cluster/DataSubscriber.js | 4 +--- src/AppLoader.ts | 39 +++++++++++++++++---------------- src/ClusterAppLoader.ts | 14 +++++++----- src/DbStore.ts | 6 ++--- src/Synchronizer.ts | 30 ++++++++++++++++++++----- src/cluster/DataSubscriber.ts | 4 +--- 15 files changed, 116 insertions(+), 76 deletions(-) diff --git a/lib/AppLoader.d.ts b/lib/AppLoader.d.ts index fa6c37a..ce92163 100644 --- a/lib/AppLoader.d.ts +++ b/lib/AppLoader.d.ts @@ -7,20 +7,21 @@ import { IncomingHttpHeaders, IncomingMessage } from 'http'; import { Namespace } from 'socket.io'; import DataSubscriber from './cluster/DataSubscriber'; import Synchronizer from './Synchronizer'; +import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; export declare class AppLoader> extends GeneralAppLoader { protected dbStore: DbStore; private aspectDict; private externalDependencies; protected dataSubscriber?: DataSubscriber; protected synchronizer?: Synchronizer; - protected contextBuilder: (scene?: string) => (store: DbStore) => Promise; + protected contextBuilder: (store: DbStore) => Cxt; private requireSth; protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise; /** * 后台启动的configuration,统一放在这里读取 */ private getConfiguration; - constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace, nsServer?: Namespace); + constructor(path: string, ns?: Namespace, nsServer?: Namespace); protected registerTrigger(trigger: Trigger): void; initTriggers(): void; mount(initialize?: true): Promise; @@ -30,7 +31,7 @@ export declare class AppLoader; - initialize(dropIfExists?: boolean): Promise; + initialize(truncate?: boolean): Promise; getStore(): DbStore; getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise][]; protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt): Promise>; @@ -39,5 +40,5 @@ export declare class AppLoader; - execRoutine(routine: (context: Cxt) => Promise): Promise; + execRoutine(routine: >(context: Cxt) => Promise): Promise; } diff --git a/lib/AppLoader.js b/lib/AppLoader.js index 01dde8b..16f9019 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -89,8 +89,11 @@ class AppLoader extends types_1.AppLoader { return sthOut; } async makeContext(cxtStr, headers) { - const context = await this.contextBuilder(cxtStr)(this.dbStore); - context.clusterInfo = (0, env_2.getClusterInfo)(); + const context = this.contextBuilder(this.dbStore); + await context.begin(); + if (cxtStr) { + await context.initialize(JSON.parse(cxtStr)); + } context.headers = headers; return context; } @@ -107,19 +110,21 @@ class AppLoader extends types_1.AppLoader { syncConfig: syncConfigs, }; } - constructor(path, contextBuilder, ns, nsServer) { + constructor(path, ns, nsServer) { super(path); const { dbConfig } = this.getConfiguration(); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`); this.externalDependencies = require((0, env_1.OAK_EXTERNAL_LIBS_FILEPATH)((0, path_1.join)(path, 'lib'))); this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index')); - this.dbStore = new DbStore_1.DbStore(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); + this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (ns) { - this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer); + this.dataSubscriber = new DataSubscriber_1.default(ns, nsServer); } - this.contextBuilder = (scene) => async (store) => { - const context = await contextBuilder(scene)(store); + const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`); + this.contextBuilder = (store) => { + const context = new BackendRuntimeContext(store); + context.clusterInfo = (0, env_2.getClusterInfo)(); const originCommit = context.commit; context.commit = async () => { const { eventOperationMap, opRecords } = context; @@ -160,7 +165,7 @@ class AppLoader extends types_1.AppLoader { if (!initialize) { const { syncConfig: syncConfig } = this.getConfiguration(); if (syncConfig) { - this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder()(this.dbStore)); + this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore)); } this.initTriggers(); } @@ -179,7 +184,6 @@ class AppLoader extends types_1.AppLoader { if (!fn) { throw new Error(`不存在的接口名称: ${name}`); } - await context.begin(); try { const result = await fn(params, context); await context.refineOpRecords(); @@ -197,17 +201,17 @@ class AppLoader extends types_1.AppLoader { throw err; } } - async initialize(dropIfExists) { - await this.dbStore.initialize(dropIfExists); + async initialize(truncate) { + await this.dbStore.initialize({ ifExists: 'dropIfNotStatic' }); const data = this.requireSth('lib/data/index'); - const context = await this.contextBuilder()(this.dbStore); + const context = this.contextBuilder(this.dbStore); for (const entity in data) { let rows = data[entity]; if (entity === 'area') { // 对area暂时处理一下 rows = require('./data/area.json'); } - if (rows.length > 0) { + if (rows.length > 0 && (!truncate || !this.dbStore.getSchema()[entity].static)) { await context.begin(); try { await this.dbStore.operate(entity, { @@ -251,7 +255,6 @@ class AppLoader extends types_1.AppLoader { } endPointRouters.push([name, method, url, async (params, headers, req, body) => { const context = await this.makeContext(undefined, headers); - await context.begin(); try { const result = await fn(context, params, headers, req, body); await context.commit(); @@ -292,7 +295,6 @@ class AppLoader extends types_1.AppLoader { } async execWatcher(watcher) { const context = await this.makeContext(); - await context.begin(); let result; try { if (watcher.hasOwnProperty('actionData')) { @@ -378,7 +380,6 @@ class AppLoader extends types_1.AppLoader { } else { const context = await this.makeContext(); - await context.begin(); try { const { timer: timerFn } = timer; const result = await timerFn(context); @@ -415,7 +416,6 @@ class AppLoader extends types_1.AppLoader { const { name, routine: routineFn } = routine; const context = await this.makeContext(); const start = Date.now(); - await context.begin(); try { const result = await routineFn(context); console.log(`例程【${name}】执行成功,耗时${Date.now() - start}毫秒,结果是【${result}】`); diff --git a/lib/ClusterAppLoader.d.ts b/lib/ClusterAppLoader.d.ts index 6393a5d..b6adefd 100644 --- a/lib/ClusterAppLoader.d.ts +++ b/lib/ClusterAppLoader.d.ts @@ -2,7 +2,6 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict, OperationResult, Trigger } from 'oak-domain/lib/types'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { AppLoader } from './AppLoader'; -import { DbStore } from './DbStore'; import { Namespace } from 'socket.io'; import { Socket } from 'socket.io-client'; export declare class ClusterAppLoader> extends AppLoader { @@ -10,7 +9,7 @@ export declare class ClusterAppLoader (store: DbStore) => Promise, nsDs: Namespace, nsServer: Namespace, socketPath: string); + constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string); protected registerTrigger(trigger: Trigger): void; protected operateInWatcher(entity: T, operation: ED[T]['Update'], context: Cxt): Promise>; protected selectInWatcher(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise[]>; diff --git a/lib/ClusterAppLoader.js b/lib/ClusterAppLoader.js index 7905e3d..c1f1a3f 100644 --- a/lib/ClusterAppLoader.js +++ b/lib/ClusterAppLoader.js @@ -4,6 +4,7 @@ exports.ClusterAppLoader = void 0; const tslib_1 = require("tslib"); const lodash_1 = require("oak-domain/lib/utils/lodash"); const filter_1 = require("oak-domain/lib/store/filter"); +const types_1 = require("oak-domain/lib/types"); const env_1 = require("./cluster/env"); const AppLoader_1 = require("./AppLoader"); const assert_1 = tslib_1.__importDefault(require("assert")); @@ -27,7 +28,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { }); this.socket.on('data', async (entity, name, ids, cxtStr, option) => { const context = await this.makeContext(cxtStr); - await context.begin(); + // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, name, ids, context, option); await context.commit(); @@ -50,19 +51,21 @@ class ClusterAppLoader extends AppLoader_1.AppLoader { this.socket.connect(); } } - constructor(path, contextBuilder, nsDs, nsServer, socketPath) { - super(path, contextBuilder, nsDs, nsServer); + constructor(path, nsDs, nsServer, socketPath) { + super(path, nsDs, nsServer); this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => { const execLocal = async (ids2) => { const context = await this.makeContext(cxtStr); - await context.begin(); + // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option); await context.commit(); } catch (err) { await context.rollback(); - console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); + if (!(err instanceof types_1.OakMakeSureByMySelfException)) { + console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); + } } }; if (trigger.cs) { diff --git a/lib/DbStore.d.ts b/lib/DbStore.d.ts index ec2ab1b..631259d 100644 --- a/lib/DbStore.d.ts +++ b/lib/DbStore.d.ts @@ -3,11 +3,11 @@ import { EntityDict, StorageSchema, Trigger, Checker, SelectOption, SelectFreeEn import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { MySQLConfiguration } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; -import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore'; +import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; export declare class DbStore> extends MysqlStore implements AsyncRowStore { private executor; private relationAuth; - constructor(storageSchema: StorageSchema, contextBuilder: (scene?: string) => Promise, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise); + constructor(storageSchema: StorageSchema, contextBuilder: () => Cxt, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities?: SelectFreeEntities, updateFreeDict?: UpdateFreeDict, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise); protected cascadeUpdateAsync(entity: T, operation: ED[T]['Operation'], context: AsyncContext, option: MysqlOperateOption): Promise>; operate(entity: T, operation: ED[T]['Operation'], context: Cxt, option: MysqlOperateOption): Promise>; select(entity: T, selection: ED[T]['Selection'], context: Cxt, option: MySqlSelectOption): Promise[]>; diff --git a/lib/DbStore.js b/lib/DbStore.js index 278c731..91f8b3f 100644 --- a/lib/DbStore.js +++ b/lib/DbStore.js @@ -9,7 +9,7 @@ class DbStore extends oak_db_1.MysqlStore { relationAuth; constructor(storageSchema, contextBuilder, mysqlConfiguration, authDeduceRelationMap, selectFreeEntities = [], updateFreeDict = {}, onVolatileTrigger) { super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor_1.TriggerExecutor((scene) => contextBuilder(scene), undefined, onVolatileTrigger); + this.executor = new TriggerExecutor_1.TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); this.relationAuth = new RelationAuth_1.RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); } async cascadeUpdateAsync(entity, operation, context, option) { diff --git a/lib/Synchronizer.d.ts b/lib/Synchronizer.d.ts index 4472165..d97738b 100644 --- a/lib/Synchronizer.d.ts +++ b/lib/Synchronizer.d.ts @@ -26,7 +26,7 @@ export default class Synchronizer, schema: StorageSchema, contextBuilder: () => Promise); + constructor(config: SyncConfig, schema: StorageSchema, contextBuilder: () => Cxt); /** * 根据sync的定义,生成对应的 commit triggers * @returns diff --git a/lib/Synchronizer.js b/lib/Synchronizer.js index 90ab93e..565a5b3 100644 --- a/lib/Synchronizer.js +++ b/lib/Synchronizer.js @@ -30,7 +30,12 @@ class Synchronizer { // todo 加密 const queue = channel.queue; const opers = queue.map(ele => ele.oper); - console.log('向远端结点sync数据', api, JSON.stringify(opers)); + if (process.env.NODE_ENV === 'development') { + console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({ + id: ele.id, + seq: ele.$$seq$$, + }))), 'txnId:', context.getCurrentTxnId()); + } const finalApi = (0, path_1.join)(api, selfEncryptInfo.id); const res = await fetch(finalApi, { method: 'post', @@ -61,6 +66,9 @@ class Synchronizer { /** * 返回结构见this.getSelfEndpoint */ + if (process.env.NODE_ENV === 'development') { + console.log('同步oper返回结果', JSON.stringify(json), 'txnId:', context.getCurrentTxnId()); + } const { successIds, failed, redundantIds } = json; if (failed) { const { id, error } = failed; @@ -112,7 +120,7 @@ class Synchronizer { if (channel.queue.length > 0) { // 最大延迟redo时间512秒 const retryDelay = Math.pow(2, Math.min(9, retry)) * 1000; - console.error(`有${channel.queue.length}个oper同步失败,将于${retryDelay}毫秒后重试`); + console.error(`有${channel.queue.length}个oper同步失败,id是「${channel.queue.map(ele => ele.oper.id).join(',')}」,将于${retryDelay}毫秒后重试`); return new Promise((resolve) => { setTimeout(async () => { await this.startChannel(context, channel, retry + 1); @@ -149,6 +157,9 @@ class Synchronizer { (0, assert_1.default)(channel.api === (0, path_1.join)(url, 'endpoint', endpoint)); (0, assert_1.default)(channel.entity === remoteEntity); (0, assert_1.default)(channel.entityId === remoteEntityId); + if (channel.queue.find(ele => ele.oper.id === oper.id)) { + console.error('aaaaa'); + } channel.queue.push({ oper, onSynchronized, @@ -235,8 +246,11 @@ class Synchronizer { * @param context */ async trySynchronizeOpers() { - const context = await this.contextBuilder(); + const context = this.contextBuilder(); await context.begin(); + // 暂时全用root身份去执行(未来不一定对) + await context.initialize(); + context.openRootMode(); try { let dirtyOpers = await context.select('oper', { data: { @@ -270,6 +284,9 @@ class Synchronizer { }, { dontCollect: true, forUpdate: true }); dirtyOpers = dirtyOpers.filter(ele => !!ele[types_1.TriggerDataAttribute]); if (dirtyOpers.length > 0) { + for (const c in this.channelDict) { + (0, assert_1.default)(this.channelDict[c].queue.length === 0); + } const pushedIds = []; const unpushedIds = []; await Promise.all(dirtyOpers.map(async (oper) => { @@ -465,6 +482,7 @@ class Synchronizer { if (pullEntities) { pullEntities.forEach((def) => pullEntityDict[def.entity] = def); } + const closeFn = context.openRootMode(); this.remotePullInfoMap[entity][entityId] = { pullInfo: await getPullInfo(context, { selfId: meEntityId, @@ -472,6 +490,7 @@ class Synchronizer { }), pullEntityDict, }; + closeFn(); } const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId]; const { userId, algorithm, publicKey, cxtInfo } = pullInfo; diff --git a/lib/cluster/DataSubscriber.d.ts b/lib/cluster/DataSubscriber.d.ts index 20a736d..84e06e3 100644 --- a/lib/cluster/DataSubscriber.d.ts +++ b/lib/cluster/DataSubscriber.d.ts @@ -12,8 +12,7 @@ import { Namespace } from 'socket.io'; export default class DataSubscriber> { private ns; private nsServer?; - private contextBuilder; - constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise, nsServer?: Namespace); + constructor(ns: Namespace, nsServer?: Namespace); /** * 来自外部的socket连接,监听数据变化 */ diff --git a/lib/cluster/DataSubscriber.js b/lib/cluster/DataSubscriber.js index 6f7d14e..93bc213 100644 --- a/lib/cluster/DataSubscriber.js +++ b/lib/cluster/DataSubscriber.js @@ -12,11 +12,9 @@ const console_1 = require("console"); class DataSubscriber { ns; nsServer; - contextBuilder; - constructor(ns, contextBuilder, nsServer) { + constructor(ns, nsServer) { this.ns = ns; this.nsServer = nsServer; - this.contextBuilder = contextBuilder; this.startup(); } /** diff --git a/src/AppLoader.ts b/src/AppLoader.ts index f76b572..9b6f985 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -19,6 +19,7 @@ import { Server as SocketIoServer, Namespace } from 'socket.io'; import DataSubscriber from './cluster/DataSubscriber'; import { getClusterInfo } from './cluster/env'; import Synchronizer from './Synchronizer'; +import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; export class AppLoader> extends GeneralAppLoader { @@ -27,7 +28,7 @@ export class AppLoader; protected synchronizer?: Synchronizer; - protected contextBuilder: (scene?: string) => (store: DbStore) => Promise; + protected contextBuilder: (store: DbStore) => Cxt; private requireSth(filePath: string): any { const depFilePath = join(this.path, filePath); @@ -112,8 +113,11 @@ export class AppLoader (store: DbStore) => Promise, ns?: Namespace, nsServer?: Namespace) { + constructor(path: string, ns?: Namespace, nsServer?: Namespace) { super(path); const { dbConfig } = this.getConfiguration(); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`) this.externalDependencies = require(OAK_EXTERNAL_LIBS_FILEPATH(join(path, 'lib'))); this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index')); - this.dbStore = new DbStore(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); + this.dbStore = new DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (ns) { - this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer); + this.dataSubscriber = new DataSubscriber(ns, nsServer); } - this.contextBuilder = (scene) => async (store) => { - const context = await contextBuilder(scene)(store); + const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`); + this.contextBuilder = (store) => { + const context = new BackendRuntimeContext(store); + context.clusterInfo = getClusterInfo(); const originCommit = context.commit; context.commit = async () => { const { eventOperationMap, opRecords } = context; @@ -216,7 +222,7 @@ export class AppLoader this.contextBuilder()(this.dbStore)); + this.synchronizer = new Synchronizer(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore)); } this.initTriggers(); @@ -243,7 +249,6 @@ export class AppLoader 0) { + if (rows.length > 0 && (!truncate || !this.dbStore.getSchema()[entity].static)) { await context.begin(); try { await this.dbStore.operate(entity as keyof ED, { @@ -322,7 +327,6 @@ export class AppLoader { const context = await this.makeContext(undefined, headers); - await context.begin(); try { const result = await fn(context, params, headers, req, body); await context.commit(); @@ -370,7 +374,6 @@ export class AppLoader) { const context = await this.makeContext(); - await context.begin(); let result: OperationResult | undefined; try { if (watcher.hasOwnProperty('actionData')) { @@ -464,7 +467,6 @@ export class AppLoader; const result = await timerFn(context); @@ -503,7 +505,6 @@ export class AppLoader Promise) { + async execRoutine(routine: >(context: Cxt) => Promise) { const context = await this.makeContext(); await routine(context); diff --git a/src/ClusterAppLoader.ts b/src/ClusterAppLoader.ts index 799c4ba..4bdc153 100644 --- a/src/ClusterAppLoader.ts +++ b/src/ClusterAppLoader.ts @@ -1,7 +1,7 @@ import { groupBy } from 'oak-domain/lib/utils/lodash'; import { combineFilters } from 'oak-domain/lib/store/filter'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; -import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption } from 'oak-domain/lib/types'; +import { EntityDict, OperationResult, VolatileTrigger, Trigger, OperateOption, OakMakeSureByMySelfException } from 'oak-domain/lib/types'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; import { getClusterInfo } from './cluster/env'; @@ -35,7 +35,7 @@ export class ClusterAppLoader { const context = await this.makeContext(cxtStr); - await context.begin(); + // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, name, ids, context, option); await context.commit(); @@ -61,20 +61,22 @@ export class ClusterAppLoader (store: DbStore) => Promise, nsDs: Namespace, nsServer: Namespace, socketPath: string) { - super(path, contextBuilder, nsDs, nsServer); + constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string) { + super(path, nsDs, nsServer); this.dbStore.setOnVolatileTrigger( async (entity, trigger, ids, cxtStr, option) => { const execLocal = async (ids2: string[]) => { const context = await this.makeContext(cxtStr); - await context.begin(); + // await context.begin(); try { await this.dbStore.execVolatileTrigger(entity, trigger.name, ids2, context, option); await context.commit(); } catch (err) { await context.rollback(); - console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); + if (!(err instanceof OakMakeSureByMySelfException)) { + console.error('execVolatileTrigger异常', entity, trigger.name, ids2, option, err); + } } }; if (trigger.cs) { diff --git a/src/DbStore.ts b/src/DbStore.ts index 03f150d..b369577 100644 --- a/src/DbStore.ts +++ b/src/DbStore.ts @@ -4,7 +4,7 @@ import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor'; import { MySQLConfiguration, } from 'oak-db/lib/MySQL/types/Configuration'; import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext'; -import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore'; +import { AsyncRowStore, AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { RelationAuth } from 'oak-domain/lib/store/RelationAuth'; @@ -14,14 +14,14 @@ export class DbStore, - contextBuilder: (scene?: string) => Promise, + contextBuilder: () => Cxt, mysqlConfiguration: MySQLConfiguration, authDeduceRelationMap: AuthDeduceRelationMap, selectFreeEntities: SelectFreeEntities = [], updateFreeDict: UpdateFreeDict = {}, onVolatileTrigger?: (entity: T, trigger: VolatileTrigger, ids: string[], cxtStr: string, option: OperateOption) => Promise) { super(storageSchema, mysqlConfiguration); - this.executor = new TriggerExecutor((scene) => contextBuilder(scene), undefined, onVolatileTrigger); + this.executor = new TriggerExecutor(contextBuilder, undefined, onVolatileTrigger); this.relationAuth = new RelationAuth(storageSchema, authDeduceRelationMap, selectFreeEntities, updateFreeDict); } diff --git a/src/Synchronizer.ts b/src/Synchronizer.ts index 3e37be3..5923aa1 100644 --- a/src/Synchronizer.ts +++ b/src/Synchronizer.ts @@ -37,7 +37,7 @@ export default class Synchronizer>; }>> = {}; private channelDict: Record> = {}; - private contextBuilder: () => Promise; + private contextBuilder: () => Cxt; private pushAccessMap: Record ele.oper); - console.log('向远端结点sync数据', api, JSON.stringify(opers)); + if (process.env.NODE_ENV === 'development') { + console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({ + id: ele.id, + seq: ele.$$seq$$, + }))), 'txnId:', context.getCurrentTxnId()); + } const finalApi = join(api, selfEncryptInfo.id); const res = await fetch(finalApi, { method: 'post', @@ -109,6 +114,9 @@ export default class Synchronizer 0) { // 最大延迟redo时间512秒 const retryDelay = Math.pow(2, Math.min(9, retry)) * 1000; - console.error(`有${channel.queue.length}个oper同步失败,将于${retryDelay}毫秒后重试`); + console.error(`有${channel.queue.length}个oper同步失败,id是「${channel.queue.map(ele => ele.oper.id).join(',')}」,将于${retryDelay}毫秒后重试`); return new Promise( (resolve) => { @@ -231,6 +239,9 @@ export default class Synchronizer ele.oper.id === oper.id)) { + console.error('aaaaa'); + } channel.queue.push({ oper, onSynchronized, @@ -336,9 +347,13 @@ export default class Synchronizer !!(ele as any)[TriggerDataAttribute] ); if (dirtyOpers.length > 0) { + for (const c in this.channelDict) { + assert(this.channelDict[c].queue.length === 0); + } const pushedIds = [] as string[]; const unpushedIds = [] as string[]; await Promise.all( @@ -561,7 +579,7 @@ export default class Synchronizer, schema: StorageSchema, contextBuilder: () => Promise) { + constructor(config: SyncConfig, schema: StorageSchema, contextBuilder: () => Cxt) { this.config = config; this.schema = schema; this.contextBuilder = contextBuilder; @@ -621,6 +639,7 @@ export default class Synchronizer pullEntityDict[def.entity as string] = def ); } + const closeFn = context.openRootMode(); this.remotePullInfoMap[entity]![entityId] = { pullInfo: await getPullInfo(context, { selfId: meEntityId as string, @@ -628,6 +647,7 @@ export default class Synchronizer> { private ns: Namespace; private nsServer?: Namespace; - private contextBuilder: (scene?: string) => Promise; - constructor(ns: Namespace, contextBuilder: (scene?: string) => Promise, nsServer?: Namespace) { + constructor(ns: Namespace, nsServer?: Namespace) { this.ns = ns; this.nsServer = nsServer; - this.contextBuilder = contextBuilder; this.startup(); }