事务在同一行上更新的等待

This commit is contained in:
Xu Chang 2022-10-23 23:13:46 +08:00
parent f91807c720
commit 6836843d7b
4 changed files with 238 additions and 113 deletions

1
lib/store.d.ts vendored
View File

@ -13,6 +13,7 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
private store;
private activeTxnDict;
private stat;
private waitOnTxn;
protected supportMultipleCreate(): boolean;
protected supportManyToOneJoin(): boolean;
resetInitialData(data: {

View File

@ -11,6 +11,7 @@ var RowStore_1 = require("oak-domain/lib/types/RowStore");
var Demand_2 = require("oak-domain/lib/types/Demand");
var relation_1 = require("oak-domain/lib/store/relation");
var Expression_1 = require("oak-domain/lib/types/Expression");
var types_1 = require("oak-domain/src/types");
;
;
function obscurePass(row, attr, option) {
@ -31,6 +32,30 @@ var TreeStore = /** @class */ (function (_super) {
};
return _this;
}
TreeStore.prototype.waitOnTxn = function (id, context) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var myId, myWaitList, waitList, p;
return tslib_1.__generator(this, function (_a) {
switch (_a.label) {
case 0:
myId = context.getCurrentTxnId();
myWaitList = this.activeTxnDict[myId].waitList;
if (myWaitList.find(function (ele) { return ele.id === id; })) {
throw new types_1.OakDeadlock();
}
waitList = this.activeTxnDict[id].waitList;
p = new Promise(function (resolve) { return waitList.push({
id: myId,
fn: resolve,
}); });
return [4 /*yield*/, p];
case 1:
_a.sent();
return [2 /*return*/];
}
});
});
};
TreeStore.prototype.supportMultipleCreate = function () {
return false;
};
@ -1040,51 +1065,52 @@ var TreeStore = /** @class */ (function (_super) {
};
TreeStore.prototype.updateAbjointRow = function (entity, operation, context, option) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var data, action, operId, _a, id, node, node2, selection, rows, ids;
var _this = this;
return tslib_1.__generator(this, function (_b) {
switch (_b.label) {
var data, action, operId, _a, id, node, node2, selection, rows, ids, ids_1, ids_1_1, id, alreadyDirtyNode, node, e_10_1;
var e_10, _b;
return tslib_1.__generator(this, function (_c) {
switch (_c.label) {
case 0:
data = operation.data, action = operation.action, operId = operation.id;
_a = action;
switch (_a) {
case 'create': return [3 /*break*/, 1];
}
return [3 /*break*/, 2];
return [3 /*break*/, 5];
case 1:
{
id = data.id;
(0, assert_1.assert)(id);
// const node = this.store[entity] && (this.store[entity]!)[id as string];
// const row = node && this.constructRow(node, context) || {};
/* if (row) {
throw new OakError(RowStore.$$LEVEL, RowStore.$$CODES.primaryKeyConfilict);
} */
if (this.store[entity] && (this.store[entity])[id]) {
node = this.store[entity] && (this.store[entity])[id];
throw new Exception_1.OakCongruentRowExists(entity, this.constructRow(node, context));
}
if (!data.$$seq$$) {
// tree-store随意生成即可
Object.assign(data, {
$$seq$$: "".concat(Math.ceil((Math.random() + 1000) * 100)),
});
}
node2 = {
$txnId: context.getCurrentTxnId(),
$current: null,
$next: data,
$path: "".concat(entity, ".").concat(id),
};
if (!this.store[entity]) {
this.store[entity] = {};
}
(0, lodash_1.set)(this.store, "".concat(entity, ".").concat(id), node2);
this.addToTxnNode(node2, context, 'create');
return [2 /*return*/, 1];
}
_b.label = 2;
id = data.id;
(0, assert_1.assert)(id);
_c.label = 2;
case 2:
if (!(this.store[entity] && (this.store[entity])[id] && (this.store[entity])[id].$txnId)) return [3 /*break*/, 4];
(0, assert_1.assert)((this.store[entity])[id].$txnId !== context.getCurrentTxnId());
return [4 /*yield*/, this.waitOnTxn((this.store[entity])[id].$txnId, context)];
case 3:
_c.sent();
return [3 /*break*/, 2];
case 4:
if (this.store[entity] && (this.store[entity])[id]) {
node = this.store[entity] && (this.store[entity])[id];
throw new Exception_1.OakCongruentRowExists(entity, this.constructRow(node, context));
}
if (!data.$$seq$$) {
// tree-store随意生成即可
Object.assign(data, {
$$seq$$: "".concat(Math.ceil((Math.random() + 1000) * 100)),
});
}
node2 = {
$txnId: context.getCurrentTxnId(),
$current: null,
$next: data,
$path: "".concat(entity, ".").concat(id),
};
if (!this.store[entity]) {
this.store[entity] = {};
}
(0, lodash_1.set)(this.store, "".concat(entity, ".").concat(id), node2);
this.addToTxnNode(node2, context, 'create');
return [2 /*return*/, 1];
case 5:
selection = {
data: {
id: 1,
@ -1094,37 +1120,66 @@ var TreeStore = /** @class */ (function (_super) {
count: operation.count,
};
return [4 /*yield*/, this.selectAbjointRow(entity, selection, context)];
case 3:
rows = _b.sent();
case 6:
rows = _c.sent();
ids = rows.map(function (ele) { return ele.id; });
ids.forEach(function (id) {
var alreadyDirtyNode = false;
var node = (_this.store[entity])[id];
(0, assert_1.assert)(node);
if (!node.$txnId) {
node.$txnId = context.getCurrentTxnId();
_c.label = 7;
case 7:
_c.trys.push([7, 14, 15, 16]);
ids_1 = tslib_1.__values(ids), ids_1_1 = ids_1.next();
_c.label = 8;
case 8:
if (!!ids_1_1.done) return [3 /*break*/, 13];
id = ids_1_1.value;
alreadyDirtyNode = false;
node = (this.store[entity])[id];
(0, assert_1.assert)(node);
_c.label = 9;
case 9:
if (!(node.$txnId && node.$txnId !== context.getCurrentTxnId())) return [3 /*break*/, 11];
return [4 /*yield*/, this.waitOnTxn(node.$txnId, context)];
case 10:
_c.sent();
return [3 /*break*/, 9];
case 11:
if (!node.$txnId) {
node.$txnId = context.getCurrentTxnId();
}
else {
(0, assert_1.assert)(node.$txnId === context.getCurrentTxnId());
alreadyDirtyNode = true;
}
if (action === 'remove') {
node.$next = null;
node.$path = "".concat(entity, ".").concat(id);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'remove');
}
else {
(0, assert_1.assert)(node.$txnId === context.getCurrentTxnId());
alreadyDirtyNode = true;
}
else {
node.$next = Object.assign(node.$next || {}, data);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'update');
}
if (action === 'remove') {
node.$next = null;
node.$path = "".concat(entity, ".").concat(id);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
_this.addToTxnNode(node, context, 'remove');
}
}
else {
node.$next = Object.assign(node.$next || {}, data);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
_this.addToTxnNode(node, context, 'update');
}
}
});
return [2 /*return*/, rows.length];
}
_c.label = 12;
case 12:
ids_1_1 = ids_1.next();
return [3 /*break*/, 8];
case 13: return [3 /*break*/, 16];
case 14:
e_10_1 = _c.sent();
e_10 = { error: e_10_1 };
return [3 /*break*/, 16];
case 15:
try {
if (ids_1_1 && !ids_1_1.done && (_b = ids_1.return)) _b.call(ids_1);
}
finally { if (e_10) throw e_10.error; }
return [7 /*endfinally*/];
case 16: return [2 /*return*/, rows.length];
}
});
});
@ -1274,8 +1329,8 @@ var TreeStore = /** @class */ (function (_super) {
};
TreeStore.prototype.formResult = function (entity, rows, selection, context, nodeDict) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var data, sorter, indexFrom, count, findAvailableExprName, sortToProjection, rows2, rows_1, rows_1_1, row, result, nodeDict2, e_10_1, sorterFn;
var e_10, _a;
var data, sorter, indexFrom, count, findAvailableExprName, sortToProjection, rows2, rows_1, rows_1_1, row, result, nodeDict2, e_11_1, sorterFn;
var e_11, _a;
var _this = this;
return tslib_1.__generator(this, function (_b) {
switch (_b.label) {
@ -1350,14 +1405,14 @@ var TreeStore = /** @class */ (function (_super) {
return [3 /*break*/, 2];
case 5: return [3 /*break*/, 8];
case 6:
e_10_1 = _b.sent();
e_10 = { error: e_10_1 };
e_11_1 = _b.sent();
e_11 = { error: e_11_1 };
return [3 /*break*/, 8];
case 7:
try {
if (rows_1_1 && !rows_1_1.done && (_a = rows_1.return)) _a.call(rows_1);
}
finally { if (e_10) throw e_10.error; }
finally { if (e_11) throw e_11.error; }
return [7 /*endfinally*/];
case 8:
// 再计算sorter
@ -1444,6 +1499,7 @@ var TreeStore = /** @class */ (function (_super) {
create: 0,
update: 0,
remove: 0,
waitList: [],
},
_a));
return [2 /*return*/, uuid];
@ -1452,8 +1508,9 @@ var TreeStore = /** @class */ (function (_super) {
};
TreeStore.prototype.commit = function (uuid) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var node, node2;
return tslib_1.__generator(this, function (_a) {
var node, node2, _a, _b, waiter;
var e_12, _c;
return tslib_1.__generator(this, function (_d) {
(0, assert_1.assert)(this.activeTxnDict.hasOwnProperty(uuid), uuid);
node = this.activeTxnDict[uuid].nodeHeader;
while (node) {
@ -1486,6 +1543,20 @@ var TreeStore = /** @class */ (function (_super) {
this.stat.remove += this.activeTxnDict[uuid].remove;
this.stat.commit++;
}
try {
// 唤起等待者
for (_a = tslib_1.__values(this.activeTxnDict[uuid].waitList), _b = _a.next(); !_b.done; _b = _a.next()) {
waiter = _b.value;
waiter.fn();
}
}
catch (e_12_1) { e_12 = { error: e_12_1 }; }
finally {
try {
if (_b && !_b.done && (_c = _a.return)) _c.call(_a);
}
finally { if (e_12) throw e_12.error; }
}
(0, lodash_1.unset)(this.activeTxnDict, uuid);
return [2 /*return*/];
});
@ -1493,8 +1564,9 @@ var TreeStore = /** @class */ (function (_super) {
};
TreeStore.prototype.rollback = function (uuid) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var node, node2;
return tslib_1.__generator(this, function (_a) {
var node, node2, _a, _b, waiter;
var e_13, _c;
return tslib_1.__generator(this, function (_d) {
(0, assert_1.assert)(this.activeTxnDict.hasOwnProperty(uuid));
node = this.activeTxnDict[uuid].nodeHeader;
while (node) {
@ -1520,6 +1592,20 @@ var TreeStore = /** @class */ (function (_super) {
}
node = node2;
}
try {
// 唤起等待者
for (_a = tslib_1.__values(this.activeTxnDict[uuid].waitList), _b = _a.next(); !_b.done; _b = _a.next()) {
waiter = _b.value;
waiter.fn();
}
}
catch (e_13_1) { e_13 = { error: e_13_1 }; }
finally {
try {
if (_b && !_b.done && (_c = _a.return)) _c.call(_a);
}
finally { if (e_13) throw e_13.error; }
}
(0, lodash_1.unset)(this.activeTxnDict, uuid);
return [2 /*return*/];
});
@ -1528,8 +1614,8 @@ var TreeStore = /** @class */ (function (_super) {
// 将输入的OpRecord同步到数据中
TreeStore.prototype.sync = function (opRecords, context) {
return tslib_1.__awaiter(this, void 0, void 0, function () {
var opRecords_1, opRecords_1_1, record, _a, e, d, d_1, d_1_1, dd, e_11_1, _b, e, d, f, _c, e, f, d, _d, _e, _i, entity, _f, _g, _h, id, e_12_1;
var e_12, _j, e_11, _k;
var opRecords_1, opRecords_1_1, record, _a, e, d, d_1, d_1_1, dd, e_14_1, _b, e, d, f, _c, e, f, d, _d, _e, _i, entity, _f, _g, _h, id, e_15_1;
var e_15, _j, e_14, _k;
return tslib_1.__generator(this, function (_l) {
switch (_l.label) {
case 0:
@ -1553,7 +1639,7 @@ var TreeStore = /** @class */ (function (_super) {
_l.label = 3;
case 3:
_l.trys.push([3, 10, 11, 12]);
d_1 = (e_11 = void 0, tslib_1.__values(d)), d_1_1 = d_1.next();
d_1 = (e_14 = void 0, tslib_1.__values(d)), d_1_1 = d_1.next();
_l.label = 4;
case 4:
if (!!d_1_1.done) return [3 /*break*/, 9];
@ -1589,14 +1675,14 @@ var TreeStore = /** @class */ (function (_super) {
return [3 /*break*/, 4];
case 9: return [3 /*break*/, 12];
case 10:
e_11_1 = _l.sent();
e_11 = { error: e_11_1 };
e_14_1 = _l.sent();
e_14 = { error: e_14_1 };
return [3 /*break*/, 12];
case 11:
try {
if (d_1_1 && !d_1_1.done && (_k = d_1.return)) _k.call(d_1);
}
finally { if (e_11) throw e_11.error; }
finally { if (e_14) throw e_14.error; }
return [7 /*endfinally*/];
case 12: return [3 /*break*/, 17];
case 13:
@ -1716,14 +1802,14 @@ var TreeStore = /** @class */ (function (_super) {
return [3 /*break*/, 1];
case 33: return [3 /*break*/, 36];
case 34:
e_12_1 = _l.sent();
e_12 = { error: e_12_1 };
e_15_1 = _l.sent();
e_15 = { error: e_15_1 };
return [3 /*break*/, 36];
case 35:
try {
if (opRecords_1_1 && !opRecords_1_1.done && (_j = opRecords_1.return)) _j.call(opRecords_1);
}
finally { if (e_12) throw e_12.error; }
finally { if (e_15) throw e_15.error; }
return [7 /*endfinally*/];
case 36: return [2 /*return*/];
}

View File

@ -18,7 +18,7 @@
"test": "cross-env TS_NODE_PROJECT='tsconfig.mocha.json' mocha",
"build": "tsc"
},
"main": "lib/index",
"main": "src/index",
"devDependencies": {
"@babel/cli": "^7.12.13",
"@babel/core": "^7.12.13",

View File

@ -17,6 +17,8 @@ import { RowStore } from 'oak-domain/lib/types/RowStore';
import { isRefAttrNode, Q_BooleanValue, Q_FullTextValue, Q_NumberValue, Q_StringValue } from 'oak-domain/lib/types/Demand';
import { judgeRelation } from 'oak-domain/lib/store/relation';
import { execOp, Expression, ExpressionConstant, isExpression, opMultipleParams } from 'oak-domain/lib/types/Expression';
import { resolve } from 'path';
import { OakDeadlock } from 'oak-domain/src/types';
interface ExprLaterCheckFn {
@ -51,6 +53,10 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
create: number;
update: number;
remove: number;
waitList: Array<{
id: string;
fn: Function;
}>;
};
};
private stat: {
@ -60,6 +66,25 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
commit: number;
};
private async waitOnTxn(id: string, context: Cxt) {
// 先检查自己的等待者中有没有id以避免死锁
const myId = context.getCurrentTxnId()!;
const { waitList: myWaitList } = this.activeTxnDict[myId];
if (myWaitList.find(
ele => ele.id === id
)) {
throw new OakDeadlock();
}
const { waitList } = this.activeTxnDict[id];
const p = new Promise(
(resolve) => waitList.push({
id: myId,
fn: resolve,
})
);
await p;
}
protected supportMultipleCreate(): boolean {
return false;
}
@ -814,8 +839,11 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
/* if (row) {
throw new OakError(RowStore.$$LEVEL, RowStore.$$CODES.primaryKeyConfilict);
} */
if (this.store[entity] && (this.store[entity]!)[id as string]) {
while (this.store[entity] && (this.store[entity]!)[id] && (this.store[entity]!)[id].$txnId) {
assert((this.store[entity]!)[id].$txnId !== context.getCurrentTxnId());
await this.waitOnTxn((this.store[entity]!)[id].$txnId!, context);
}
if (this.store[entity] && (this.store[entity]!)[id]) {
const node = this.store[entity] && (this.store[entity]!)[id as string];
throw new OakCongruentRowExists(entity as string, this.constructRow(node, context)!);
}
@ -850,35 +878,36 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
const rows = await this.selectAbjointRow(entity, selection, context);
const ids = rows.map(ele => ele.id);
ids.forEach(
(id) => {
let alreadyDirtyNode = false;
const node = (this.store[entity]!)[id as string];
assert(node);
if (!node.$txnId) {
node.$txnId = context.getCurrentTxnId()!;
}
else {
assert(node.$txnId === context.getCurrentTxnId());
alreadyDirtyNode = true;
}
if (action === 'remove') {
node.$next = null;
node.$path = `${entity as string}.${id!}`;
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'remove');
}
}
else {
node.$next = Object.assign(node.$next || {}, data);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'update');
}
for (const id of ids) {
let alreadyDirtyNode = false;
const node = (this.store[entity]!)[id as string];
assert(node);
while(node.$txnId && node.$txnId !== context.getCurrentTxnId()) {
await this.waitOnTxn(node.$txnId, context);
}
if (!node.$txnId) {
node.$txnId = context.getCurrentTxnId()!;
}
else {
assert(node.$txnId === context.getCurrentTxnId());
alreadyDirtyNode = true;
}
if (action === 'remove') {
node.$next = null;
node.$path = `${entity as string}.${id!}`;
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'remove');
}
}
);
else {
node.$next = Object.assign(node.$next || {}, data);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'update');
}
}
}
return rows.length;
}
@ -1129,6 +1158,7 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
create: 0,
update: 0,
remove: 0,
waitList: [],
},
});
return uuid;
@ -1167,6 +1197,10 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
this.stat.remove += this.activeTxnDict[uuid].remove;
this.stat.commit++;
}
// 唤起等待者
for (const waiter of this.activeTxnDict[uuid].waitList) {
waiter.fn();
}
unset(this.activeTxnDict, uuid);
}
@ -1197,6 +1231,10 @@ export default class TreeStore<ED extends EntityDict & BaseEntityDict, Cxt exten
}
node = node2;
}
// 唤起等待者
for (const waiter of this.activeTxnDict[uuid].waitList) {
waiter.fn();
}
unset(this.activeTxnDict, uuid);
}