增强了trigger/timer/watcher,增加了singleton模式

This commit is contained in:
Xu Chang 2024-08-03 19:30:00 +08:00
parent 996b0ed9dc
commit 802c3cc127
12 changed files with 248 additions and 38 deletions

View File

@ -39,4 +39,12 @@ export declare class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt
action: 'select';
}, context: Cxt, option: OperateOption | SelectOption, result?: Partial<ED[T]['Schema']>[]): Promise<void> | void;
checkpoint(timestamp: number): Promise<number>;
/**
* volatileTrigger逐个进行checkpoint
* @param name
* @param timestamp
* @param instanceCount
* @param instanceId
*/
independentCheckPoint(name: string, timestamp: number, instanceCount?: number, instanceId?: number): Promise<number>;
}

View File

@ -49,8 +49,12 @@ class TriggerExecutor {
await context.commit();
}
catch (err) {
await context.rollback();
if (!(err instanceof types_1.OakMakeSureByMySelfException)) {
if (!(err instanceof types_1.OakPartialSuccess)) {
await context.rollback();
this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err);
}
else {
await context.commit();
this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err);
}
// throw err;
@ -363,6 +367,12 @@ class TriggerExecutor {
filter: {
id: {
$in: ids,
},
[Entity_1.TriggerDataAttribute]: {
$exists: true,
},
[Entity_1.TriggerUuidAttribute]: {
$exists: true,
}
}
}, { includedDeleted: true, blockTrigger: true });
@ -473,10 +483,6 @@ class TriggerExecutor {
async checkpoint(timestamp) {
let result = 0;
for (const entity of this.volatileEntities) {
if (entity === 'oper') {
// oper上的跨事务同步是系统synchronizer统一处理
continue;
}
const filter = {
[Entity_1.TriggerUuidAttribute]: {
$exists: true,
@ -486,12 +492,6 @@ class TriggerExecutor {
}
};
const context = this.contextBuilder();
if (context.clusterInfo?.usingCluster) {
const { instanceCount, instanceId } = context.clusterInfo;
filter.$$seq$$ = {
$mod: [instanceCount, instanceId],
};
}
await context.begin();
try {
const rows = await context.select(entity, {
@ -532,15 +532,108 @@ class TriggerExecutor {
}
}
await context.commit();
result += rows.length;
}
catch (err) {
await context.rollback();
if (!(err instanceof types_1.OakMakeSureByMySelfException)) {
this.logger.error(`执行checkpoint时出错对象是「${entity}」,异常是`, err);
if (!(err instanceof types_1.OakPartialSuccess)) {
await context.rollback();
this.logger.error(`error in checkpoint on entity 「${entity}`, err);
}
else {
await context.commit();
this.logger.error(`error in checkpoint on entity 「${entity}`, err);
}
}
}
return result;
}
/**
* 由外部来控制进行按volatileTrigger逐个进行checkpoint
* @param name
* @param timestamp
* @param instanceCount
* @param instanceId
*/
async independentCheckPoint(name, timestamp, instanceCount, instanceId) {
const trigger = this.triggerNameMap[name];
(0, assert_1.default)(trigger && trigger.when === 'commit');
const { fn, entity, grouped } = trigger;
const filter = {
[Entity_1.TriggerDataAttribute]: {
name,
},
[Entity_1.TriggerUuidAttribute]: {
$exists: true,
},
[Entity_1.UpdateAtAttribute]: {
$lt: timestamp,
},
};
if (instanceCount) {
filter.$$seq$$ = {
$mod: [instanceCount, instanceId]
};
}
const context = this.contextBuilder();
await context.begin();
try {
const rows = await context.select(entity, {
data: {
id: 1,
},
filter,
}, {
includedDeleted: true,
dontCollect: true,
});
if (rows.length > 0) {
// 要用id来再锁一次不然会锁住filter的范围影响并发性
// by Xc 20240314在haina-busi和haina-cn数据sync过程中发现这个问题
const rows2 = await context.select(entity, {
data: {
id: 1,
[Entity_1.TriggerDataAttribute]: 1,
[Entity_1.TriggerUuidAttribute]: 1,
},
filter: {
id: {
$in: rows.map(ele => ele.id),
},
},
}, {
includedDeleted: true,
dontCollect: true,
forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理
});
if (grouped) {
// grouped不需要上下文了吧内部一定会用root
await this.execVolatileTrigger(entity, name, rows.map(ele => ele.id), context, {});
}
else {
const groupedRowDict = (0, lodash_1.groupBy)(rows2, Entity_1.TriggerUuidAttribute);
for (const uuid in groupedRowDict) {
const rs = groupedRowDict[uuid];
const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0];
const { cxtStr, option } = triggerData;
await context.initialize(JSON.parse(cxtStr), true);
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option);
}
}
}
await context.commit();
return rows.length;
}
catch (err) {
if (!(err instanceof types_1.OakPartialSuccess)) {
await context.rollback();
this.logger.error('error on volatile trigger', entity, trigger.name, err);
}
else {
await context.commit();
this.logger.error('error on volatile trigger', entity, trigger.name, err);
}
return 0;
}
}
}
exports.TriggerExecutor = TriggerExecutor;

View File

@ -19,7 +19,7 @@ export declare class OakException<ED extends EntityDict & BaseEntityDict> extend
tag2?: boolean;
tag3?: any;
}
export declare class OakMakeSureByMySelfException<ED extends EntityDict & BaseEntityDict> extends OakException<ED> {
export declare class OakPartialSuccess<ED extends EntityDict & BaseEntityDict> extends OakException<ED> {
}
export declare class OakDataException<ED extends EntityDict & BaseEntityDict> extends OakException<ED> {
}

View File

@ -1,6 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.makeException = exports.OakExternalException = exports.OakPreConditionUnsetException = exports.OakDeadlock = exports.OakCongruentRowExists = exports.OakRowLockedException = exports.OakUnloggedInException = exports.OakUserInvisibleException = exports.OakUserUnpermittedException = exports.OakAttrCantUpdateException = exports.OakAttrNotNullException = exports.OakInputIllegalException = exports.OakRowInconsistencyException = exports.OakServerProxyException = exports.OakNetworkException = exports.OakImportDataParseException = exports.OakUniqueViolationException = exports.OakUserException = exports.OakRowUnexistedException = exports.OakOperExistedException = exports.OakNoRelationDefException = exports.OakDataException = exports.OakMakeSureByMySelfException = exports.OakException = void 0;
exports.makeException = exports.OakExternalException = exports.OakPreConditionUnsetException = exports.OakDeadlock = exports.OakCongruentRowExists = exports.OakRowLockedException = exports.OakUnloggedInException = exports.OakUserInvisibleException = exports.OakUserUnpermittedException = exports.OakAttrCantUpdateException = exports.OakAttrNotNullException = exports.OakInputIllegalException = exports.OakRowInconsistencyException = exports.OakServerProxyException = exports.OakNetworkException = exports.OakImportDataParseException = exports.OakUniqueViolationException = exports.OakUserException = exports.OakRowUnexistedException = exports.OakOperExistedException = exports.OakNoRelationDefException = exports.OakDataException = exports.OakPartialSuccess = exports.OakException = void 0;
const relation_1 = require("../store/relation");
const lodash_1 = require("../utils/lodash");
class OakException extends Error {
@ -90,10 +90,10 @@ class OakException extends Error {
tag3;
}
exports.OakException = OakException;
// 这个异常表示模块自己处理跨事务一致性框架pass(在分布式数据传递时会用到)
class OakMakeSureByMySelfException extends OakException {
// 这个异常表示事务虽然没有完全成功,但是仍然应该提交并抛出异常(在分布式数据传递时会用到)
class OakPartialSuccess extends OakException {
}
exports.OakMakeSureByMySelfException = OakMakeSureByMySelfException;
exports.OakPartialSuccess = OakPartialSuccess;
class OakDataException extends OakException {
}
exports.OakDataException = OakDataException;

View File

@ -12,6 +12,7 @@ export type FreeTimer<ED extends EntityDict, Cxt extends AsyncContext<ED>> = {
name: string;
cron: RecurrenceRule | RecurrenceSpecDateRange | RecurrenceSpecObjLit | Date | string | number;
timer: FreeOperateFn<ED, Cxt>;
singleton?: true;
};
export type Routine<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> = FreeRoutine<ED, Cxt> | Watcher<ED, T, Cxt>;
export type Timer<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> = FreeTimer<ED, Cxt> | Watcher<ED, T, Cxt> & {

View File

@ -39,6 +39,8 @@ interface TriggerCrossTxn<ED extends EntityDict, Cxt extends AsyncContext<ED> |
when: 'commit';
strict?: 'takeEasy' | 'makeSure';
cs?: true;
singleton?: true;
grouped?: true;
fn: (event: {
ids: string[];
}, context: Cxt, option: OperateOption) => Promise<((context: Cxt, option: OperateOption) => Promise<any>) | void>;

View File

@ -8,6 +8,7 @@ export interface BBWatcher<ED extends EntityDict, T extends keyof ED> {
filter: ED[T]['Selection']['filter'] | (() => ED[T]['Selection']['filter']);
action: Omit<ED[T]['Action'], 'create' | ReadOnlyAction>;
actionData: ActionData<ED, T> | (() => Promise<ActionData<ED, T>>);
singleton?: true;
}
export interface WBWatcher<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> {
name: string;
@ -15,6 +16,7 @@ export interface WBWatcher<ED extends EntityDict, T extends keyof ED, Cxt extend
filter: ED[T]['Selection']['filter'] | (() => Promise<ED[T]['Selection']['filter']>);
projection: ED[T]['Selection']['data'] | (() => Promise<ED[T]['Selection']['data']>);
fn: (context: Cxt, data: Partial<ED[T]['Schema']>[]) => Promise<OperationResult<ED>>;
singleton?: true;
}
export type Watcher<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> = BBWatcher<ED, T> | WBWatcher<ED, T, Cxt>;
export {};

View File

@ -11,7 +11,7 @@ import { SyncContext } from './SyncRowStore';
import { translateCheckerInAsyncContext } from './checker';
import { generateNewIdAsync } from '../utils/uuid';
import { readOnlyActions } from '../actions/action';
import { OakMakeSureByMySelfException, StorageSchema } from '../types';
import { OakPartialSuccess, StorageSchema } from '../types';
/**
* update可能会传入多种不同的actionupdate trigger
@ -74,8 +74,12 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
await context.commit();
}
catch (err) {
await context.rollback();
if (!(err instanceof OakMakeSureByMySelfException)) {
if (!(err instanceof OakPartialSuccess)) {
await context.rollback();
this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err);
}
else {
await context.commit();
this.logger.error('error on volatile trigger', entity, trigger.name, ids.join(','), err);
}
// throw err;
@ -455,6 +459,12 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
filter: {
id: {
$in: ids,
},
[TriggerDataAttribute]: {
$exists: true,
},
[TriggerUuidAttribute]: {
$exists: true,
}
}
}, { includedDeleted: true, blockTrigger: true });
@ -585,10 +595,6 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
async checkpoint(timestamp: number): Promise<number> {
let result = 0;
for (const entity of this.volatileEntities) {
if (entity === 'oper') {
// oper上的跨事务同步是系统synchronizer统一处理
continue;
}
const filter: ED[keyof ED]['Selection']['filter'] = {
[TriggerUuidAttribute]: {
$exists: true,
@ -598,12 +604,6 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
}
};
const context = this.contextBuilder();
if (context.clusterInfo?.usingCluster) {
const { instanceCount, instanceId } = context.clusterInfo!;
filter.$$seq$$ = {
$mod: [instanceCount!, instanceId!],
};
}
await context.begin();
try {
const rows = await context.select(entity, {
@ -636,6 +636,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理
});
const grouped = groupBy(rows2, TriggerUuidAttribute);
for (const uuid in grouped) {
@ -647,14 +648,112 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
}
}
await context.commit();
result += rows.length;
}
catch (err) {
await context.rollback();
if (!(err instanceof OakMakeSureByMySelfException)) {
this.logger.error(`执行checkpoint时出错对象是「${entity as string}」,异常是`, err);
if (!(err instanceof OakPartialSuccess)) {
await context.rollback();
this.logger.error(`error in checkpoint on entity 「${entity as string}`, err);
}
else {
await context.commit();
this.logger.error(`error in checkpoint on entity 「${entity as string}`, err);
}
}
}
return result;
}
/**
* volatileTrigger逐个进行checkpoint
* @param name
* @param timestamp
* @param instanceCount
* @param instanceId
*/
async independentCheckPoint(name: string, timestamp: number, instanceCount?: number, instanceId?: number) {
const trigger = this.triggerNameMap[name];
assert(trigger && trigger.when === 'commit');
const { fn, entity, grouped } = trigger;
const filter: ED[keyof ED]['Selection']['filter'] = {
[TriggerDataAttribute]: {
name,
},
[TriggerUuidAttribute]: {
$exists: true,
},
[UpdateAtAttribute]: {
$lt: timestamp,
},
};
if (instanceCount) {
filter.$$seq$$ = {
$mod: [instanceCount, instanceId]
};
}
const context = this.contextBuilder();
await context.begin();
try {
const rows = await context.select(entity, {
data: {
id: 1,
},
filter,
}, {
includedDeleted: true,
dontCollect: true,
});
if (rows.length > 0) {
// 要用id来再锁一次不然会锁住filter的范围影响并发性
// by Xc 20240314在haina-busi和haina-cn数据sync过程中发现这个问题
const rows2 = await context.select(entity, {
data: {
id: 1,
[TriggerDataAttribute]: 1,
[TriggerUuidAttribute]: 1,
},
filter: {
id: {
$in: rows.map(ele => ele.id!),
},
},
}, {
includedDeleted: true,
dontCollect: true,
forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理
});
if (grouped) {
// grouped不需要上下文了吧内部一定会用root
await this.execVolatileTrigger(entity, name, rows.map(ele => ele.id!), context, {});
}
else {
const groupedRowDict = groupBy(rows2, TriggerUuidAttribute);
for (const uuid in groupedRowDict) {
const rs = groupedRowDict[uuid];
const { [TriggerDataAttribute]: triggerData } = rs[0];
const { cxtStr, option } = triggerData!;
await context.initialize(JSON.parse(cxtStr), true);
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option);
}
}
}
await context.commit();
return rows.length;
}
catch (err) {
if (!(err instanceof OakPartialSuccess)) {
await context.rollback();
this.logger.error('error on volatile trigger', entity, trigger.name, err);
}
else {
await context.commit();
this.logger.error('error on volatile trigger', entity, trigger.name, err);
}
return 0;
}
}
}

View File

@ -102,8 +102,8 @@ export class OakException<ED extends EntityDict & BaseEntityDict> extends Error
tag3?: any;
}
// 这个异常表示模块自己处理跨事务一致性框架pass(在分布式数据传递时会用到)
export class OakMakeSureByMySelfException<ED extends EntityDict & BaseEntityDict> extends OakException<ED> {
// 这个异常表示事务虽然没有完全成功,但是仍然应该提交并抛出异常(在分布式数据传递时会用到)
export class OakPartialSuccess<ED extends EntityDict & BaseEntityDict> extends OakException<ED> {
}

View File

@ -17,6 +17,7 @@ export type FreeTimer<ED extends EntityDict, Cxt extends AsyncContext<ED>> = {
name: string;
cron: RecurrenceRule | RecurrenceSpecDateRange | RecurrenceSpecObjLit | Date | string | number;
timer: FreeOperateFn<ED, Cxt>;
singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行
};
export type Routine<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> = FreeRoutine<ED, Cxt> | Watcher<ED, T, Cxt>;

View File

@ -51,6 +51,8 @@ interface TriggerCrossTxn<ED extends EntityDict, Cxt extends AsyncContext<ED> |
when: 'commit',
strict?: 'takeEasy' | 'makeSure';
cs?: true; // cluster sensative集群敏感的需要由对应的集群进程统一处理
singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去统一执行
grouped?: true; // 置grouped则在检查点时会将所有的不一致行全部传入一次性处理
fn: (event: { ids: string[] }, context: Cxt, option: OperateOption) =>
Promise<((context: Cxt, option: OperateOption) => Promise<any>) | void>; // 跨事务的trigger可能紧接着下来就要触发另一个跨事务trigger这里只能用回调的方式进行
}

View File

@ -11,6 +11,7 @@ export interface BBWatcher<ED extends EntityDict, T extends keyof ED> {
filter: ED[T]['Selection']['filter'] | (() => ED[T]['Selection']['filter']);
action: Omit<ED[T]['Action'], 'create' | ReadOnlyAction>;
actionData: ActionData<ED, T> | (() => Promise<ActionData<ED, T>>);
singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行
};
export interface WBWatcher<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> {
@ -19,6 +20,7 @@ export interface WBWatcher<ED extends EntityDict, T extends keyof ED, Cxt extend
filter: ED[T]['Selection']['filter'] | (() => Promise<ED[T]['Selection']['filter']>);
projection: ED[T]['Selection']['data'] | (() => Promise<ED[T]['Selection']['data']>);
fn: (context: Cxt, data: Partial<ED[T]['Schema']>[]) => Promise<OperationResult<ED>>;
singleton?: true; // 置singleton意味着在集群环境中只有一个进程会去执行
};
export type Watcher<ED extends EntityDict, T extends keyof ED, Cxt extends AsyncContext<ED>> = BBWatcher<ED, T> | WBWatcher<ED, T, Cxt>;