oak-memory-tree-store/es/store.js

1981 lines
84 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { cloneDeep, get, groupBy, set, unset, uniqBy, uniq, differenceBy, intersectionBy, pull, pick } from 'oak-domain/lib/utils/lodash';
import { assert } from 'oak-domain/lib/utils/assert';
import { DeleteAtAttribute, CreateAtAttribute, UpdateAtAttribute } from "oak-domain/lib/types/Entity";
import { EXPRESSION_PREFIX, SUB_QUERY_PREDICATE_KEYWORD } from 'oak-domain/lib/types/Demand';
import { OakCongruentRowExists, OakException } from 'oak-domain/lib/types/Exception';
import { isRefAttrNode } from 'oak-domain/lib/types/Demand';
import { judgeRelation } from 'oak-domain/lib/store/relation';
// Expression imports: the declarations below were changed to `any` to avoid type errors
import { execOp, isExpression, opMultipleParams } from 'oak-domain/lib/types/Expression';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CascadeStore, polishSelection } from 'oak-domain/lib/store/CascadeStore';
import { SeqAttribute } from 'oak-domain/lib/types';
import { combineFilters, getRelevantIds } from 'oak-domain/lib/store/filter';
;
;
/**
 * Raised when a deferred expression inside a filter still cannot be resolved
 * after every node of the filter has been visited (see selectAbjointRow).
 * translateFilterInner catches it to fall back to per-row sub-query evaluation.
 */
class OakExpressionUnresolvedException extends OakException {
}
function showWarningAttributeMiss(entity, attr) {
console.warn(`attribute miss: entity: ${entity}, attr: ${attr}`);
}
;
export default class TreeStore extends CascadeStore {
store;
seq;
activeTxnDict;
stat;
getNextSeq(entity) {
if (this.seq[entity]) {
const seq = this.seq[entity];
this.seq[entity]++;
return seq;
}
this.seq[entity] = 2;
return 1;
}
setMaxSeq(entity, seq) {
if (this.seq[entity]) {
if (this.seq[entity] < seq) {
this.seq[entity] = seq;
}
}
else {
this.seq[entity] = seq;
}
}
/* treeStore改成同步以后不会再出现
private async waitOnTxn(id: string, context: Cxt) {
// 先检查自己的等待者中有没有id以避免死锁
const myId = context.getCurrentTxnId()!;
const { waitList: myWaitList } = this.activeTxnDict[myId];
if (myWaitList.find(
ele => ele.id === id
)) {
throw new OakDeadlock();
}
const { waitList } = this.activeTxnDict[id];
const p = new Promise(
(resolve) => waitList.push({
id: myId,
fn: resolve,
})
);
await p;
} */
/**
 * Capability flag; this store always answers false.
 * NOTE(review): presumably consulted by the CascadeStore base class to decide
 * whether several rows may be created in one operation — confirm against the base class.
 */
supportMultipleCreate() {
return false;
}
/**
 * Capability flag; this store always answers false.
 * NOTE(review): presumably tells the CascadeStore base class that many-to-one
 * joins are not performed by the store itself — confirm against the base class.
 */
supportManyToOneJoin() {
return false;
}
resetInitialData(data, stat) {
this.store = {};
const now = Date.now();
const schema = this.getSchema();
for (const entity in data) {
if (!schema[entity]) {
throw new Error(`reset unknown entity:[${entity}]`);
}
const { attributes } = schema[entity];
this.store[entity] = {};
for (const row of data[entity]) {
for (const key in attributes) {
if (row[key] === undefined) {
Object.assign(row, {
[key]: null,
});
}
}
/**
* 处理初始化数据
*/
if (!row[CreateAtAttribute]) {
Object.assign(row, {
[CreateAtAttribute]: now,
});
}
if (!row[DeleteAtAttribute]) {
Object.assign(row, {
[DeleteAtAttribute]: null,
});
}
if (!row[UpdateAtAttribute]) {
Object.assign(row, {
[UpdateAtAttribute]: now,
});
}
if (!row[SeqAttribute]) {
const seq = this.getNextSeq(entity);
Object.assign(row, {
[SeqAttribute]: seq,
});
}
else {
this.setMaxSeq(entity, row[SeqAttribute] + 1);
}
assert(row.id && !row.id.includes('.'));
set(this.store, `${entity}.${row.id}.$current`, row);
}
}
if (stat) {
this.stat = stat;
}
}
getCurrentData(keys) {
const result = {};
for (const entity in this.store) {
if (keys && !keys.includes(entity)) {
continue;
}
result[entity] = [];
for (const rowId in this.store[entity]) {
result[entity]?.push(this.store[entity][rowId]['$current']);
}
}
return result;
}
constructor(storageSchema) {
super(storageSchema);
this.store = {};
this.activeTxnDict = {};
this.stat = {
create: 0,
update: 0,
remove: 0,
commit: 0,
};
this.seq = {};
}
constructRow(node, context, option) {
if (context.getCurrentTxnId() && node.$txnId === context.getCurrentTxnId()) {
if (!node.$next) {
// 如果要求返回delete数据返回带$$deleteAt$$的行
// bug fixed这里如果是自己create再删除data也是null
if (node.$current && option?.includedDeleted) {
return Object.assign({}, node.$current, {
[DeleteAtAttribute]: 1,
});
}
return null;
}
else if (!node.$current) {
// 本事务创建的若在cache中$$createAt$$和$$updateAt$$置为1
return Object.assign({}, node.$current, node.$next, context instanceof SyncContext && {
[CreateAtAttribute]: 1,
[UpdateAtAttribute]: 1,
});
}
else {
// 本事务更新的若在cache中$$updateAt$$置为1
return Object.assign({}, node.$current, node.$next, context instanceof SyncContext && {
[UpdateAtAttribute]: 1,
});
}
}
return {
...node.$current,
};
}
testFilterFns(node, nodeDict, exprResolveFns, fns) {
const { self, otm, mto } = fns;
// 三种filterFn是and的关系有一个失败就返回false优先判断顺序self -> mto -> otm
for (const f of self) {
if (!f(node, nodeDict, exprResolveFns)) {
return false;
}
}
for (const f of mto) {
if (!f(node, nodeDict, exprResolveFns)) {
return false;
}
}
for (const f of otm) {
if (!f(node, nodeDict, exprResolveFns)) {
return false;
}
}
return true;
}
translateLogicFilter(entity, projection, filter, attr, context, option) {
const self = [];
const otm = [];
const mto = [];
switch (attr) {
case '$and': {
const filters = filter[attr];
const fns = filters.map((ele) => this.translateFilterInner(entity, projection, ele, context, option));
self.push(...(fns.map(ele => ele.self).flat()));
otm.push(...(fns.map(ele => ele.otm).flat()));
mto.push(...(fns.map(ele => ele.mto).flat()));
break;
}
case '$or': {
const filters = filter[attr];
const fns = filters.map((ele) => this.translateFilterInner(entity, projection, ele, context, option));
/**
* 对于or的情况按最坏的一种判定来计算同时对所有的判定也可以排序先计算代价最轻的
*/
fns.sort((ele1, ele2) => {
if (ele2.mto.length > 0) {
return -1;
}
else if (ele1.mto.length > 0) {
return 1;
}
else if (ele2.otm.length > 0) {
return -1;
}
else if (ele1.otm.length > 0) {
return 1;
}
return 0;
});
const fn = (node, nodeDict, exprResolveFns) => {
for (const fn of fns) {
if (this.testFilterFns(node, nodeDict, exprResolveFns, fn)) {
return true;
}
}
return false;
};
const last = fns[fns.length - 1];
if (last.mto.length > 0) {
mto.push(fn);
}
else if (last.otm.length > 0) {
otm.push(fn);
}
else {
self.push(fn);
}
break;
}
case '$not': {
const filter2 = filter[attr];
const filterFn = this.translateFilterInner(entity, projection, filter2, context, option);
const fn = (node, nodeDict, exprResolveFns) => {
if (this.testFilterFns(node, nodeDict, exprResolveFns, filterFn)) {
return false;
}
return true;
};
if (filterFn.otm.length > 0) {
otm.push(fn);
}
else if (filterFn.mto.length > 0) {
mto.push(fn);
}
else {
self.push(fn);
}
break;
}
default: {
assert(false, `${attr}算子暂不支持`);
}
}
return {
self,
otm,
mto,
};
}
/**
* 对表达式中某个结点的翻译,有三种情况:
* 1、结点是一个表达式此时递归翻译其子结点
* 2、结点是一个常量直接返回
* 3、结点引用了某个属性此时返回一个函数ExprNodeTranslator该函数在实际执行时对某行进行处理又可能有两种case
* 3.1、得到结果,此时返回结果的值(常量)
* 3.2、还欠缺某些外部结点的值才能得到结果此时返回一个函数ExprLaterCheckFn此函数可以在执行中获得更多结点之后再调用并得到结果的值
* @param entity
* @param expression
* @param context
* @returns
*/
translateExpressionNode(entity,
// expression: Expression<keyof ED[T]['Schema']> | RefAttr<keyof ED[T]['Schema']> | ExpressionConstant,
expression, context, option) {
if (isExpression(expression)) {
const op = Object.keys(expression)[0];
const option2 = expression[op];
if (opMultipleParams(op)) {
const paramsTranslated = option2.map(
// const paramsTranslated = (option2 as (Expression<keyof ED[T]['Schema']> | RefAttr<keyof ED[T]['Schema']>)[]).map(
ele => this.translateExpressionNode(entity, ele, context, option2));
return (row, nodeDict) => {
let later = false;
let results = paramsTranslated.map((ele) => {
if (typeof ele === 'function') {
const r = ele(row, nodeDict);
if (typeof r === 'function') {
later = true;
}
return r;
}
return ele;
});
if (!later) {
return execOp(op, results);
}
const laterCheckFn = (nodeDict2) => {
results = results.map((ele) => {
if (typeof ele === 'function') {
const r = ele(nodeDict2);
return r;
}
return ele;
});
if (results.find(ele => typeof ele === 'function')) {
return laterCheckFn;
}
return execOp(op, results);
};
return laterCheckFn;
};
}
else {
const paramsTranslated = this.translateExpressionNode(entity, option2, context, option2);
if (typeof paramsTranslated === 'function') {
return (row, nodeDict) => {
let result = paramsTranslated(row, nodeDict);
if (typeof result === 'function') {
const laterCheckFn = (nodeDict2) => {
result = result(nodeDict2);
if (typeof result === 'function') {
return laterCheckFn;
}
return result;
};
return laterCheckFn;
}
return execOp(op, result);
};
}
else {
return () => {
return execOp(op, paramsTranslated);
};
}
}
}
else if (isRefAttrNode(expression)) {
// 是RefAttr结点
return (row, nodeDict) => {
if (expression.hasOwnProperty('#attr')) {
// 说明是本结点的属性;
const attr = row[expression['#attr']];
if (attr === undefined && option?.warnWhenAttributeMiss) {
showWarningAttributeMiss(entity, expression['#attr']);
}
return attr;
}
else {
assert(expression.hasOwnProperty('#refId'));
const { ['#refId']: refId, ['#refAttr']: refAttr } = expression;
if (nodeDict.hasOwnProperty(refId)) {
return nodeDict[refId][refAttr];
}
// 引用的结点还没有取到,此时需要在未来的某个时刻再检查
const laterCheckFn = (nodeDict2) => {
if (nodeDict2.hasOwnProperty(refId)) {
return nodeDict2[refId][refAttr];
}
return laterCheckFn;
};
return laterCheckFn;
}
};
}
else {
// 是常量结点
return expression;
}
}
translateExpression(entity,
// expression: Expression<keyof ED[T]['Schema']>,
expression, context, option) {
const expr = this.translateExpressionNode(entity, expression, context, option);
return (row, nodeDict) => {
if (typeof expr !== 'function') {
return expr;
}
const result = expr(row, nodeDict);
return result;
};
}
translateFulltext(entity, filter, context, option) {
// 全文索引查找
const { [entity]: { indexes } } = this.getSchema();
const fulltextIndex = indexes.find(ele => ele.config && ele.config.type === 'fulltext');
const { attributes } = fulltextIndex;
const { $search } = filter;
return (node) => {
const row = this.constructRow(node, context, option);
if (row) {
for (const attr of attributes) {
const { name } = attr;
const value = row[name];
if (value === undefined && option?.warnWhenAttributeMiss) {
showWarningAttributeMiss(entity, name);
}
if (typeof value === 'string' && row[name].includes($search)) {
return true;
}
}
}
return false;
};
}
translatePredicate(entity, path, predicate, value, option) {
switch (predicate) {
case '$gt': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data > value;
};
}
case '$lt': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data < value;
};
}
case '$gte': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data >= value;
};
}
case '$lte': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data <= value;
};
}
case '$eq': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data === value;
};
}
case '$ne': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data !== value;
};
}
case '$between': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['number', 'string'].includes(typeof data) && data >= value[0] && data <= value[1];
};
}
case '$mod': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return typeof data === 'number' && data % value[0] === value[1];
};
}
case '$startsWith': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['string'].includes(typeof data) && data.startsWith(value);
};
}
case '$endsWith': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['string'].includes(typeof data) && data.endsWith(value);
};
}
case '$includes': {
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return ['string'].includes(typeof data) && data.includes(value);
};
}
case '$exists': {
assert(typeof value === 'boolean');
return (row) => {
// JsonFilter有可能是根结点path为空
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
if (value) {
return ![null, undefined].includes(data);
}
else {
return [null, undefined].includes(data);
}
};
}
case '$in': {
assert(value instanceof Array);
return (row) => {
const data = get(row, path);
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return value.includes(data);
};
}
case '$nin': {
assert(value instanceof Array);
return (row) => {
const data = get(row, path);
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return !value.includes(data);
};
}
case '$contains': {
// json中的多值查询
const array = value instanceof Array ? value : [value];
return (row) => {
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return differenceBy(array, data, (value) => {
if (typeof value === 'object') {
return JSON.stringify(value);
}
return value;
}).length === 0;
};
}
case '$overlaps': {
// json中的多值查询
const array = value instanceof Array ? value : [value];
return (row) => {
const data = path ? get(row, path) : row;
data === undefined && option?.warnWhenAttributeMiss && showWarningAttributeMiss(entity, path);
return intersectionBy(array, data, (value) => {
if (typeof value === 'object') {
return JSON.stringify(value);
}
return value;
}).length > 0;
};
}
case '$length': {
// json中的数组长度查询
const length = value;
return (row) => {
const data = path ? get(row, path) : row;
assert(data instanceof Array, '$length operator can only used on array attribute');
if (typeof length === 'number') {
return data.length === length;
}
else {
const op = Object.keys(length)[0];
return this.translatePredicate(entity, 'length', op, length[op], option)(data);
}
};
}
default: {
throw new Error(`predicate ${predicate} is not recoganized`);
}
}
}
translateObjectPredicate(entity, filter) {
const fns = [];
const translatePredicateInner = (p, path, fns2) => {
if (p instanceof Array) {
p.forEach((ele, idx) => {
const path2 = `${path}[${idx}]`;
if (typeof ele !== 'object') {
if (![null, undefined].includes(ele)) {
fns2.push(this.translatePredicate(entity, path2, '$eq', ele));
}
}
else {
translatePredicateInner(ele, path2, fns2);
}
});
}
else {
for (const attr in p) {
if (attr === '$and') {
p[attr].forEach((p2) => translatePredicateInner(p2, path, fns2));
}
else if (attr === '$or') {
const fnsOr = [];
p[attr].forEach((p2) => translatePredicateInner(p2, path, fnsOr));
fns2.push((value) => {
for (const fnOr of fnsOr) {
if (fnOr(value)) {
return true;
}
}
return false;
});
}
else if (attr.startsWith('$')) {
assert(Object.keys(p).length === 1);
fns2.push(this.translatePredicate(entity, path, attr, p[attr]));
}
else {
const attr2 = attr.startsWith('.') ? attr.slice(1) : attr;
const path2 = path ? `${path}.${attr2}` : attr2;
if (typeof p[attr] !== 'object') {
fns2.push(this.translatePredicate(entity, path2, '$eq', p[attr]));
}
else {
translatePredicateInner(p[attr], path2, fns2);
}
}
}
}
};
translatePredicateInner(filter, '', fns);
return (value) => {
for (const fn of fns) {
if (!fn(value)) {
return false;
}
}
return true;
};
}
translateAttribute(entity, filter, attr, context, option) {
if (!['object', 'array'].includes(this.getSchema()[entity].attributes[attr]?.type)) {
if (typeof filter !== 'object') {
return (node) => {
const row = this.constructRow(node, context, option);
if (row) {
const value = row[attr];
if (value === undefined && option?.warnWhenAttributeMiss) {
showWarningAttributeMiss(entity, attr);
}
return value === filter;
}
return false;
};
}
else {
const predicate = Object.keys(filter)[0];
assert(Object.keys(filter).length === 1 && predicate.startsWith('$'));
if (['$in', '$nin'].includes(predicate) && !(filter[predicate] instanceof Array)) {
throw new Error('子查询已经改用一对多的外键连接方式');
}
else {
const fn = this.translatePredicate(entity, attr, predicate, filter[predicate], option);
return (node) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
return fn(row);
};
}
}
}
else {
// 对象的内部查询
if (typeof filter !== 'object') {
// 支持filter全值相等的查询方式
assert(typeof filter === 'string');
return (node) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
return row.hasOwnProperty(attr) && JSON.stringify(row[attr]) === filter;
};
}
else {
const fn = this.translateObjectPredicate(entity, filter);
return (node) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
return fn(row[attr]);
};
}
}
}
/**
 * Compile a filter object into three groups of node predicates plus an optional node id.
 *
 * Every key of the filter is dispatched by kind:
 * - '#id' names the node so expressions elsewhere can reference it;
 * - '$and' / '$or' / '$xor' / '$not' are delegated to translateLogicFilter;
 * - keys starting with the expression prefix become expression predicates (group `self`);
 * - '$text' becomes a fulltext predicate (group `self`);
 * - any other key is classified with judgeRelation: own attribute (`self`),
 *   many-to-one via entity/entityId or via a foreign key (`mto`),
 *   one-to-many sub-query (`otm`, or `self` when it can be evaluated up front).
 *
 * @param entity entity the filter applies to
 * @param projection projection of the enclosing selection (sub-projections are passed down)
 * @param filter the filter object
 * @param context operation context
 * @param option select options (`warnWhenAttributeMiss`, `disableSubQueryHashjoin`, ...)
 * @returns `{ self, otm, mto, nodeId }`
 */
translateFilterInner(entity, projection, filter, context, option) {
const self = [];
const otm = [];
const mto = [];
let nodeId;
for (const attr in filter) {
if (attr === '#id') {
nodeId = filter['#id'];
}
else if (['$and', '$or', '$xor', '$not'].includes(attr)) {
const filterFns = this.translateLogicFilter(entity, projection, filter, attr, context, option);
self.push(...(filterFns.self));
otm.push(...(filterFns.otm));
mto.push(...(filterFns.mto));
}
else if (attr.toLowerCase().startsWith(EXPRESSION_PREFIX)) {
const fn = this.translateExpression(entity, filter[attr], context, option);
// For now assume expressions mostly reference attributes of this node or of outer nodes (not necessarily correct). by Xc 20230824
self.push((node, nodeDict, exprResolveFns) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
const result = fn(row, nodeDict);
// a function result means the expression is still pending; it is queued for the caller
// to resolve later, and (being truthy) lets the node pass for now
if (typeof result === 'function') {
exprResolveFns.push(result);
}
return !!result;
});
}
else if (attr.toLowerCase() === '$text') {
self.push(this.translateFulltext(entity, filter[attr], context, option));
}
else {
// attribute-level filtering
const relation = judgeRelation(this.getSchema(), entity, attr);
if (relation === 1) {
// attribute of the row itself
self.push(this.translateAttribute(entity, filter[attr], attr, context, option));
}
else if (relation === 2) {
// pointer based on entity/entityId
const filterFn = this.translateFilter(attr, projection[attr] || {}, filter[attr], context, option);
mto.push((node, nodeDict, exprResolveFns) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
if (row.entityId === undefined || row.entity === undefined) {
if (option?.warnWhenAttributeMiss) {
showWarningAttributeMiss(entity, 'entity/entityId');
}
return false; // when it cannot be determined, treat the condition as not satisfied
}
if (row.entity !== attr) {
return false;
}
if (row.entityId === null) {
return false;
}
// follow the pointer to the referenced node and test the nested filter on it
const node2 = get(this.store, `${attr}.${row.entityId}`);
if (!node2) {
return false;
}
return filterFn(node2, nodeDict, exprResolveFns);
});
}
else if (typeof relation === 'string') {
// can only be a foreign key based on an ordinary attribute
const filterFn = this.translateFilter(relation, projection[attr] || {}, filter[attr], context, option);
mto.push((node, nodeDict, exprResolveFns) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
if (row[`${attr}Id`]) {
const node2 = get(this.store, `${relation}.${row[`${attr}Id`]}`);
if (!node2) {
return false;
}
return filterFn(node2, nodeDict, exprResolveFns);
}
if (row[`${attr}Id`] === undefined) {
if (option?.warnWhenAttributeMiss) {
showWarningAttributeMiss(entity, `${attr}Id`);
}
}
return false;
});
}
else if (relation instanceof Array) {
// one-to-many sub-query
const [otmEntity, otmForeignKey] = relation;
const predicate = filter[attr][SUB_QUERY_PREDICATE_KEYWORD] || 'in';
// without an explicit foreign key the child points back through entity/entityId
const fk = otmForeignKey || 'entityId';
const otmProjection = {
id: 1,
[fk]: 1,
};
/**
 * 'in' means at least one row exists after the foreign-key join
 * 'not in' means no row at all exists after the foreign-key join
 * 'all' means no row satisfies the negated join condition (whether at least one matching row is required: intuitively there is no such restriction)
 * 'not all' means at least one row satisfies the negated join condition
 *
 * The parent row is not determined yet at this point; only a query that explicitly carries an id can be
 * executed up front, otherwise it is not executed here. This is the logic for now. by Xc 20230725
 */
// fallback: evaluate the sub-query once per parent row at filtering time
const makeAfterLogic = () => {
otm.push((node, nodeDict) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
/**
 * 'in' means at least one row exists after the foreign-key join
 * 'not in' means no row at all exists after the foreign-key join
 * 'all' means no row satisfies the negated join condition (whether at least one matching row is required: intuitively there is no such restriction)
 * 'not all' means at least one row satisfies the negated join condition
 */
const otmOriginFilter = !otmForeignKey ? Object.assign({
entity,
}, cloneDeep(filter[attr])) : cloneDeep(filter[attr]);
// for 'all' / 'not all' the child filter is negated and restricted to this parent's children
const otmFilter = ['not in', 'in'].includes(predicate) ? combineFilters(otmEntity, this.getSchema(), [
otmOriginFilter, {
[fk]: row.id,
}
]) : {
$not: otmOriginFilter,
[fk]: row.id,
};
// only existence matters, hence count: 1
const subQuerySet = (this.selectAbjointRow(otmEntity, {
data: otmProjection,
filter: otmFilter,
indexFrom: 0,
count: 1,
}, context, {
...option,
nodeDict,
dontCollect: true,
warnWhenAttributeMiss: false, // a one-to-many join does not need to care about this attribute being missing
})).map((ele) => {
return (ele)[fk];
});
switch (predicate) {
case 'in':
case 'not all': {
return subQuerySet.length > 0;
}
case 'not in':
case 'all': {
return subQuerySet.length === 0;
}
default: {
throw new Error(`illegal sqp: ${predicate}`);
}
}
});
};
if (filter.id && typeof filter.id === 'string') {
// the parent id is known from the filter, so the sub-query can be run once, right now
const otmOriginFilter = !otmForeignKey ? Object.assign({
entity,
}, cloneDeep(filter[attr])) : cloneDeep(filter[attr]);
const otmFilter = ['not in', 'in'].includes(predicate) ? combineFilters(otmEntity, this.getSchema(), [
otmOriginFilter, {
[fk]: filter.id,
}
]) : {
$not: otmOriginFilter,
[fk]: filter.id,
};
try {
const subQuerySet = (this.selectAbjointRow(otmEntity, {
data: otmProjection,
filter: otmFilter,
indexFrom: 0,
count: 1,
}, context, {
...option,
dontCollect: true,
warnWhenAttributeMiss: false, // a one-to-many join does not need to care about this attribute being missing
})).map((ele) => {
return (ele)[fk];
});
// the outcome is fixed; the predicate only has to reject invisible rows
self.push((node) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
switch (predicate) {
case 'in':
case 'not all': {
return subQuerySet.length > 0;
}
case 'not in':
case 'all': {
return subQuerySet.length === 0;
}
default: {
throw new Error(`illegal sqp: ${predicate}`);
}
}
});
}
catch (err) {
// the sub-query referenced a node that is not known yet: defer to per-row evaluation
if (err instanceof OakExpressionUnresolvedException) {
makeAfterLogic();
}
else {
throw err;
}
}
}
else if (!option?.disableSubQueryHashjoin) {
/**
 * Try a hash join: fetch the inner table's rows as an array. Tables in memory are never very large and
 * cannot use indexes (a future optimisation might fetch directly by id), so hashing should be faster.
 */
try {
const subQueryRows = this.selectAbjointRow(otmEntity, {
data: otmProjection,
filter: filter[attr],
}, context, {
...option,
dontCollect: true,
warnWhenAttributeMiss: false, // a one-to-many join does not need to care about this attribute being missing
});
// group the matching child rows by the parent id they point to
const buckets = groupBy(subQueryRows, fk);
// NOTE(review): the 'all' / 'not all' cases below are derived from the number of buckets, which
// differs from the negated-filter approach used in makeAfterLogic — confirm the two agree.
otm.push((node, nodeDict) => {
const row = this.constructRow(node, context, option);
if (!row) {
return false;
}
switch (predicate) {
case 'in': {
return (buckets[row.id]?.length > 0);
}
case 'not in': {
return (!buckets[row.id] || buckets[row.id].length === 0);
}
case 'all': {
return (buckets[row.id]?.length > 0 && Object.keys(buckets).length === 1);
}
case 'not all': {
return Object.keys(buckets).length > 1 || !buckets.hasOwnProperty(row.id);
}
default: {
assert(false, `unrecoganized sqp operator: ${predicate}`);
}
}
});
}
catch (err) {
// the sub-query referenced a node that is not known yet: defer to per-row evaluation
if (err instanceof OakExpressionUnresolvedException) {
makeAfterLogic();
}
else {
throw err;
}
}
}
else {
makeAfterLogic();
}
}
else {
// metadata
assert(relation === 0);
}
}
}
return {
self,
otm,
mto,
nodeId,
};
}
translateFilter(entity, projection, filter, context, option) {
const filterFns = this.translateFilterInner(entity, projection, filter, context, option);
const { nodeId } = filterFns;
return (node, nodeDict, exprResolveFns) => {
if (nodeId) {
assert(!nodeDict.hasOwnProperty(nodeId), `Filter中的nodeId「${nodeId}」出现了多次`);
Object.assign(nodeDict, {
[nodeId]: this.constructRow(node, context, option),
});
}
return this.testFilterFns(node, nodeDict, exprResolveFns, filterFns);
};
}
translateSorter(entity, sorter, context, option) {
const compare = (row1, row2, entity2, sortAttr, direction) => {
const row11 = row1;
const row22 = row2;
assert(Object.keys(sortAttr).length === 1);
const attr = Object.keys(sortAttr)[0];
const relation = judgeRelation(this.getSchema(), entity2, attr);
if (relation === 1 || relation === 0) {
const getAttrOrExprValue = (r) => {
if (sortAttr[attr] === 1) {
return r[attr];
}
else {
// 改变策略让所有需要获得的值在projection上取得
assert(typeof sortAttr[attr] === 'string' && sortAttr[attr].startsWith('$expr'));
return r[sortAttr[attr]];
}
};
const v1 = row1 && getAttrOrExprValue(row11);
const v2 = row2 && getAttrOrExprValue(row22);
if ([null, undefined].includes(v1) || [null, undefined].includes(v2)) {
if ([null, undefined].includes(v1) && [null, undefined].includes(v2)) {
return 0;
}
if ([null, undefined].includes(v1)) {
if (direction === 'asc') {
return -1;
}
return 1;
}
if (direction === 'desc') {
return 1;
}
return -1;
}
const attrDef = this.getSchema()[entity2].attributes[attr];
// 处理enum现在enum是按定义enum的顺序从小到大排列
if (attrDef?.type === 'enum') {
const enums = attrDef.enumeration;
const i1 = enums.indexOf(v1);
const i2 = enums.indexOf(v2);
assert(i1 >= 0 && i2 >= 0);
return direction === 'asc' ? i1 - i2 : i2 - i1;
}
else {
// createAt为1时被认为是最大的新建
if (['$$createAt$$', '$$updateAt$$'].includes(attr)) {
if (v1 === 1) {
return direction === 'asc' ? 1 : -1;
}
else if (v2 === 1) {
return direction === 'asc' ? -1 : 1;
}
}
if (v1 > v2) {
if (direction === 'desc') {
return -1;
}
else {
return 1;
}
}
else if (v1 < v2) {
if (direction === 'desc') {
return 1;
}
else {
return -1;
}
}
else {
return 0;
}
}
}
else {
if (relation === 2) {
assert(row11['entity'] === row22['entity']);
assert(row11.entity === attr);
const node1 = this.store[row11.entity] && this.store[row11.entity][row11.entityId];
const node2 = this.store[row22.entity] && this.store[row22.entity][row22.entityId];
const row111 = node1 && this.constructRow(node1, context, option);
const row222 = node2 && this.constructRow(node2, context, option);
return compare(row111, row222, row11['entity'], sortAttr[attr], direction);
}
else {
assert(typeof relation === 'string');
const node1 = this.store[relation] && this.store[relation][row11[`${attr}Id`]];
const node2 = this.store[relation] && this.store[relation][row22[`${attr}Id`]];
const row111 = node1 && this.constructRow(node1, context, option);
const row222 = node2 && this.constructRow(node2, context, option);
return compare(row111, row222, relation, sortAttr[attr], direction);
}
}
};
return (row1, row2) => {
for (const sorterElement of sorter) {
const { $attr, $direction } = sorterElement;
const result = compare(row1, row2, entity, $attr, $direction);
if (result !== 0) {
return result;
}
}
return 0;
};
}
/**
* 目标行如果有id过滤条件可直接取
* @param entity
* @param selection
* @returns
*/
getEntityNodes(entity, selection, context) {
const { filter } = selection;
const ids = getRelevantIds(filter);
if (this.store[entity]) {
if (ids.length > 0) {
const entityNodes = pick(this.store[entity], ids);
return Object.values(entityNodes);
}
return Object.values(this.store[entity]);
}
return [];
}
selectAbjointRow(entity, selection, context, option) {
const { data, filter } = selection;
const nodeDict = option?.nodeDict;
const filterFn = filter && this.translateFilter(entity, data, filter, context, option);
const entityNodes = this.getEntityNodes(entity, selection, context);
const nodes = [];
for (const n of entityNodes) {
if (n.$txnId && n.$txnId !== context.getCurrentTxnId() && n.$current === null) {
continue;
}
assert(!n.$txnId || n.$txnId === context.getCurrentTxnId());
const exprResolveFns = [];
const nodeDict2 = {};
if (nodeDict) {
Object.assign(nodeDict2, nodeDict);
}
// 如果没有filterFn要保证行不为null(本事务remove的case)
if (filterFn ? filterFn(n, nodeDict2, exprResolveFns) : this.constructRow(n, context, option)) {
// 如果有延时处理的expression在这里加以判断此时所有在filter中的node应该都已经加以遍历了
let exprResult = true;
if (exprResolveFns.length > 0) {
for (const fn of exprResolveFns) {
const result = fn(nodeDict2);
if (typeof result === 'function') {
throw new OakExpressionUnresolvedException();
}
if (!!!result) {
exprResult = false;
break;
}
}
}
if (exprResult) {
nodes.push(n);
}
}
}
const rows = nodes.map((node) => this.constructRow(node, context, option));
const rows2 = this.formResult(entity, rows, selection, context, option);
return rows2;
}
updateAbjointRow(entity, operation, context, option) {
const { data, action, id: operId } = operation;
switch (action) {
case 'create': {
const { id } = data;
assert(id);
// const node = this.store[entity] && (this.store[entity]!)[id as string];
// const row = node && this.constructRow(node, context) || {};
/* if (row) {
throw new OakError(RowStore.$$LEVEL, RowStore.$$CODES.primaryKeyConfilict);
} */
if (this.store[entity] && (this.store[entity])[id]) {
const node = this.store[entity] && (this.store[entity])[id];
throw new OakCongruentRowExists(entity, this.constructRow(node, context, option));
}
if (!data.$$seq$$) {
const seq = this.getNextSeq(entity);
data.$$seq$$ = seq;
}
const node2 = {
$txnId: context.getCurrentTxnId(),
$current: null,
$next: data,
$path: `${entity}.${id}`,
};
if (!this.store[entity]) {
this.store[entity] = {};
}
set(this.store, `${entity}.${id}`, node2);
this.addToTxnNode(node2, context, 'create', data[CreateAtAttribute]);
return 1;
}
default: {
const selection = {
data: {
id: 1,
},
filter: operation.filter,
indexFrom: operation.indexFrom,
count: operation.count,
};
const rows = this.selectAbjointRow(entity, selection, context, { dontCollect: true });
const ids = rows.map(ele => ele.id);
for (const id of ids) {
let alreadyDirtyNode = false;
const node = (this.store[entity])[id];
assert(node && (!node.$txnId || node.$txnId == context.getCurrentTxnId()));
if (!node.$txnId) {
node.$txnId = context.getCurrentTxnId();
}
else {
assert(node.$txnId === context.getCurrentTxnId());
alreadyDirtyNode = true;
}
node.$path = `${entity}.${id}`;
if (action === 'remove') {
node.$next = null;
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'remove', data[DeleteAtAttribute]);
}
}
else {
node.$next = Object.assign(node.$next || {}, data);
if (!alreadyDirtyNode) {
// 如果已经更新过的结点就不能再加了,会形成循环
this.addToTxnNode(node, context, 'update', data[UpdateAtAttribute]);
}
}
}
return rows.length;
}
}
}
async selectAbjointRowAsync(entity, selection, context, option) {
return this.selectAbjointRow(entity, selection, context, option);
}
async updateAbjointRowAsync(entity, operation, context, option) {
return this.updateAbjointRow(entity, operation, context, option);
}
operateSync(entity, operation, context, option) {
assert(context.getCurrentTxnId());
return super.operateSync(entity, operation, context, option);
}
async operateAsync(entity, operation, context, option) {
assert(context.getCurrentTxnId());
return super.operateAsync(entity, operation, context, option);
}
/**
* 计算最终结果集当中的函数,这个函数可能测试不够充分
* @param entity
* @param projection
* @param data
* @param nodeDict
* @param context
*/
formExprInResult(entity, projection, data, nodeDict, context) {
const laterExprDict = {};
for (const attr in projection) {
if (attr.startsWith(EXPRESSION_PREFIX)) {
const ExprNodeTranslator = this.translateExpression(entity, projection[attr], context, {});
const exprResult = ExprNodeTranslator(data, nodeDict);
if (typeof exprResult === 'function') {
Object.assign(laterExprDict, {
[attr]: exprResult,
});
}
else {
Object.assign(data, {
[attr]: exprResult,
});
}
}
else if (attr === '#id') {
const nodeId = projection[attr];
assert(!nodeDict.hasOwnProperty(nodeId), `Filter中的nodeId「${nodeId}」出现了多次`);
Object.assign(nodeDict, {
[nodeId]: data,
});
}
}
for (const attr in projection) {
const rel = this.judgeRelation(entity, attr);
if (rel === 1) {
}
else if (rel === 2) {
if (data[attr]) {
this.formExprInResult(attr, projection[attr], data[attr], nodeDict, context);
}
}
else if (typeof rel === 'string') {
if (data[attr]) {
const result2 = {};
this.formExprInResult(rel, projection[attr], data[attr], nodeDict, context);
}
}
else if (rel instanceof Array) {
if (!attr.endsWith('$$aggr')) {
if (data[attr] && data[attr] instanceof Array) {
data[attr].map((ele) => this.formExprInResult(rel[0], projection[attr].data, ele, nodeDict, context));
}
}
}
}
for (const attr in laterExprDict) {
const exprResult = laterExprDict[attr](nodeDict);
// projection是不应出现计算不出来的情况
assert(typeof exprResult !== 'function', 'data中的expr无法计算请检查命名与引用的一致性');
Object.assign(data, {
[attr]: exprResult,
});
}
}
formResult(entity, rows, selection, context, option) {
const { data, sorter, indexFrom, count } = selection;
const findAvailableExprName = (current) => {
let counter = 1;
while (counter < 20) {
const exprName = `$expr${counter++}`;
if (!current.includes(exprName)) {
return exprName;
}
}
assert(false, '找不到可用的expr命名');
};
const sortToProjection = (entity2, proj, sort) => {
Object.keys(sort).forEach((attr) => {
// 要把sorter中的expr运算提到这里做掉否则异步运算无法排序
if (attr.startsWith('$expr') && typeof sort[attr] === 'object') {
const attrName = findAvailableExprName(Object.keys(proj));
Object.assign(proj, {
[attrName]: sort[attr],
});
Object.assign(sort, {
[attr]: attrName,
});
}
const rel = judgeRelation(this.getSchema(), entity2, attr);
if (rel === 2 || typeof rel === 'string') {
if (!proj[attr]) {
Object.assign(proj, {
[attr]: {},
});
}
const entity3 = typeof rel === 'string' ? rel : attr;
sortToProjection(entity3, proj[attr], sort[attr]);
}
else if (rel === 1) {
Object.assign(proj, {
[attr]: 1,
});
}
});
};
if (sorter) {
sorter.forEach((ele) => {
sortToProjection(entity, data, ele.$attr);
});
}
// 先计算projectionformResult只处理abjoint的行不需要考虑expression和一对多多对一关系
let rows2 = [];
const { data: projection } = selection;
for (const row of rows) {
const result = {};
for (const attr in projection) {
const rel = this.judgeRelation(entity, attr);
if (rel === 1) {
if (row[attr] === undefined) {
if (process.env.NODE_ENV === 'development') {
console.warn(`对象${entity}上的属性${attr}缺失,可能会影响上层使用结果,请确定`);
}
// break;
}
else if (typeof projection[attr] === 'number') {
Object.assign(result, {
[attr]: row[attr],
});
}
else {
// object数据的深层次select
Object.assign(result, {
[attr]: {},
});
const assignIner = (dest, proj, source) => {
if (proj instanceof Array) {
assert(dest instanceof Array);
assert(source instanceof Array);
proj.forEach((attr, idx) => {
if (typeof attr === 'number') {
dest[idx] = source[idx];
}
else if (typeof attr === 'object') {
dest[idx] = {};
assignIner(dest[idx], attr, source[idx]);
}
});
}
else {
for (const attr in proj) {
if (typeof proj[attr] === 'number') {
dest[attr] = source[attr];
}
else if (typeof proj[attr] === 'object') {
dest[attr] = proj[attr] instanceof Array ? [] : {};
assignIner(dest[attr], proj[attr], source[attr]);
}
}
}
};
assignIner(result[attr], projection[attr], row[attr]);
}
}
}
// 这三个属性在前台cache中可能表达特殊语义的需要返回
if (!selection.distinct) {
if (row[DeleteAtAttribute]) {
Object.assign(result, {
[DeleteAtAttribute]: row[DeleteAtAttribute],
});
}
if (row[UpdateAtAttribute]) {
Object.assign(result, {
[UpdateAtAttribute]: row[UpdateAtAttribute],
});
}
if (row[CreateAtAttribute]) {
Object.assign(result, {
[CreateAtAttribute]: row[CreateAtAttribute],
});
}
}
rows2.push(result);
}
// 再计算sorter
if (sorter) {
const sorterFn = this.translateSorter(entity, sorter, context, option);
rows2.sort(sorterFn);
}
// 用indexFrom和count来截断
if (typeof indexFrom === 'number') {
rows2 = rows2.slice(indexFrom, indexFrom + count);
}
// 如果有distinct再计算distinct
if (selection.distinct) {
rows2 = uniqBy(rows2, (ele) => JSON.stringify(ele));
}
return rows2;
}
/**
* 本函数把结果中的相应属性映射成一个字符串用于GroupBy
* @param entity
* @param row
* @param projection
*/
mappingProjectionOnRow(entity, row, projection) {
let key = '';
let result = {};
const values = [];
const mappingIter = (entity2, row2, p2, result2) => {
const keys = Object.keys(p2).sort((ele1, ele2) => ele1 < ele2 ? -1 : 1);
for (const k of keys) {
const rel = this.judgeRelation(entity2, k);
if (rel === 2) {
result2[k] = {};
if (row2[k]) {
mappingIter(k, row2[k], p2[k], result2[k]);
}
}
else if (typeof rel === 'string') {
result2[k] = {};
if (row2[k]) {
mappingIter(rel, row2[k], p2[k], result2[k]);
}
}
else {
assert([0, 1].includes(rel));
result2[k] = row2[k];
assert(['string', 'number', 'boolean'].includes(typeof row2[k]) || row2[k] === null);
key += `${row2[k]}`;
values.push(row2[k]);
}
}
};
mappingIter(entity, row, projection, result);
return {
result,
key,
values,
};
}
calcAggregation(entity, rows, aggregationData) {
const ops = Object.keys(aggregationData).filter(ele => ele !== '#aggr' && ele.startsWith('#'));
const result = {};
const results = {};
for (const row of rows) {
for (const op of ops) {
const { values } = this.mappingProjectionOnRow(entity, row, aggregationData[op]);
assert(values.length === 1, `聚合运算中,${op}的目标属性多于1个`);
if (results[op]) {
results[op].push(values[0]);
}
else {
results[op] = [values[0]];
}
/* if (op.startsWith('#max')) {
if (![undefined, null].includes(values[0]) && (!result.hasOwnProperty(op) || result[op] < values[0])) {
result[op] = values[0];
}
}
else if (op.startsWith('#min')) {
if (![undefined, null].includes(values[0]) && (!result.hasOwnProperty(op) || result[op] > values[0])) {
result[op] = values[0];
}
}
else if (op.startsWith('#sum')) {
if (![undefined, null].includes(values[0])) {
assert(typeof values[0] === 'number', '只有number类型的属性才可以计算sum');
if (!result.hasOwnProperty(op)) {
result[op] = values[0];
}
else {
result[op] += values[0];
}
}
}
else if (op.startsWith('#count')) {
if (![undefined, null].includes(values[0])) {
if (!result.hasOwnProperty(op)) {
result[op] = 1;
}
else {
result[op] += 1;
}
}
}
else {
assert(op.startsWith('#avg'));
if (![undefined, null].includes(values[0])) {
assert(typeof values[0] === 'number', '只有number类型的属性才可以计算avg');
if (!result.hasOwnProperty(op)) {
result[op] = {
total: values[0],
count: 1,
};
}
else {
result[op].total += values[0];
result[op].count += 1;
}
}
} */
}
}
const { distinct } = aggregationData;
for (const op in results) {
if (op.startsWith('#max')) {
result[op] = null;
results[op].forEach((ele) => {
if (![undefined, null].includes(ele) && (result[op] === null || result[op] < ele)) {
result[op] = ele;
}
});
}
else if (op.startsWith('#min')) {
result[op] = null;
results[op].forEach((ele) => {
if (![undefined, null].includes(ele) && (result[op] === null || result[op] > ele)) {
result[op] = ele;
}
});
}
else if (op.startsWith('#sum')) {
result[op] = 0;
const data = distinct ? uniq(results[op]) : results[op];
data.forEach((ele) => {
if (typeof ele === 'number') {
result[op] += ele;
}
});
}
else if (op.startsWith('#count')) {
result[op] = 0;
const data = distinct ? uniq(results[op]) : results[op];
data.forEach((ele) => {
if (![undefined, null].includes(ele)) {
result[op] += 1;
}
});
}
else if (op.startsWith('#avg')) {
result[op] = 0;
const data = (distinct ? uniq(results[op]) : results[op]).filter(ele => ![undefined, null].includes(ele));
let count = 0;
data.forEach((ele) => {
if (typeof ele === 'number') {
result[op] += ele;
count += 1;
}
});
result[op] = result[op] / count;
}
}
return result;
}
formAggregation(entity, rows, aggregationData) {
const { "#aggr": aggrExpr } = aggregationData;
if (aggrExpr) {
const groups = groupBy(rows, (row) => {
const { key } = this.mappingProjectionOnRow(entity, row, aggrExpr);
return key;
});
const result = Object.keys(groups).map((ele) => {
const aggr = this.calcAggregation(entity, groups[ele], aggregationData);
const { result: r } = this.mappingProjectionOnRow(entity, groups[ele][0], aggrExpr);
aggr['#data'] = r;
return aggr;
});
return result;
}
const aggr = this.calcAggregation(entity, rows, aggregationData);
return [aggr];
}
selectSync(entity, selection, context, option) {
assert(context.getCurrentTxnId());
const result = super.selectSync(entity, selection, context, option);
// 在这里再计算所有的表达式
result.forEach((ele) => this.formExprInResult(entity, selection.data, ele, {}, context));
return result;
}
async selectAsync(entity, selection, context, option) {
assert(context.getCurrentTxnId());
const result = await super.selectAsync(entity, selection, context, option);
// 在这里再计算所有的表达式
result.forEach((ele) => this.formExprInResult(entity, selection.data, ele, {}, context));
return result;
}
aggregateAbjointRowSync(entity, aggregation, context, option) {
assert(context.getCurrentTxnId());
const { data, filter, sorter, indexFrom, count } = aggregation;
const p = {};
for (const k in data) {
Object.assign(p, cloneDeep(data[k]));
}
const selection = {
data: p,
filter,
sorter,
indexFrom,
count,
};
const result = this.cascadeSelect(entity, selection, context, Object.assign({}, option, {
dontCollect: true,
}));
// 在这里再计算所有的表达式
result.forEach((ele) => this.formExprInResult(entity, selection.data, ele, {}, context));
// 最后计算Aggregation
return this.formAggregation(entity, result, aggregation.data);
}
async aggregateAbjointRowAsync(entity, aggregation, context, option) {
assert(context.getCurrentTxnId());
const { data, filter, sorter, indexFrom, count } = aggregation;
const p = {};
for (const k in data) {
Object.assign(p, cloneDeep(data[k]));
}
const selection = {
data: p,
filter,
sorter,
indexFrom,
count,
};
polishSelection(this.getSchema(), entity, selection);
const result = await this.cascadeSelectAsync(entity, selection, context, Object.assign({}, option, {
dontCollect: true,
}));
// 在这里再计算所有的表达式
result.forEach((ele) => this.formExprInResult(entity, selection.data, ele, {}, context));
// 最后计算Aggregation
return this.formAggregation(entity, result, aggregation.data);
}
countAbjointRow(entity, selection, context, option) {
const selection2 = Object.assign({}, selection, {
data: {
id: 1,
},
});
const result = this.selectAbjointRow(entity, selection2, context, Object.assign({}, option, {
dontCollect: true,
}));
return typeof selection.count === 'number' ? Math.min(result.length, selection.count) : result.length;
}
async countAbjointRowAsync(entity, selection, context, option) {
const selection2 = Object.assign({}, selection, {
data: {
id: 1,
},
});
const result = await this.selectAbjointRowAsync(entity, selection2, context, Object.assign({}, option, {
dontCollect: true,
}));
return typeof selection.count === 'number' && selection.count > 0 ? Math.min(result.length, selection.count) : result.length;
}
addToTxnNode(node, context, action, updateAt) {
const txnNode = this.activeTxnDict[context.getCurrentTxnId()];
assert(txnNode);
if (!node.$nextNode) {
// 如果nextNode有值说明这个结点已经在链表中了
if (txnNode.nodeHeader) {
node.$nextNode = txnNode.nodeHeader;
txnNode.nodeHeader = node;
}
else {
txnNode.nodeHeader = node;
}
}
txnNode[action]++;
txnNode.lastUpdateTs = updateAt;
}
getStat() {
return this.stat;
}
beginSync() {
const uuid = `${Math.random()}`;
assert(!this.activeTxnDict.hasOwnProperty(uuid));
Object.assign(this.activeTxnDict, {
[uuid]: {
create: 0,
update: 0,
remove: 0,
waitList: [],
},
});
return uuid;
}
commitCallbacks = [];
onCommit(callback) {
this.commitCallbacks.push(callback);
return () => pull(this.commitCallbacks, callback);
}
addToOperationResult(result, entity, action) {
if (result[entity]) {
if (result[entity][action]) {
result[entity][action]++;
}
else {
Object.assign(result[entity], {
[action]: 1,
});
}
}
else {
Object.assign(result, {
[entity]: {
[action]: 1,
},
});
}
}
commitLogic(uuid) {
assert(this.activeTxnDict.hasOwnProperty(uuid), uuid);
let node = this.activeTxnDict[uuid].nodeHeader;
const result = {};
while (node) {
const node2 = node.$nextNode;
if (node.$txnId === uuid) {
assert(node.$path);
const entity = node.$path?.split('.')[0];
if (node.$next) {
// create/update
node.$current = Object.assign(node.$current || {}, node.$next);
if (node.$current) {
this.addToOperationResult(result, entity, 'create');
}
else {
this.addToOperationResult(result, entity, 'update');
}
unset(node, '$txnId');
unset(node, '$next');
unset(node, '$path');
unset(node, '$nextNode');
}
else {
// remove
this.addToOperationResult(result, entity, 'remove');
unset(this.store, node.$path);
unset(node, '$txnId');
}
}
else {
// 同一行被同一事务更新多次
assert(node.$txnId === undefined);
}
node = node2;
}
if (this.activeTxnDict[uuid].create || this.activeTxnDict[uuid].update || this.activeTxnDict[uuid].remove) {
this.stat.create += this.activeTxnDict[uuid].create;
this.stat.update += this.activeTxnDict[uuid].update;
this.stat.remove += this.activeTxnDict[uuid].remove;
this.stat.commit++;
}
// 唤起等待者
for (const waiter of this.activeTxnDict[uuid].waitList) {
waiter.fn();
}
unset(this.activeTxnDict, uuid);
return result;
}
commitSync(uuid) {
const result = this.commitLogic(uuid);
// 这里无法等待callback完成callback最好自身保证顺序前端cache应当具备的特征
this.commitCallbacks.forEach(callback => callback(result));
}
rollbackSync(uuid) {
assert(this.activeTxnDict.hasOwnProperty(uuid));
let node = this.activeTxnDict[uuid].nodeHeader;
while (node) {
const node2 = node.$nextNode;
if (node.$txnId === uuid) {
if (node.$current) {
// update/remove
unset(node, '$txnId');
unset(node, '$next');
unset(node, '$path');
unset(node, '$nextNode');
}
else {
// create
assert(node.$path);
unset(this.store, node.$path);
unset(node, '$txnId');
}
}
else {
// 该结点被同一事务反复处理
assert(node.$txnId === undefined);
}
node = node2;
}
// 唤起等待者
for (const waiter of this.activeTxnDict[uuid].waitList) {
waiter.fn();
}
unset(this.activeTxnDict, uuid);
}
async beginAsync() {
return this.beginSync();
}
async commitAsync(uuid) {
const result = this.commitLogic(uuid);
for (const fn of this.commitCallbacks) {
await fn(result);
}
}
async rollbackAsync(uuid) {
return this.rollbackSync(uuid);
}
// 将输入的OpRecord同步到数据中
sync(opRecords, context, option) {
const option2 = Object.assign({}, option, {
dontCollect: true,
dontCreateOper: true,
});
const result = {};
for (const record of opRecords) {
switch (record.a) {
case 'c': {
const { e, d } = record;
if (d instanceof Array) {
for (const dd of d) {
assert(dd[UpdateAtAttribute], `获取的${e}对象数据没有update时间戳`);
if (!this.store[e]?.[dd.id]) {
// 有可能后台通过socket等途径先把数据推到了前台这里直接忽略就行了
this.updateAbjointRow(e, {
id: 'dummy',
action: 'create',
data: dd,
}, context, option2);
this.addToOperationResult(result, e, 'create');
}
}
}
else {
assert(d[UpdateAtAttribute], `获取的${e}对象数据没有update时间戳`);
if (!this.store[e]?.[d.id]) {
this.updateAbjointRow(e, {
id: 'dummy',
action: 'create',
data: d,
}, context, option2);
this.addToOperationResult(result, e, 'create');
}
}
break;
}
case 'r': {
const { e, f, d } = record;
this.updateAbjointRow(e, {
id: 'dummy',
action: 'remove',
data: {},
filter: f,
}, context, option2);
this.addToOperationResult(result, e, 'remove');
break;
}
case 's': {
const { d } = record;
for (const entity in d) {
for (const id in d[entity]) {
assert(d[entity][id][UpdateAtAttribute], `获取的${entity}对象数据没有update时间戳`);
if (!this.store[entity]?.[id]) {
this.updateAbjointRow(entity, {
id: 'dummy',
action: 'create',
data: d[entity][id],
}, context, option2);
this.addToOperationResult(result, entity, 'create');
}
else if (this.store[entity]?.[id]) {
const row = this.constructRow(this.store[entity][id], context);
if (row[UpdateAtAttribute] <= d[entity][id][UpdateAtAttribute]) {
this.updateAbjointRow(entity, {
id: 'dummy',
action: 'update',
data: d[entity][id],
filter: {
id,
},
}, context, option2);
this.addToOperationResult(result, entity, 'update');
}
}
}
}
break;
}
default: {
const { e, d, f } = record;
assert(d[UpdateAtAttribute], `获取的${e}对象数据没有update时间戳`);
// 只更新满足条件的行的更新时间戳比自己小的BM项目中出现过socket推送的状态先回的情况
this.updateAbjointRow(e, {
id: 'dummy',
action: 'update',
data: d,
filter: combineFilters(e, this.getSchema(), [f, {
[UpdateAtAttribute]: {
$lte: d[UpdateAtAttribute],
}
}]),
}, context, option2);
this.addToOperationResult(result, e, 'update');
break;
}
}
}
// 在txn提交时应该call过了这里看上去是多余的
/* this.commitCallbacks.forEach(
callback => callback(result)
); */
}
// 这里要返回当前store的数据状态如果有事务正在更新返回更新的时间戳否则返回整个store的commit值
// 前台使用
getLastUpdateTs() {
for (const uuid in this.activeTxnDict) {
if (this.activeTxnDict[uuid].lastUpdateTs) {
return this.activeTxnDict[uuid].lastUpdateTs;
}
}
return this.stat.commit;
}
}