只有makeSure的volatile trigger写入跨事务属性

This commit is contained in:
Xu Chang 2023-12-15 18:39:12 +08:00
parent b454040e2e
commit 528165a713
2 changed files with 68 additions and 78 deletions

View File

@ -429,6 +429,7 @@ class TriggerExecutor {
} }
async checkpoint(timestamp) { async checkpoint(timestamp) {
let result = 0; let result = 0;
console.log('checkpoint start', this.volatileEntities.join(','));
for (const entity of this.volatileEntities) { for (const entity of this.volatileEntities) {
const filter = { const filter = {
[Entity_1.TriggerUuidAttribute]: { [Entity_1.TriggerUuidAttribute]: {
@ -474,6 +475,7 @@ class TriggerExecutor {
this.logger.error(`执行checkpoint时出错对象是「${entity}」,异常是`, err); this.logger.error(`执行checkpoint时出错对象是「${entity}」,异常是`, err);
} }
} }
console.log('checkpoint end');
return result; return result;
} }
} }

View File

@ -204,69 +204,71 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
) { ) {
assert(trigger.action !== 'select'); assert(trigger.action !== 'select');
assert(trigger.when === 'commit'); assert(trigger.when === 'commit');
const uuid = await generateNewIdAsync(); if (trigger.strict === 'makeSure') {
const cxtStr = context.toString(); const uuid = await generateNewIdAsync();
const { data } = operation; const cxtStr = context.toString();
switch (operation.action) { const { data } = operation;
case 'create': { switch (operation.action) {
if (data instanceof Array) { case 'create': {
data.forEach( if (data instanceof Array) {
(d) => { data.forEach(
if (d.hasOwnProperty(TriggerDataAttribute) || d.hasOwnProperty(TriggerUuidAttribute)) { (d) => {
throw new Error('同一行数据上不能同时存在两个跨事务约束'); if (d.hasOwnProperty(TriggerDataAttribute) || d.hasOwnProperty(TriggerUuidAttribute)) {
throw new Error('同一行数据上不能同时存在两个跨事务约束');
}
} }
} )
)
}
else {
if (data.hasOwnProperty(TriggerDataAttribute) || data.hasOwnProperty(TriggerUuidAttribute)) {
throw new Error('同一行数据上不能存在两个跨事务约束');
} }
else {
if (data.hasOwnProperty(TriggerDataAttribute) || data.hasOwnProperty(TriggerUuidAttribute)) {
throw new Error('同一行数据上不能存在两个跨事务约束');
}
}
break;
} }
break; default: {
} const { filter } = operation;
default: { // 此时要保证更新或者删除的行上没有跨事务约束
const { filter } = operation; const filter2 = combineFilters(entity, context.getSchema(), [{
// 此时要保证更新或者删除的行上没有跨事务约束 [TriggerUuidAttribute]: {
const filter2 = combineFilters(entity, context.getSchema(), [{ $exists: true,
[TriggerUuidAttribute]: {
$exists: true,
},
}, filter]);
const count = await context.count(entity, {
filter: filter2
} as Omit<ED[T]['Selection'], 'action' | 'sorter' | 'data'>, {});
if (count > 0) {
throw new Error(`对象${String(entity)}的行「${JSON.stringify(operation)}」上已经存在未完成的跨事务约束`);
}
break;
}
}
if (data instanceof Array) {
data.forEach(
(d) => {
Object.assign(d, {
[TriggerDataAttribute]: {
name: trigger.name,
cxtStr: context.toString(),
option,
}, },
[TriggerUuidAttribute]: uuid, }, filter]);
}); const count = await context.count(entity, {
filter: filter2
} as Omit<ED[T]['Selection'], 'action' | 'sorter' | 'data'>, {});
if (count > 0) {
throw new Error(`对象${String(entity)}的行「${JSON.stringify(operation)}」上已经存在未完成的跨事务约束`);
}
break;
} }
); }
}
else { if (data instanceof Array) {
Object.assign(data, { data.forEach(
[TriggerDataAttribute]: { (d) => {
name: trigger.name, Object.assign(d, {
cxtStr, [TriggerDataAttribute]: {
option, name: trigger.name,
}, cxtStr: context.toString(),
[TriggerUuidAttribute]: uuid, option,
}); },
[TriggerUuidAttribute]: uuid,
});
}
);
}
else {
Object.assign(data, {
[TriggerDataAttribute]: {
name: trigger.name,
cxtStr,
option,
},
[TriggerUuidAttribute]: uuid,
});
}
} }
} }
@ -405,24 +407,8 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
assert(ids.length > 0); assert(ids.length > 0);
const { fn } = trigger as VolatileTrigger<ED, T, Cxt>; const { fn } = trigger as VolatileTrigger<ED, T, Cxt>;
await fn({ ids }, context, option); await fn({ ids }, context, option);
try { if (trigger.strict === 'makeSure') {
await context.operate(entity, { try {
id: await generateNewIdAsync(),
action: 'update',
data: {
[TriggerDataAttribute]: null,
[TriggerUuidAttribute]: null,
},
filter: {
id: {
$in: ids,
}
}
}, { includedDeleted: true, blockTrigger: true });
}
catch (err) {
if (trigger.strict === 'takeEasy') {
// 如果不是makeSure的就直接清空
await context.operate(entity, { await context.operate(entity, {
id: await generateNewIdAsync(), id: await generateNewIdAsync(),
action: 'update', action: 'update',
@ -435,9 +421,11 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
$in: ids, $in: ids,
} }
} }
}, { includedDeleted: true }); }, { includedDeleted: true, blockTrigger: true });
}
catch (err) {
throw err;
} }
throw err;
} }
} }