oak-domain/lib/store/AsyncRowStore.js

217 lines
6.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AsyncContext = void 0;
const tslib_1 = require("tslib");
const types_1 = require("../types");
const action_1 = require("../actions/action");
const assert_1 = tslib_1.__importDefault(require("assert"));
const filter_1 = require("./filter");
/**
* 服务器端执行的异步环境的底层抽象
*/
class AsyncContext {
rowStore;
uuid;
opRecords;
scene;
headers;
clusterInfo;
opResult;
message;
events;
constructor(store) {
this.rowStore = store;
this.opRecords = [];
this.events = {
commit: [],
rollback: [],
};
this.opResult = {};
}
// 使一个上下文重新开始事务执行,清除历史数据(定时器中使用)
async restartToExecute(routine) {
const data = await this.getSerializedData();
const newContext = (new (Object.getPrototypeOf(this).constructor)(this.rowStore));
await newContext.begin();
await newContext.initialize(data, true);
newContext.opRecords = [];
newContext.events = {
commit: [],
rollback: [],
};
newContext.opResult = {};
try {
await routine(newContext);
await newContext.commit();
}
catch (err) {
await newContext.rollback();
throw err;
}
}
getHeader(key) {
if (this.headers) {
return this.headers[key];
}
}
getScene() {
return this.scene;
}
setScene(scene) {
this.scene = scene;
}
resetEvents() {
this.events = {
commit: [],
rollback: [],
};
}
on(event, callback) {
this.uuid && this.events[event].push(callback);
}
saveOpRecord(entity, operation) {
const { action, data, filter, id } = operation;
switch (action) {
case 'create': {
this.opRecords.push({
id,
a: 'c',
e: entity,
d: data,
});
break;
}
case 'remove': {
const deleteAt = data[types_1.DeleteAtAttribute];
(0, assert_1.default)(deleteAt);
this.opRecords.push({
id,
a: 'r',
d: {
[types_1.DeleteAtAttribute]: deleteAt,
},
e: entity,
f: filter,
});
break;
}
default: {
(0, assert_1.default)(!action_1.readOnlyActions.includes(action));
this.opRecords.push({
id,
a: action,
e: entity,
d: data,
f: filter,
});
}
}
}
/**
* 查询某operation所处理的row ids
* 如果该operation还未执行可能返回空数组不知道实际关联的row id但是在after的trigger中返回是准确的ids值此时如果是空数组说明没有有关row id
* @param id
*/
getRowIdsOfOperation(operation) {
const { id, action, data, filter } = operation;
if (action === 'create') {
if (data instanceof Array) {
return data.map(ele => ele.id);
}
return [data.id];
}
else if (filter) {
const ids = (0, filter_1.getRelevantIds)(filter);
if (ids.length > 0) {
return ids;
}
}
const oper = this.opRecords.find(ele => ele.id === id);
if (oper) {
const { a } = oper;
(0, assert_1.default)(a !== 'create');
const { f } = oper;
(0, assert_1.default)(f && f?.id?.$in && f.id.$in instanceof Array);
return f.id.$in;
}
return [];
}
/**
* 一个context中不应该有并发的事务这里将事务串行化使用的时候千万要注意不要自己等自己
* @param options
*/
async begin(options) {
if (!this.uuid) {
this.uuid = await this.rowStore.begin(options);
}
else {
(0, assert_1.default)(false);
}
}
async commit() {
if (this.uuid) {
await this.rowStore.commit(this.uuid);
const { commit: commitEvents } = this.events;
/* for (const e of commitEvents) {
await e();
} */
// 提交时不能等在跨事务trigger上
const cxtStr = await this.toString();
commitEvents.forEach(evt => evt(this.opRecords, cxtStr));
this.uuid = undefined;
this.resetEvents();
this.opRecords = [];
this.opResult = {};
this.message = '';
}
}
async rollback() {
if (this.uuid) {
await this.rowStore.rollback(this.uuid);
const { rollback: rollbackEvents } = this.events;
// 回退时不能等在跨事务trigger上
const cxtStr = await this.toString();
rollbackEvents.forEach(evt => evt(this.opRecords, cxtStr));
this.uuid = undefined;
this.opRecords = [];
this.opResult = {};
this.resetEvents();
}
}
async operate(entity, operation, option) {
const result = await this.rowStore.operate(entity, operation, this, option);
this.opResult = this.mergeMultipleResults([this.opResult, result]);
return result;
}
select(entity, selection, option) {
return this.rowStore.select(entity, selection, this, option);
}
aggregate(entity, aggregation, option) {
return this.rowStore.aggregate(entity, aggregation, this, option);
}
count(entity, selection, option) {
return this.rowStore.count(entity, selection, this, option);
}
exec(script, txnId) {
return this.rowStore.exec(script, txnId);
}
mergeMultipleResults(toBeMerged) {
return this.rowStore.mergeMultipleResults(toBeMerged);
}
getCurrentTxnId() {
return this.uuid;
}
getSchema() {
return this.rowStore.getSchema();
}
setMessage(message) {
this.message = message;
}
getMessage() {
return this.message;
}
}
exports.AsyncContext = AsyncContext;
;
;