import { Feature } from '../types/Feature';
import { set, pull, intersection, unset, union } from 'oak-domain/lib/utils/lodash';
import { CacheStore } from '../cacheStore/CacheStore';
import { OakRowUnexistedException, OakException, OakUserException } from 'oak-domain/lib/types/Exception';
import { assert } from 'oak-domain/lib/utils/assert';
import { RelationAuth as BaseRelationAuth } from 'oak-domain/lib/store/RelationAuth';
import { LOCAL_STORAGE_KEYS } from '../constant/constant';
import { checkFilterContains, combineFilters } from 'oak-domain/lib/store/filter';

// Default freshness window: data fetched less than 10 minutes ago is not re-requested.
const DEFAULT_KEEP_FRESH_PERIOD = 600 * 1000;

/**
 * Front-end cache feature.
 *
 * Wraps a local `CacheStore`, forwards aspect calls to the `connector`, syncs
 * the operation records that come back into the local store, and persists a
 * configurable set of entities (plus their refresh timestamps) through the
 * injected `localStorage` object.
 */
export class Cache extends Feature {
    cacheStore;
    syncEventsCallbacks;
    contextBuilder;
    // Number of `exec` calls currently in flight.
    executing = 0;
    savedEntities;
    keepFreshPeriod;
    localStorage;
    // entity -> { key -> timestamp (ms) of the last refresh recorded for that key }
    refreshRecords = {};
    // The currently open local context (transaction), if any.
    context;
    initPromise;
    attrUpdateMatrix;
    connector;
    baseRelationAuth;
    // Caches, for a given store timestamp, the verdict of checking an action on a row.
    // Shape: ts -> entity -> id -> checkTypeString -> action -> true | false | Error
    entityActionAuthDict;
    // { data, links } built once by buildEntityGraph().
    entityGraph;

    /** Delegates to `cacheStore.registerSelectionRewriter`. */
    registerSelectionRewriter(rewriter) {
        this.cacheStore.registerSelectionRewriter(rewriter);
    }

    /** Delegates to `cacheStore.registerOperationRewriter`. */
    registerOperationRewriter(rewriter) {
        this.cacheStore.registerOperationRewriter(rewriter);
    }

    /**
     * @param storageSchema schema handed to both `CacheStore` and `BaseRelationAuth`
     * @param connector object used for `callAspect`, `getFullData` and `makeBridgeUrl`
     * @param frontendContextBuilder called with the cache store to build a context
     * @param checkers each element is registered through `cacheStore.registerChecker`
     * @param localStorage object exposing `load(key)` and `save(key, value)`
     * @param common options; this class reads `cacheSavedEntities`,
     *   `cacheKeepFreshPeriod`, `attrUpdateMatrix`, `authDeduceRelationMap`,
     *   `selectFreeEntities` and `updateFreeDict`
     */
    constructor(storageSchema, connector, frontendContextBuilder, checkers, localStorage, common) {
        super();
        this.syncEventsCallbacks = [];
        this.connector = connector;
        this.cacheStore = new CacheStore(storageSchema);
        this.contextBuilder = () => frontendContextBuilder(this.cacheStore);
        this.savedEntities = ['actionAuth', 'i18n', 'path', ...(common.cacheSavedEntities || [])];
        this.keepFreshPeriod = common.cacheKeepFreshPeriod || DEFAULT_KEEP_FRESH_PERIOD;
        this.localStorage = localStorage;
        this.attrUpdateMatrix = common.attrUpdateMatrix;
        this.baseRelationAuth = new BaseRelationAuth(
            storageSchema,
            common.authDeduceRelationMap,
            common.selectFreeEntities,
            common.updateFreeDict,
        );
        checkers.forEach((checker) => this.cacheStore.registerChecker(checker));
        // Initialisation is asynchronous now; unsure whether that has side effects. (by Xc 20231126)
        // A failure inside initSavedLogic is forwarded as a rejection so that
        // onInitialized() settles instead of staying pending forever.
        this.initPromise = new Promise((resolve, reject) => {
            Promise.resolve(this.initSavedLogic(resolve)).catch(reject);
        });
        this.buildEntityGraph();
        this.entityActionAuthDict = {};
    }

    /**
     * Loads the persisted rows and refresh timestamps of every saved entity,
     * seeds the cache store with them, and registers a commit hook that writes
     * the saved entities back to `localStorage` whenever a commit touches them.
     *
     * @param complete invoked once the initial data has been installed
     */
    async initSavedLogic(complete) {
        const data = {};
        await Promise.all(this.savedEntities.map(async (entity) => {
            // Load the cached rows.
            const key = `${LOCAL_STORAGE_KEYS.cacheSaved}:${entity}`;
            const cached = await this.localStorage.load(key);
            if (cached) {
                data[entity] = cached;
            }
            // Load the cached refresh timestamps.
            const key2 = `${LOCAL_STORAGE_KEYS.cacheRefreshRecord}:${entity}`;
            const cachedTs = await this.localStorage.load(key2);
            if (cachedTs) {
                this.refreshRecords[entity] = cachedTs;
            }
        }));
        this.cacheStore.resetInitialData(data);
        this.cacheStore.onCommit(async (result) => {
            const entities = Object.keys(result);
            const referenced = intersection(entities, this.savedEntities);
            if (referenced.length > 0) {
                const saved = this.cacheStore.getCurrentData(referenced);
                for (const entity in saved) {
                    const key = `${LOCAL_STORAGE_KEYS.cacheSaved}:${entity}`;
                    await this.localStorage.save(key, saved[entity]);
                }
            }
        });
        complete();
    }

    /** Resolves once `initSavedLogic` has finished. */
    async onInitialized() {
        await this.initPromise;
    }

    /** @returns the schema held by the cache store */
    getSchema() {
        return this.cacheStore.getSchema();
    }

    /**
     * Calls an aspect through the connector and syncs any returned operation
     * records into the local cache.
     *
     * @param name aspect name
     * @param params aspect parameters
     * @param callback optional; called with `(result, opRecords)` before the sync
     * @param dontPublish when truthy, `publish()` is skipped on the success path
     * @param ignoreContext when truthy, `undefined` is passed as the context
     * @returns `{ result, message }` taken from the aspect response
     * @throws rethrows whatever the call (or the sync) throws; for an
     *   `OakException` carrying operation records, those are synced and
     *   published first
     */
    async exec(name, params, callback, dontPublish, ignoreContext) {
        // `executing` must drop back before publish() runs, yet must be decremented
        // exactly once even when publish() itself throws and lands in the catch.
        let counted = true;
        const leave = () => {
            if (counted) {
                counted = false;
                this.executing--;
            }
        };
        this.executing++;
        try {
            const { result, opRecords, message } = await this.connector.callAspect(
                name,
                params,
                ignoreContext ? undefined : this.context || this.contextBuilder(),
            );
            callback && callback(result, opRecords);
            const hasRecords = Boolean(opRecords?.length);
            if (hasRecords) {
                this.syncInner(opRecords);
            }
            leave();
            if (hasRecords && !dontPublish) {
                this.publish();
            }
            return {
                result,
                message,
            };
        }
        catch (e) {
            // For a data-inconsistency error the records are still synced so the user can see them.
            leave();
            if (e instanceof OakException) {
                const { opRecords } = e;
                // Optional chaining: a missing field must not mask the original exception.
                if (opRecords?.length) {
                    this.syncInner(opRecords);
                    this.publish();
                }
            }
            throw e;
        }
    }

    /**
     * Persists the refresh timestamps of `entity`. The promise returned by
     * `localStorage.save` is not awaited.
     */
    saveRefreshRecord(entity) {
        const records = this.refreshRecords[entity];
        assert(records);
        const key2 = `${LOCAL_STORAGE_KEYS.cacheRefreshRecord}:${entity}`;
        this.localStorage.save(key2, records);
    }

    /**
     * Records `now` as the refresh timestamp of `key` under `entity`.
     *
     * @returns an undo function: it restores the previous timestamp when there
     *   was a truthy one, otherwise it removes the key again
     */
    addRefreshRecord(entity, key, now) {
        const originTimestamp = this.refreshRecords[entity] && this.refreshRecords[entity][key];
        if (this.refreshRecords[entity]) {
            Object.assign(this.refreshRecords[entity], {
                [key]: now,
            });
        }
        else {
            Object.assign(this.refreshRecords, {
                [entity]: {
                    [key]: now,
                },
            });
        }
        if (originTimestamp) {
            return () => this.addRefreshRecord(entity, key, originTimestamp);
        }
        return () => unset(this.refreshRecords[entity], key);
    }

    /**
     * Refreshes data from the server.
     *
     * Supports incremental refresh: with `refreshOption.useLocalCache`,
     * metadata-like entities are kept locally and only rows whose
     * `$$updateAt$$` is not older than the oldest recorded refresh are
     * requested. If any one of the given keys has never been refreshed, all
     * keys are refreshed in full; callers must avoid that situation themselves.
     *
     * @param entity entity to refresh
     * @param selection selection to run; it is not modified by this method
     * @param option select option forwarded to the aspect
     * @param callback forwarded to `exec`
     * @param refreshOption `{ dontPublish, ignoreContext, useLocalCache: { keys, gap, onlyReturnFresh } }`
     * @returns `{ data, total }`, or `{ data }` when served from the local cache
     */
    async refresh(entity, selection, option, callback, refreshOption) {
        // todo: should also assert that there is no aggregation
        const { dontPublish, useLocalCache, ignoreContext } = refreshOption || {};
        const onlyReturnFresh = refreshOption?.useLocalCache?.onlyReturnFresh;
        let undoFns = [];
        const originFilter = selection.filter;
        // The selection actually sent to the server. It is a copy whenever the
        // filter is narrowed, so the caller's object is never mutated.
        let remoteSelection = selection;
        if (useLocalCache) {
            assert(!selection.indexFrom && !selection.count, '用cache的查询不能使用分页');
            assert(this.savedEntities.includes(entity), `${entity}不在系统设置的应缓存对象当中`);
            const { keys, gap } = useLocalCache;
            let oldest = Number.MAX_SAFE_INTEGER;
            keys.forEach((k) => {
                const last = this.refreshRecords[entity] && this.refreshRecords[entity][k];
                if (typeof last === 'number') {
                    if (last < oldest) {
                        oldest = last;
                    }
                }
                else {
                    // This key has never been fetched: force a full refresh.
                    oldest = 0;
                }
            });
            const gap2 = gap || this.keepFreshPeriod;
            const now = Date.now();
            if (oldest < Number.MAX_SAFE_INTEGER && oldest > now - gap2) {
                // The local cache is fresh enough; no request is needed.
                if (process.env.NODE_ENV === 'development') {
                    // (a development-only warning about the skipped request used to be logged here)
                }
                if (onlyReturnFresh) {
                    return {
                        data: [],
                    };
                }
                const data = this.get(entity, selection);
                return {
                    data,
                };
            }
            if (oldest > 0) {
                // Every key has been fetched before: only ask for rows updated since `oldest`.
                const updatedSince = {
                    $$updateAt$$: {
                        $gte: oldest,
                    },
                };
                remoteSelection = {
                    ...selection,
                    filter: originFilter
                        ? combineFilters(entity, this.getSchema(), [originFilter, updatedSince])
                        : updatedSince,
                };
            }
            undoFns = keys.map((k) => this.addRefreshRecord(entity, k, now));
        }
        try {
            const { result: { data: sr, total } } = await this.exec('select', {
                entity,
                selection: remoteSelection,
                option,
            }, callback, dontPublish, ignoreContext);
            let filter2 = {
                id: {
                    $in: Object.keys(sr),
                },
            };
            if (undoFns.length > 0 && !onlyReturnFresh) {
                // NOTE(review): mergeSelectResult drops every row whose id is not a key of `sr`,
                // so rows matched only by the original filter are removed again — confirm intent.
                filter2 = originFilter;
            }
            const selection2 = Object.assign({}, remoteSelection, {
                filter: filter2,
                // The ids are already fixed, so indexFrom and count must not apply here.
                indexFrom: undefined,
                count: undefined,
            });
            const data = this.get(entity, selection2, sr);
            if (useLocalCache) {
                this.saveRefreshRecord(entity);
            }
            return {
                data,
                total,
            };
        }
        catch (err) {
            // Roll the refresh timestamps back so that the next call retries.
            undoFns.forEach((fn) => fn());
            throw err;
        }
    }

    /**
     * Runs an aggregation on the server. Goes straight to the connector, so
     * nothing is synced into the local cache.
     *
     * @param entity entity
     * @param aggregation aggregation definition
     * @param option option
     * @returns the `result` field of the aspect response
     */
    async aggregate(entity, aggregation, option) {
        const { result } = await this.connector.callAspect('aggregate', {
            entity,
            aggregation,
            option,
        }, this.context || this.contextBuilder());
        return result;
    }

    /**
     * Executes an operation through the `operate` aspect.
     *
     * @param entity entity
     * @param operation operation
     * @param option option
     * @param callback forwarded to `exec`
     * @returns `{ result, message }` as returned by `exec`
     */
    async operate(entity, operation, option, callback) {
        const result = await this.exec('operate', {
            entity,
            operation,
            option,
        }, callback);
        return result;
    }

    /**
     * Runs the `count` aspect.
     *
     * @returns the `result` field of the `exec` response
     */
    async count(entity, selection, option, callback) {
        const { result } = await this.exec('count', {
            entity,
            selection,
            option,
        }, callback);
        return result;
    }

    /**
     * Applies operation records to the cache store inside a fresh context,
     * invalidates all cached action verdicts, and notifies the callbacks
     * registered through `bindOnSync`. Commits on success, rolls back on error.
     * `begin()` is called without `allowInTxn`, so it asserts when a context is
     * already open.
     */
    syncInner(records) {
        this.begin();
        try {
            this.cacheStore.sync(records, this.context);
            // After a sync none of the cached action verdicts can be trusted.
            this.entityActionAuthDict = {};
            // Invoke the callbacks registered for sync events.
            this.syncEventsCallbacks.forEach((ele) => ele(records));
            this.context.commit();
        }
        catch (err) {
            this.context.rollback();
            throw err;
        }
        finally {
            // Always drop the context, even if rollback() itself throws.
            this.context = undefined;
        }
    }

    /** Syncs `records` and publishes; does nothing for an empty array. */
    sync(records) {
        if (records.length) {
            this.syncInner(records);
            this.publish();
        }
    }

    /**
     * Replays operations against the local cache purely to test them; the
     * context is always rolled back.
     *
     * @param operations array of `{ entity, operation }`
     * @returns `true` when every operation went through, otherwise the caught
     *   `OakUserException` / `OakRowUnexistedException`
     * @throws any other kind of error
     */
    tryRedoOperations(operations) {
        const rollback = this.begin();
        try {
            for (const oper of operations) {
                const { entity, operation } = oper;
                this.context.operate(entity, operation, {
                    dontCollect: true,
                });
            }
            rollback();
            return true;
        }
        catch (err) {
            rollback();
            // A missing attribute in the cache currently surfaces as OakRowUnexistedException; to be refined.
            if (!(err instanceof OakUserException) && !(err instanceof OakRowUnexistedException)) {
                throw err;
            }
            return err;
        }
    }

    /**
     * Filters `attrs` down to those that `attrUpdateMatrix` allows `action` to
     * update on `entity`.
     *
     * When there is no matrix at all, or none for `entity`, a copy of `attrs` is
     * returned. Otherwise an attribute without a matrix entry is dropped, as is
     * one whose `actions` list excludes `action` or whose filter does not
     * contain the row `{ id }`.
     *
     * Must be called while a context is open (asserted).
     *
     * @param entity
     * @param action
     * @param attrs
     * @param id
     * @returns the attributes that passed the check
     */
    getLegalUpdateAttrs(entity, action, attrs, id) {
        if (!this.attrUpdateMatrix) {
            return [...attrs];
        }
        const matrix = this.attrUpdateMatrix[entity];
        if (!matrix) {
            return [...attrs];
        }
        const result = [];
        // This is always called during a re-render, so the context needs no extra handling.
        assert(this.context);
        for (const attr of attrs) {
            const def = matrix[attr];
            if (def) {
                const { actions, filter } = def;
                if (actions && !actions.includes(action)) {
                    continue;
                }
                const filter2 = typeof filter === 'function'
                    ? filter({
                        action,
                        data: {
                            // The updated value should not be read inside the filter, so a placeholder is fine.
                            [attr]: 'void',
                        },
                        filter: {
                            id,
                        },
                    }, this.context)
                    : filter;
                if (!filter2 || this.checkFilterContains(entity, filter2, { id }, true)) {
                    result.push(attr);
                }
            }
        }
        return result;
    }

    /**
     * Checks whether `operation` is allowed on `entity` against the local cache.
     *
     * Verdicts are memoised per store timestamp / entity / row id / checker
     * types / action, but only when the operation has no `data` and
     * `filter.id` is a string, and `cacheInsensative` is falsy.
     *
     * @returns `true` when allowed; `false` when a referenced row is missing
     *   locally (a fetch is triggered); otherwise the caught error object
     */
    checkOperation(entity, operation, checkerTypes, cacheInsensative) {
        // Memoise the verdict for the same id and action to avoid poor performance.
        const { action, data, filter } = operation;
        let id;
        let ts;
        const checkTypeString = checkerTypes ? checkerTypes.join('-') : '$$all';
        if (!cacheInsensative) {
            if (filter && !data && typeof filter.id === 'string') {
                id = filter.id;
                ts = this.cacheStore.getLastUpdateTs();
                if (this.entityActionAuthDict[ts]?.[entity]?.[id]?.[checkTypeString]?.hasOwnProperty(action)) {
                    return this.entityActionAuthDict[ts][entity][id][checkTypeString][action];
                }
            }
        }
        // Stores the verdict when the call is cacheable, then hands it back.
        const remember = (verdict) => {
            if (id && ts) {
                set(this.entityActionAuthDict, `${ts}.${entity}.${id}.${checkTypeString}.${action}`, verdict);
            }
            return verdict;
        };
        let rollback = this.begin(true);
        try {
            this.cacheStore.check(entity, operation, this.context, checkerTypes);
            rollback && rollback();
            if (checkerTypes?.includes('relation')) {
                rollback = this.begin(true);
                this.baseRelationAuth.checkRelationSync(entity, operation, this.context);
                rollback && rollback();
            }
            return remember(true);
        }
        catch (err) {
            rollback && rollback();
            if (err instanceof OakRowUnexistedException) {
                // A foreign-key row is missing locally: try to fetch it.
                this.fetchRows(err.getRows());
                return remember(false);
            }
            // The logic here is flawed: before trying an action, the actions on the parent
            // nodes should have been executed first. To be fixed later; tolerated for now
            // (by Xc 20240712). Errors that are not OakUserException used to be rethrown here.
            return remember(err);
        }
    }

    /**
     * Re-applies operations to the cache store within the current context
     * (asserted to exist). Only `logical` checkers run.
     *
     * @param opers array of `{ entity, operation }`
     */
    redoOperation(opers) {
        assert(this.context);
        opers.forEach((oper) => {
            const { entity, operation } = oper;
            this.cacheStore.operate(entity, operation, this.context, {
                // Data must not be checked here, or half-filled forms would raise many errors.
                checkerTypes: ['logical'],
                dontCollect: true,
            });
        });
        return;
    }

    /**
     * Requests rows that the cache turned out to be missing. Skipped while any
     * `exec` is in flight. The promise returned by `exec` is not awaited.
     *
     * @param missedRows list of descriptors; each element's `entity` is read here
     */
    fetchRows(missedRows) {
        if (!this.executing) {
            if (process.env.NODE_ENV === 'development') {
                console.warn('缓存被动去获取数据,请查看页面行为并加以优化', missedRows);
            }
            // The callback is async and `exec` does not await it, so a failing assertion
            // below rejects the callback's own promise instead of propagating into `exec`.
            this.exec('fetchRows', missedRows, async (result, opRecords) => {
                // The missed rows must be found on the server; an empty answer means a programming omission.
                for (const record of opRecords) {
                    const { d } = record;
                    assert(Object.keys(d).length > 0, '在通过fetchRow取不一致数据时返回了空数据,请拿该程序员祭天。');
                    for (const mr of missedRows) {
                        assert(Object.keys(d[mr.entity]).length > 0, `在通过fetchRow取不一致数据时返回了空数据,请拿该程序员祭天。entity是${mr.entity}`);
                    }
                }
            });
        }
    }

    /**
     * Selects rows from the cache store, opening a temporary context if none is
     * open.
     *
     * @returns the selected rows, or `[]` after triggering `fetchRows` when an
     *   `OakRowUnexistedException` is raised
     */
    getInner(entity, selection) {
        const rollback = this.begin(true);
        try {
            const result = this.cacheStore.select(entity, selection, this.context, {
                dontCollect: true,
                includedDeleted: true,
                warnWhenAttributeMiss: !this.executing && process.env.NODE_ENV === 'development',
            });
            rollback && rollback();
            return result;
        }
        catch (err) {
            rollback && rollback();
            if (err instanceof OakRowUnexistedException) {
                // Only a missing foreign key raises RowUnexisted today; it shows up when
                // a modi on the front end links to foreign keys of other tables.
                this.fetchRows(err.getRows());
                return [];
            }
            throw err;
        }
    }

    /**
     * Merges a server select result into locally selected rows. The select may
     * carry aggregation data, so the merged result has to be used.
     * The structure of `sr` is hard to describe formally; see the select
     * interface in common-aspect.
     *
     * Rows whose id is not a key of `sr` are removed from `rows` in place.
     *
     * @param entity
     * @param rows rows selected locally (mutated)
     * @param sr server result keyed by row id
     */
    mergeSelectResult(entity, rows, sr) {
        const mergeSingleRow = (e, r, sr2) => {
            for (const k in sr2) {
                if (k.endsWith('$$aggr')) {
                    Object.assign(r, {
                        [k]: sr2[k],
                    });
                }
                else if (r[k]) {
                    const rel = this.judgeRelation(e, k);
                    if (rel === 2) {
                        // The attribute name itself is used as the entity name.
                        mergeSingleRow(k, r[k], sr2[k]);
                    }
                    else if (typeof rel === 'string') {
                        // `rel` is used as the entity name of the nested row.
                        mergeSingleRow(rel, r[k], sr2[k]);
                    }
                    else {
                        // Array relation: `rel[0]` is the entity of the child rows.
                        assert(rel instanceof Array);
                        assert(r[k] instanceof Array);
                        const { data } = sr2[k];
                        this.mergeSelectResult(rel[0], r[k], data);
                    }
                }
            }
        };
        const unwanted = [];
        rows.forEach((row) => {
            const { id } = row;
            if (sr[id]) {
                mergeSingleRow(entity, row, sr[id]);
            }
            else {
                unwanted.push(row);
            }
        });
        pull(rows, ...unwanted);
    }

    /**
     * Selects rows from the local cache and, when `sr` is given, merges the
     * server result into them.
     */
    get(entity, selection, sr) {
        const rows = this.getInner(entity, selection);
        if (sr) {
            this.mergeSelectResult(entity, rows, sr);
        }
        return rows;
    }

    /** Delegates to `cacheStore.judgeRelation`. */
    judgeRelation(entity, attr) {
        return this.cacheStore.judgeRelation(entity, attr);
    }

    /** Registers a callback that `syncInner` invokes with the synced records. */
    bindOnSync(callback) {
        this.syncEventsCallbacks.push(callback);
    }

    /** Removes a callback registered with `bindOnSync`. */
    unbindOnSync(callback) {
        pull(this.syncEventsCallbacks, callback);
    }

    /** @returns `cacheStore.getCurrentData()` */
    getCachedData() {
        return this.cacheStore.getCurrentData();
    }

    /** @returns `connector.getFullData()` */
    getFullData() {
        return this.connector.getFullData();
    }

    /** Delegates to `connector.makeBridgeUrl`. */
    makeBridgeUrl(url, headers) {
        return this.connector.makeBridgeUrl(url, headers);
    }

    /**
     * Opens a local context unless one is already open.
     *
     * @param allowInTxn must be truthy when a context is already open (asserted)
     * @returns a function that rolls back and clears the context, or `undefined`
     *   when an existing context was reused
     */
    begin(allowInTxn) {
        if (this.context) {
            assert(allowInTxn);
            return;
        }
        this.context = this.contextBuilder();
        this.context.begin();
        return () => {
            this.context.rollback();
            this.context = undefined;
        };
    }

    /**
     * Delegates to the imported `checkFilterContains` with the current context,
     * which must exist (asserted).
     */
    checkFilterContains(entity, contained, filter, dataCompare) {
        assert(this.context);
        return checkFilterContains(entity, this.context, contained, filter, dataCompare);
    }

    /**
     * Builds `this.entityGraph` from the schema: one node per entity that takes
     * part in at least one `ref` edge, and one link per distinct
     * (entity -> referenced entity) pair. Self references and the entities in
     * `ExcludeEntities` are skipped.
     */
    buildEntityGraph() {
        const schema = this.getSchema();
        const data = [];
        const links = [];
        const nodeOutSet = {};
        const nodeInSet = {};
        const ExcludeEntities = ['modi', 'modiEntity', 'oper', 'operEntity', 'relation', 'relationAuth', 'actionAuth', 'userRelation'];
        // Records the edge from -> to once, in both adjacency maps.
        const addEdge = (from, to) => {
            if (to === from || ExcludeEntities.includes(to) || nodeOutSet[from]?.includes(to)) {
                return;
            }
            if (nodeInSet[to]) {
                nodeInSet[to].push(from);
            }
            else {
                nodeInSet[to] = [from];
            }
            if (nodeOutSet[from]) {
                nodeOutSet[from].push(to);
            }
            else {
                nodeOutSet[from] = [to];
            }
        };
        for (const entity in schema) {
            if (ExcludeEntities.includes(entity)) {
                continue;
            }
            const { attributes } = schema[entity];
            for (const attr in attributes) {
                const { ref } = attributes[attr];
                if (ref instanceof Array) {
                    ref.forEach((reff) => addEdge(entity, reff));
                }
                else if (ref) {
                    addEdge(entity, ref);
                }
            }
        }
        // Entities with no edges at all never enter either map and are thereby left out.
        const entities = union(Object.keys(nodeOutSet), Object.keys(nodeInSet));
        entities.forEach((entity) => data.push({ name: entity }));
        // A link's value stands for its length: the more in/out edges its endpoints have,
        // the larger the value, so that the upper layer can render with a force layout.
        // Each term is parenthesised so that a missing side counts as 0 and never
        // swallows the other side.
        const degreeOf = (name) => (nodeOutSet[name]?.length ?? 0) + (nodeInSet[name]?.length ?? 0);
        for (const entity in nodeOutSet) {
            const fromValue = degreeOf(entity);
            for (const target of nodeOutSet[entity]) {
                links.push({
                    source: entity,
                    target,
                    value: fromValue + degreeOf(target),
                });
            }
        }
        this.entityGraph = {
            data,
            links,
        };
    }

    /** @returns `{ data, links }` as built by `buildEntityGraph` */
    getEntityGraph() {
        const { data, links } = this.entityGraph;
        return {
            data,
            links,
        };
    }

    /**
     * Looks up the id of the relation named `name` on `entity` by refreshing the
     * `relation` entity.
     *
     * With `entityId`, relations bound to that id and relations without any
     * `entityId` both match; when exactly two rows come back, the one with a
     * truthy `entityId` wins. Otherwise the first row is used.
     *
     * @throws TypeError when no relation matches, because the first row is read unconditionally
     */
    async getRelationIdByName(entity, name, entityId) {
        const filter = {
            entity: entity,
            name,
        };
        if (entityId) {
            filter.$or = [
                {
                    entityId,
                },
                {
                    entityId: {
                        $exists: false,
                    },
                },
            ];
        }
        else {
            filter.entityId = {
                $exists: false,
            };
        }
        const { data: relations } = await this.refresh('relation', {
            data: {
                id: 1,
                entity: 1,
                entityId: 1,
                name: 1,
                display: 1,
                actionAuth$relation: {
                    $entity: 'actionAuth',
                    data: {
                        id: 1,
                        pathId: 1,
                        deActions: 1,
                        path: {
                            id: 1,
                            destEntity: 1,
                            value: 1,
                            sourceEntity: 1,
                            recursive: 1,
                        },
                    },
                },
            },
            filter,
        });
        if (relations.length === 2) {
            return relations.find(ele => ele.entityId).id;
        }
        return relations[0].id;
    }
}