create的oper也存入filter,commit的trigger改善了加锁性能
This commit is contained in:
parent
ba152f8887
commit
9122c8ebe5
|
|
@ -1181,6 +1181,11 @@ class CascadeStore extends RowStore_1.RowStore {
|
|||
entity: entity,
|
||||
},
|
||||
}],
|
||||
filter: {
|
||||
id: data instanceof Array ? {
|
||||
id: { $in: data.map(ele => ele.id) },
|
||||
} : data.id,
|
||||
}
|
||||
},
|
||||
};
|
||||
const closeRootMode = context.openRootMode();
|
||||
|
|
|
|||
|
|
@ -425,6 +425,10 @@ class TriggerExecutor {
|
|||
async checkpoint(timestamp) {
|
||||
let result = 0;
|
||||
for (const entity of this.volatileEntities) {
|
||||
if (entity === 'oper') {
|
||||
// oper上的跨事务同步是系统synchronizer统一处理
|
||||
continue;
|
||||
}
|
||||
const filter = {
|
||||
[Entity_1.TriggerUuidAttribute]: {
|
||||
$exists: true,
|
||||
|
|
@ -445,22 +449,39 @@ class TriggerExecutor {
|
|||
const rows = await context.select(entity, {
|
||||
data: {
|
||||
id: 1,
|
||||
[Entity_1.TriggerDataAttribute]: 1,
|
||||
[Entity_1.TriggerUuidAttribute]: 1,
|
||||
},
|
||||
filter,
|
||||
}, {
|
||||
includedDeleted: true,
|
||||
dontCollect: true,
|
||||
forUpdate: 'skip locked', // 防止某个跨事务trigger的逻辑执行周期太长
|
||||
});
|
||||
const grouped = (0, lodash_1.groupBy)(rows, Entity_1.TriggerUuidAttribute);
|
||||
for (const uuid in grouped) {
|
||||
const rs = grouped[uuid];
|
||||
const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0];
|
||||
const { name, cxtStr, option } = triggerData;
|
||||
await context.initialize(JSON.parse(cxtStr), true);
|
||||
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option);
|
||||
if (rows.length > 0) {
|
||||
// 要用id来再锁一次,不然会锁住filter的范围,影响并发性
|
||||
// by Xc 20240314,在haina-busi和haina-cn数据sync过程中发现这个问题
|
||||
const rows2 = await context.select(entity, {
|
||||
data: {
|
||||
id: 1,
|
||||
[Entity_1.TriggerDataAttribute]: 1,
|
||||
[Entity_1.TriggerUuidAttribute]: 1,
|
||||
},
|
||||
filter: {
|
||||
id: {
|
||||
$in: rows.map(ele => ele.id),
|
||||
},
|
||||
},
|
||||
}, {
|
||||
includedDeleted: true,
|
||||
dontCollect: true,
|
||||
forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理
|
||||
});
|
||||
const grouped = (0, lodash_1.groupBy)(rows2, Entity_1.TriggerUuidAttribute);
|
||||
for (const uuid in grouped) {
|
||||
const rs = grouped[uuid];
|
||||
const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0];
|
||||
const { name, cxtStr, option } = triggerData;
|
||||
await context.initialize(JSON.parse(cxtStr), true);
|
||||
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option);
|
||||
}
|
||||
}
|
||||
await context.commit();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1454,6 +1454,11 @@ export abstract class CascadeStore<ED extends EntityDict & BaseEntityDict> exten
|
|||
entity: entity as string,
|
||||
},
|
||||
}],
|
||||
filter: {
|
||||
id: data instanceof Array ? {
|
||||
id: { $in: data.map(ele => ele.id!) },
|
||||
} : data.id!,
|
||||
}
|
||||
},
|
||||
};
|
||||
const closeRootMode = context.openRootMode();
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
private logger: Logger;
|
||||
private contextBuilder: (cxtString?: string) => Promise<Cxt>;
|
||||
private onVolatileTrigger: <T extends keyof ED>(
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
cxtStr: string,
|
||||
option: OperateOption
|
||||
|
|
@ -51,8 +51,8 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
logger: Logger = console,
|
||||
onVolatileTrigger?: <T extends keyof ED>(
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
cxtStr: string,
|
||||
option: OperateOption) => Promise<void>
|
||||
) {
|
||||
|
|
@ -79,8 +79,8 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
setOnVolatileTrigger(
|
||||
onVolatileTrigger: <T extends keyof ED>(
|
||||
entity: T,
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
ids: string[],
|
||||
cxtStr: string,
|
||||
option: OperateOption) => Promise<void>
|
||||
) {
|
||||
|
|
@ -224,7 +224,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
if (data.hasOwnProperty(TriggerDataAttribute) || data.hasOwnProperty(TriggerUuidAttribute)) {
|
||||
throw new Error('同一行数据上不能存在两个跨事务约束');
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
@ -245,7 +245,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (data instanceof Array) {
|
||||
data.forEach(
|
||||
(d) => {
|
||||
|
|
@ -279,7 +279,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
trigger: VolatileTrigger<ED, T, Cxt>,
|
||||
context: Cxt,
|
||||
option: OperateOption
|
||||
) {
|
||||
) {
|
||||
context.on('commit', async () => {
|
||||
let ids = [] as string[];
|
||||
let cxtStr = await context.toString();
|
||||
|
|
@ -414,7 +414,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
const { fn } = trigger as VolatileTrigger<ED, T, Cxt>;
|
||||
await fn({ ids }, context, option);
|
||||
if (trigger.strict === 'makeSure') {
|
||||
try {
|
||||
try {
|
||||
await context.operate(entity, {
|
||||
id: await generateNewIdAsync(),
|
||||
action: 'update',
|
||||
|
|
@ -534,6 +534,10 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
async checkpoint(timestamp: number): Promise<number> {
|
||||
let result = 0;
|
||||
for (const entity of this.volatileEntities) {
|
||||
if (entity === 'oper') {
|
||||
// oper上的跨事务同步是系统synchronizer统一处理
|
||||
continue;
|
||||
}
|
||||
const filter: ED[keyof ED]['Selection']['filter'] = {
|
||||
[TriggerUuidAttribute]: {
|
||||
$exists: true,
|
||||
|
|
@ -554,24 +558,42 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
|
|||
const rows = await context.select(entity, {
|
||||
data: {
|
||||
id: 1,
|
||||
[TriggerDataAttribute]: 1,
|
||||
[TriggerUuidAttribute]: 1,
|
||||
},
|
||||
filter,
|
||||
} as any, {
|
||||
}, {
|
||||
includedDeleted: true,
|
||||
dontCollect: true,
|
||||
forUpdate: 'skip locked', // 防止某个跨事务trigger的逻辑执行周期太长
|
||||
});
|
||||
|
||||
const grouped = groupBy(rows, TriggerUuidAttribute);
|
||||
if (rows.length > 0) {
|
||||
// 要用id来再锁一次,不然会锁住filter的范围,影响并发性
|
||||
// by Xc 20240314,在haina-busi和haina-cn数据sync过程中发现这个问题
|
||||
const rows2 = await context.select(entity, {
|
||||
data: {
|
||||
id: 1,
|
||||
[TriggerDataAttribute]: 1,
|
||||
[TriggerUuidAttribute]: 1,
|
||||
},
|
||||
filter: {
|
||||
id: {
|
||||
$in: rows.map(ele => ele.id!),
|
||||
},
|
||||
},
|
||||
}, {
|
||||
includedDeleted: true,
|
||||
dontCollect: true,
|
||||
forUpdate: 'skip locked', // 如果加不上锁就下次再处理,或者有可能应用自己在处理
|
||||
});
|
||||
|
||||
for (const uuid in grouped) {
|
||||
const rs = grouped[uuid];
|
||||
const { [TriggerDataAttribute]: triggerData } = rs[0];
|
||||
const { name, cxtStr, option } = triggerData!;
|
||||
await context.initialize(JSON.parse(cxtStr), true);
|
||||
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option);
|
||||
const grouped = groupBy(rows2, TriggerUuidAttribute);
|
||||
|
||||
for (const uuid in grouped) {
|
||||
const rs = grouped[uuid];
|
||||
const { [TriggerDataAttribute]: triggerData } = rs[0];
|
||||
const { name, cxtStr, option } = triggerData!;
|
||||
await context.initialize(JSON.parse(cxtStr), true);
|
||||
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option);
|
||||
}
|
||||
}
|
||||
await context.commit();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue