diff --git a/lib/store/TriggerExecutor.d.ts b/lib/store/TriggerExecutor.d.ts index f29f2b9..12796b5 100644 --- a/lib/store/TriggerExecutor.d.ts +++ b/lib/store/TriggerExecutor.d.ts @@ -39,4 +39,12 @@ export declare class TriggerExecutor[]): Promise | void; checkpoint(timestamp: number): Promise; + /** + * 由外部来控制进行按volatileTrigger逐个进行checkpoint + * @param name + * @param timestamp + * @param instanceCount + * @param instanceId + */ + independentCheckPoint(name: string, timestamp: number, instanceCount?: number, instanceId?: number): Promise; } diff --git a/lib/store/TriggerExecutor.js b/lib/store/TriggerExecutor.js index 62106e3..c8d4c9f 100644 --- a/lib/store/TriggerExecutor.js +++ b/lib/store/TriggerExecutor.js @@ -49,8 +49,12 @@ class TriggerExecutor { await context.commit(); } catch (err) { - await context.rollback(); - if (!(err instanceof types_1.OakMakeSureByMySelfException)) { + if (!(err instanceof types_1.OakPartialSuccess)) { + await context.rollback(); + this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err); + } + else { + await context.commit(); this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err); } // throw err; @@ -363,6 +367,12 @@ class TriggerExecutor { filter: { id: { $in: ids, + }, + [Entity_1.TriggerDataAttribute]: { + $exists: true, + }, + [Entity_1.TriggerUuidAttribute]: { + $exists: true, } } }, { includedDeleted: true, blockTrigger: true }); @@ -473,10 +483,6 @@ class TriggerExecutor { async checkpoint(timestamp) { let result = 0; for (const entity of this.volatileEntities) { - if (entity === 'oper') { - // oper上的跨事务同步是系统synchronizer统一处理 - continue; - } const filter = { [Entity_1.TriggerUuidAttribute]: { $exists: true, @@ -486,12 +492,6 @@ class TriggerExecutor { } }; const context = this.contextBuilder(); - if (context.clusterInfo?.usingCluster) { - const { instanceCount, instanceId } = context.clusterInfo; - filter.$$seq$$ = { - $mod: [instanceCount, instanceId], - }; - } await context.begin(); try { const rows = await context.select(entity, { @@ -532,15 +532,108 @@ class TriggerExecutor { } } await context.commit(); + result += rows.length; } catch (err) { - await context.rollback(); - if (!(err instanceof types_1.OakMakeSureByMySelfException)) { - this.logger.error(`执行checkpoint时出错,对象是「${entity}」,异常是`, err); + if (!(err instanceof types_1.OakPartialSuccess)) { + await context.rollback(); + this.logger.error(`error in checkpoint on entity 「${entity}」`, err); + } + else { + await context.commit(); + this.logger.error(`error in checkpoint on entity 「${entity}」`, err); } } } return result; } + /** + * 由外部来控制进行按volatileTrigger逐个进行checkpoint + * @param name + * @param timestamp + * @param instanceCount + * @param instanceId + */ + async independentCheckPoint(name, timestamp, instanceCount, instanceId) { + const trigger = this.triggerNameMap[name]; + (0, assert_1.default)(trigger && trigger.when === 'commit'); + const { fn, entity, grouped } = trigger; + const filter = { + [Entity_1.TriggerDataAttribute]: { + name, + }, + [Entity_1.TriggerUuidAttribute]: { + $exists: true, + }, + [Entity_1.UpdateAtAttribute]: { + $lt: timestamp, + }, + }; + if (instanceCount) { + filter.$$seq$$ = { + $mod: [instanceCount, instanceId] + }; + } + const context = this.contextBuilder(); + await context.begin(); + try { + const rows = await context.select(entity, { + data: { + id: 1, + }, + filter, + }, { + includedDeleted: true, + dontCollect: true, + }); + if (rows.length > 0) { + // 要用id来再锁一次,不然会锁住filter的范围,影响并发性 + // by Xc 20240314,在haina-busi和haina-cn数据sync过程中发现这个问题 + const rows2 = await context.select(entity, { + data: { + id: 1, + [Entity_1.TriggerDataAttribute]: 1, + [Entity_1.TriggerUuidAttribute]: 1, + }, + filter: { + id: { + $in: rows.map(ele => ele.id), + }, + }, + }, { + includedDeleted: true, + dontCollect: true, + forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理 + }); + if (grouped) { + // grouped不需要上下文了吧,内部一定会用root + await this.execVolatileTrigger(entity, name, rows.map(ele => ele.id), context, {}); + } + else { + const groupedRowDict = (0, lodash_1.groupBy)(rows2, Entity_1.TriggerUuidAttribute); + for (const uuid in groupedRowDict) { + const rs = groupedRowDict[uuid]; + const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0]; + const { cxtStr, option } = triggerData; + await context.initialize(JSON.parse(cxtStr), true); + await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option); + } + } + } + await context.commit(); + return rows.length; + } + catch (err) { + if (!(err instanceof types_1.OakPartialSuccess)) { + await context.rollback(); + this.logger.error('error on volatile trigger', entity, trigger.name, err); + } + else { + await context.commit(); + this.logger.error('error on volatile trigger', entity, trigger.name, err); + } + return 0; + } + } } exports.TriggerExecutor = TriggerExecutor; diff --git a/lib/types/Exception.d.ts b/lib/types/Exception.d.ts index c8b5c60..eb7fa36 100644 --- a/lib/types/Exception.d.ts +++ b/lib/types/Exception.d.ts @@ -19,7 +19,7 @@ export declare class OakException extend tag2?: boolean; tag3?: any; } -export declare class OakMakeSureByMySelfException extends OakException { +export declare class OakPartialSuccess extends OakException { } export declare class OakDataException extends OakException { } diff --git a/lib/types/Exception.js b/lib/types/Exception.js index 0d06c67..47bc9dc 100644 --- a/lib/types/Exception.js +++ b/lib/types/Exception.js @@ -1,6 +1,6 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); -exports.makeException = exports.OakExternalException = exports.OakPreConditionUnsetException = exports.OakDeadlock = exports.OakCongruentRowExists = exports.OakRowLockedException = exports.OakUnloggedInException = exports.OakUserInvisibleException = exports.OakUserUnpermittedException = exports.OakAttrCantUpdateException = exports.OakAttrNotNullException = exports.OakInputIllegalException = exports.OakRowInconsistencyException = exports.OakServerProxyException = exports.OakNetworkException = exports.OakImportDataParseException = exports.OakUniqueViolationException = exports.OakUserException = exports.OakRowUnexistedException = exports.OakOperExistedException = exports.OakNoRelationDefException = exports.OakDataException = exports.OakMakeSureByMySelfException = exports.OakException = void 0; +exports.makeException = exports.OakExternalException = exports.OakPreConditionUnsetException = exports.OakDeadlock = exports.OakCongruentRowExists = exports.OakRowLockedException = exports.OakUnloggedInException = exports.OakUserInvisibleException = exports.OakUserUnpermittedException = exports.OakAttrCantUpdateException = exports.OakAttrNotNullException = exports.OakInputIllegalException = exports.OakRowInconsistencyException = exports.OakServerProxyException = exports.OakNetworkException = exports.OakImportDataParseException = exports.OakUniqueViolationException = exports.OakUserException = exports.OakRowUnexistedException = exports.OakOperExistedException = exports.OakNoRelationDefException = exports.OakDataException = exports.OakPartialSuccess = exports.OakException = void 0; const relation_1 = require("../store/relation"); const lodash_1 = require("../utils/lodash"); class OakException extends Error { @@ -90,10 +90,10 @@ class OakException extends Error { tag3; } exports.OakException = OakException; -// 这个异常表示模块自己处理跨事务一致性,框架pass(在分布式数据传递时会用到) -class OakMakeSureByMySelfException extends OakException { +// 这个异常表示事务虽然没有完全成功,但是仍然应该提交并抛出异常(在分布式数据传递时会用到) +class OakPartialSuccess extends OakException { } -exports.OakMakeSureByMySelfException = OakMakeSureByMySelfException; +exports.OakPartialSuccess = OakPartialSuccess; class OakDataException extends OakException { } exports.OakDataException = OakDataException; diff --git a/lib/types/Timer.d.ts b/lib/types/Timer.d.ts index bc6898b..526c0c3 100644 --- a/lib/types/Timer.d.ts +++ b/lib/types/Timer.d.ts @@ -12,6 +12,7 @@ export type FreeTimer> = { name: string; cron: RecurrenceRule | RecurrenceSpecDateRange | RecurrenceSpecObjLit | Date | string | number; timer: FreeOperateFn; + singleton?: true; }; export type Routine> = FreeRoutine | Watcher; export type Timer> = FreeTimer | Watcher & { diff --git a/lib/types/Trigger.d.ts b/lib/types/Trigger.d.ts index c43e2e2..78a3321 100644 --- a/lib/types/Trigger.d.ts +++ b/lib/types/Trigger.d.ts @@ -39,6 +39,8 @@ interface TriggerCrossTxn | when: 'commit'; strict?: 'takeEasy' | 'makeSure'; cs?: true; + singleton?: true; + grouped?: true; fn: (event: { ids: string[]; }, context: Cxt, option: OperateOption) => Promise<((context: Cxt, option: OperateOption) => Promise) | void>; diff --git a/lib/types/Watcher.d.ts b/lib/types/Watcher.d.ts index a2fa7d5..50dfd90 100644 --- a/lib/types/Watcher.d.ts +++ b/lib/types/Watcher.d.ts @@ -8,6 +8,7 @@ export interface BBWatcher { filter: ED[T]['Selection']['filter'] | (() => ED[T]['Selection']['filter']); action: Omit; actionData: ActionData | (() => Promise>); + singleton?: true; } export interface WBWatcher> { name: string; @@ -15,6 +16,7 @@ export interface WBWatcher Promise); projection: ED[T]['Selection']['data'] | (() => Promise); fn: (context: Cxt, data: Partial[]) => Promise>; + singleton?: true; } export type Watcher> = BBWatcher | WBWatcher; export {}; diff --git a/src/store/TriggerExecutor.ts b/src/store/TriggerExecutor.ts index e7c6086..0f8c053 100644 --- a/src/store/TriggerExecutor.ts +++ b/src/store/TriggerExecutor.ts @@ -11,7 +11,7 @@ import { SyncContext } from './SyncRowStore'; import { translateCheckerInAsyncContext } from './checker'; import { generateNewIdAsync } from '../utils/uuid'; import { readOnlyActions } from '../actions/action'; -import { OakMakeSureByMySelfException, StorageSchema } from '../types'; +import { OakPartialSuccess, StorageSchema } from '../types'; /** * update可能会传入多种不同的action,此时都需要检查update trigger @@ -74,8 +74,12 @@ export class TriggerExecutor { let result = 0; for (const entity of this.volatileEntities) { - if (entity === 'oper') { - // oper上的跨事务同步是系统synchronizer统一处理 - continue; - } const filter: ED[keyof ED]['Selection']['filter'] = { [TriggerUuidAttribute]: { $exists: true, @@ -598,12 +604,6 @@ export class TriggerExecutor 0) { + // 要用id来再锁一次,不然会锁住filter的范围,影响并发性 + // by Xc 20240314,在haina-busi和haina-cn数据sync过程中发现这个问题 + const rows2 = await context.select(entity, { + data: { + id: 1, + [TriggerDataAttribute]: 1, + [TriggerUuidAttribute]: 1, + }, + filter: { + id: { + $in: rows.map(ele => ele.id!), + }, + }, + }, { + includedDeleted: true, + dontCollect: true, + forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理 + }); + + if (grouped) { + // grouped不需要上下文了吧,内部一定会用root + await this.execVolatileTrigger(entity, name, rows.map(ele => ele.id!), context, {}); + } + else { + const groupedRowDict = groupBy(rows2, TriggerUuidAttribute); + for (const uuid in groupedRowDict) { + const rs = groupedRowDict[uuid]; + const { [TriggerDataAttribute]: triggerData } = rs[0]; + const { cxtStr, option } = triggerData!; + await context.initialize(JSON.parse(cxtStr), true); + await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option); + } + } + } + await context.commit(); + return rows.length; + } + catch (err) { + if (!(err instanceof OakPartialSuccess)) { + await context.rollback(); + this.logger.error('error on volatile trigger', entity, trigger.name, err); + } + else { + await context.commit(); + this.logger.error('error on volatile trigger', entity, trigger.name, err); + } + return 0; + } + } } diff --git a/src/types/Exception.ts b/src/types/Exception.ts index 1b6056b..3cac3ff 100644 --- a/src/types/Exception.ts +++ b/src/types/Exception.ts @@ -102,8 +102,8 @@ export class OakException extends Error tag3?: any; } -// 这个异常表示模块自己处理跨事务一致性,框架pass(在分布式数据传递时会用到) -export class OakMakeSureByMySelfException extends OakException { +// 这个异常表示事务虽然没有完全成功,但是仍然应该提交并抛出异常(在分布式数据传递时会用到) +export class OakPartialSuccess extends OakException { } diff --git a/src/types/Timer.ts b/src/types/Timer.ts index 586f8ed..d16c123 100644 --- a/src/types/Timer.ts +++ b/src/types/Timer.ts @@ -17,6 +17,7 @@ export type FreeTimer> = { name: string; cron: RecurrenceRule | RecurrenceSpecDateRange | RecurrenceSpecObjLit | Date | string | number; timer: FreeOperateFn; + singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行 }; export type Routine> = FreeRoutine | Watcher; diff --git a/src/types/Trigger.ts b/src/types/Trigger.ts index 4f44cb1..403273e 100644 --- a/src/types/Trigger.ts +++ b/src/types/Trigger.ts @@ -51,6 +51,8 @@ interface TriggerCrossTxn | when: 'commit', strict?: 'takeEasy' | 'makeSure'; cs?: true; // cluster sensative,集群敏感的,需要由对应的集群进程统一处理 + singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去统一执行 + grouped?: true; // 置grouped则在检查点时会将所有的不一致行全部传入一次性处理 fn: (event: { ids: string[] }, context: Cxt, option: OperateOption) => Promise<((context: Cxt, option: OperateOption) => Promise) | void>; // 跨事务的trigger可能紧接着下来就要触发另一个跨事务trigger,这里只能用回调的方式进行 } diff --git a/src/types/Watcher.ts b/src/types/Watcher.ts index 1b7d3b0..af255df 100644 --- a/src/types/Watcher.ts +++ b/src/types/Watcher.ts @@ -11,6 +11,7 @@ export interface BBWatcher { filter: ED[T]['Selection']['filter'] | (() => ED[T]['Selection']['filter']); action: Omit; actionData: ActionData | (() => Promise>); + singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行 }; export interface WBWatcher> { @@ -19,6 +20,7 @@ export interface WBWatcher Promise); projection: ED[T]['Selection']['data'] | (() => Promise); fn: (context: Cxt, data: Partial[]) => Promise>; + singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行 }; export type Watcher> = BBWatcher | WBWatcher;