import { EntityDict, OperateOption, SelectOption, OpRecord, AspectWrapper, CheckerType, Aspect, SelectOpResult, StorageSchema, Checker, SubDataDef, AttrUpdateMatrix, Connector, AggregationResult, SelectionRewriter, OperationRewriter } from 'oak-domain/lib/types'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { AspectDict as CommonAspectDict } from 'oak-common-aspect'; import { Feature } from '../types/Feature'; import { set, pull, intersection, omit, unset, union } from 'oak-domain/lib/utils/lodash'; import { CacheStore } from '../cacheStore/CacheStore'; import { OakRowUnexistedException, OakRowInconsistencyException, OakException, OakUserException } from 'oak-domain/lib/types/Exception'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { SyncContext } from 'oak-domain/lib/store/SyncRowStore'; import { assert } from 'oak-domain/lib/utils/assert'; import { RelationAuth as BaseRelationAuth } from 'oak-domain/lib/store/RelationAuth'; import { LocalStorage } from './localStorage'; import { LOCAL_STORAGE_KEYS } from '../constant/constant'; import { checkFilterContains, combineFilters } from 'oak-domain/lib/store/filter'; import { CommonConfiguration } from 'oak-domain/lib/types/Configuration'; const DEFAULT_KEEP_FRESH_PERIOD = 600 * 1000; // 10分钟不刷新 interface CacheSelectOption extends SelectOption { ignoreKeepFreshRule?: true, }; type RefreshOption = { dontPublish?: true; useLocalCache?: { keys: string[]; // entity的查询根据这些keys上次查询是不是在gap()内判定是用cache的数据还是刷新 gap?: number; onlyReturnFresh?: boolean; // 如果置true只返回新的(update大于now - gap的) }; ignoreContext?: true; }; type WholeAspectDict>>> = CommonAspectDict & AD; export class Cache extends Feature { private cacheStore: CacheStore; private syncEventsCallbacks: Array< (opRecords: OpRecord[]) => void >; private contextBuilder: () => SyncContext; private executing = 0; private savedEntities: (keyof ED)[]; private keepFreshPeriod: number; private localStorage: LocalStorage; private refreshRecords: { [T in keyof ED]?: Record; } = {}; private context?: SyncContext; private initPromise: Promise; private attrUpdateMatrix?: AttrUpdateMatrix; private connector: Connector>; private baseRelationAuth: BaseRelationAuth; // 缓存在某一时间戳状态下,对某行进行某个action的判断结果 private entityActionAuthDict: { [ts: number]: { [T in keyof ED]?: { [I: string]: { [cts: string]: { [A in ED[T]['Action']]: boolean | OakUserException; } } }; }; }; registerSelectionRewriter(rewriter: SelectionRewriter | SyncContext, SelectOption>) { this.cacheStore.registerSelectionRewriter(rewriter); } registerOperationRewriter(rewriter: OperationRewriter | SyncContext, OperateOption>) { this.cacheStore.registerOperationRewriter(rewriter); } constructor( storageSchema: StorageSchema, connector: Connector>, frontendContextBuilder: (store: CacheStore) => SyncContext, checkers: Array>>, localStorage: LocalStorage, common: CommonConfiguration ) { super(); this.syncEventsCallbacks = []; this.connector = connector; this.cacheStore = new CacheStore(storageSchema); this.contextBuilder = () => frontendContextBuilder(this.cacheStore); this.savedEntities = ['actionAuth', 'i18n', 'path', ...(common.cacheSavedEntities || [])]; this.keepFreshPeriod = common.cacheKeepFreshPeriod || DEFAULT_KEEP_FRESH_PERIOD; this.localStorage = localStorage; this.attrUpdateMatrix = common.attrUpdateMatrix; this.baseRelationAuth = new BaseRelationAuth(storageSchema, common.authDeduceRelationMap, common.selectFreeEntities, common.updateFreeDict); checkers.forEach( (checker) => this.cacheStore.registerChecker(checker) ); // 现在这个init变成了异步行为,不知道有没有影响。by Xc 20231126 this.initPromise = new Promise( (resolve) => this.initSavedLogic(resolve) ); this.buildEntityGraph(); this.entityActionAuthDict = {}; } /** * 处理cache中需要缓存的数据 */ private async initSavedLogic(complete: () => void) { const data: { [T in keyof ED]?: ED[T]['OpSchema'][]; } = {}; await Promise.all( this.savedEntities.map( async (entity) => { // 加载缓存的数据项 const key = `${LOCAL_STORAGE_KEYS.cacheSaved}:${entity as string}`; const cached = await this.localStorage.load(key); if (cached) { data[entity] = cached; } // 加载缓存的时间戳项 const key2 = `${LOCAL_STORAGE_KEYS.cacheRefreshRecord}:${entity as string}`; const cachedTs = await this.localStorage.load(key2); if (cachedTs) { this.refreshRecords[entity] = cachedTs; } } ) ); this.cacheStore.resetInitialData(data); this.cacheStore.onCommit( async (result) => { const entities = Object.keys(result); const referenced = intersection(entities, this.savedEntities); if (referenced.length > 0) { const saved = this.cacheStore.getCurrentData(referenced); for (const entity in saved) { const key = `${LOCAL_STORAGE_KEYS.cacheSaved}:${entity as string}`; await this.localStorage.save(key, saved[entity]); } } } ); complete(); } async onInitialized() { await this.initPromise; } getSchema() { return this.cacheStore.getSchema(); } /* getCurrentUserId(allowUnloggedIn?: boolean) { const context = this.contextBuilder && this.contextBuilder(); return context?.getCurrentUserId(allowUnloggedIn); } */ async exec>>, K extends keyof WholeAspectDict>( name: K, params: Parameters[K]>[0], callback?: (result: Awaited[K]>>, opRecords?: OpRecord[]) => void, dontPublish?: true, ignoreContext?: true, ) { try { this.executing++; const { result, opRecords, message } = await this.connector.callAspect(name as string, params, ignoreContext ? undefined : this.context || this.contextBuilder()); callback && callback(result, opRecords); if (opRecords?.length) { this.syncInner(opRecords); } this.executing--; if (opRecords && opRecords.length > 0 && !dontPublish) { this.publish(); } return { result: result as Awaited[K]>>, message, }; } catch (e) { // 如果是数据不一致错误,这里可以让用户知道 this.executing--; if (e instanceof OakException) { const { opRecords } = e; if (opRecords.length) { this.syncInner(opRecords as OpRecord[]); this.publish(); } } throw e; } } private saveRefreshRecord(entity: keyof ED) { const records = this.refreshRecords[entity]; assert(records); const key2 = `${LOCAL_STORAGE_KEYS.cacheRefreshRecord}:${entity as string}`; this.localStorage.save(key2, records); } private addRefreshRecord(entity: keyof ED, key: string, now: number): () => void { const originTimestamp = this.refreshRecords[entity] && this.refreshRecords[entity]![key]; if (this.refreshRecords[entity]) { Object.assign(this.refreshRecords[entity]!, { [key]: now, }); } else { Object.assign(this.refreshRecords, { [entity]: { [key]: now, } }); } if (originTimestamp) { return () => this.addRefreshRecord(entity, key, originTimestamp); } return () => unset(this.refreshRecords[entity], key); } /** * 向服务器刷新数据 * @param entity 要刷新的实体 * @param selection 查询映射 * @param option 选项 * @param callback 回调函数 * @param refreshOption 刷新选项 * @returns 返回查询结果 * @description 支持增量更新,可以使用useLocalCache来将一些metadata级的数据本地缓存,减少更新次数。 * 使用增量更新这里要注意,传入的keys如果有一个key是首次更新,会导致所有的keys全部更新。使用模块自己保证这种情况不要出现 */ async refresh( entity: T, selection: ED[T]['Selection'], option?: OP, callback?: (result: Awaited['select']>>) => void, refreshOption?: RefreshOption, ) { // todo 还要判定没有aggregation const { dontPublish, useLocalCache, ignoreContext } = refreshOption || {}; const onlyReturnFresh = refreshOption?.useLocalCache?.onlyReturnFresh; let undoFns = [] as Array<() => void>; const originFilter = selection.filter; if (useLocalCache) { assert(!selection.indexFrom && !selection.count, '用cache的查询不能使用分页'); assert(this.savedEntities.includes(entity), `${entity as string}不在系统设置的应缓存对象当中`); const { keys, gap } = useLocalCache; let oldest = Number.MAX_SAFE_INTEGER; keys.forEach( (k) => { const last = this.refreshRecords[entity] && this.refreshRecords[entity]![k]; if (typeof last === 'number') { if (last < oldest) { oldest = last; } } else { // 说明这个key没有取过,直接赋0 oldest = 0; } } ); const gap2 = gap || this.keepFreshPeriod; const now = Date.now(); if (oldest < Number.MAX_SAFE_INTEGER && oldest > now - gap2) { // 说明可以用localCache的数据,不用去请求 if (process.env.NODE_ENV === 'development') { // console.warn('根据keepFresh规则,省略了一次请求数据的行为', entity, selection); } if (onlyReturnFresh) { return { data: [], }; } const data = this.get(entity, selection); return { data, }; } else { if (oldest > 0) { // 说明key曾经都取过了,只取updateAt在oldest之后的数据 selection.filter = selection.filter ? combineFilters(entity, this.getSchema(), [selection.filter, { $$updateAt$$: { $gte: oldest, } }]) : { $$updateAt$$: { $gte: oldest, } }; } undoFns = keys.map( (k) => this.addRefreshRecord(entity, k, now) ); } } try { const { result: { data: sr, total } } = await this.exec('select', { entity, selection, option, }, callback, dontPublish, ignoreContext); let filter2: ED[T]['Filter'] = { id: { $in: Object.keys(sr), } }; if (undoFns.length > 0 && !onlyReturnFresh) { filter2 = originFilter!; } const selection2 = Object.assign({}, selection, { filter: filter2, // 因为id已经确定,这里不能再有indexFrom和count了 indexFrom: undefined, count: undefined, }); const data = this.get(entity, selection2, sr); if (useLocalCache) { this.saveRefreshRecord(entity); } return { data, total, }; } catch (err) { undoFns && undoFns.forEach( (fn) => fn() ); throw err; } } /** * 聚合查询 * @param entity 实体 * @param aggregation 聚合条件 * @param option 选项 * @returns 返回查询结果 */ async aggregate( entity: T, aggregation: ED[T]['Aggregation'], option?: OP, ) { const { result } = await this.connector.callAspect('aggregate', { entity, aggregation, option, }, this.context || this.contextBuilder()); return result as AggregationResult; } /** * 操作 * @param entity 实体 * @param operation 操作名 * @param option 选项 * @param callback 回调函数 * @returns 返回操作结果 */ async operate( entity: T, operation: ED[T]['Operation'] | ED[T]['Operation'][], option?: OP, callback?: (result: Awaited['operate']>>) => void, ) { const result = await this.exec('operate', { entity, operation, option, }, callback); return result; } async count( entity: T, selection: Pick, option?: OP, callback?: (result: Awaited['count']>>) => void, ) { const { result } = await this.exec('count', { entity, selection, option, }, callback); return result; } private syncInner(records: OpRecord[]) { this.begin(); try { this.cacheStore!.sync(records, this.context!); // 同步以后,当前所有缓存的action结果都不成立了 this.entityActionAuthDict = {}; // 唤起同步注册的回调 this.syncEventsCallbacks.map((ele) => ele(records)); this.context!.commit(); this.context = undefined; } catch (err: any) { this.context!.rollback(); this.context = undefined; throw err; } } sync(records: OpRecord[]) { if (records.length) { this.syncInner(records); this.publish(); } } /** * 前端缓存做operation只可能是测试权限,必然回滚 * @param entity * @param operation * @returns */ tryRedoOperations(operations: ({ operation: ED[T]['Operation']; entity: T })[]) { const rollback = this.begin(); try { for (const oper of operations) { const { entity, operation } = oper; this.context!.operate(entity, operation, { dontCollect: true, }); } rollback!(); return true; } catch (err) { rollback!(); // 现在如果cache中属性缺失会报OakRowUnexistedException,待进一步细化 if (!(err instanceof OakUserException) && !(err instanceof OakRowUnexistedException)) { throw err; } return err as Error; } } /** * 根据初始化定义的attrUpdateMatrix,检查当前entity是否支持用action去更新Attrs属性 * 返回通过合法性检查的Attrs * @param entity * @param action * @param attrs * @returns */ getLegalUpdateAttrs( entity: T, action: ED[T]['Action'], attrs: (keyof ED[T]['Update']['data'])[], id: string, ) { if (!this.attrUpdateMatrix) { return [...attrs]; } const matrix = this.attrUpdateMatrix[entity]; if (!matrix) { return [...attrs]; } const result = [] as (keyof ED[T]['Update']['data'])[]; // 当前这个函数的调用一定是在reRender过程中,无需要额外处理上下文 assert(this.context); // const rollback = this.begin(true); for (const attr of attrs) { const def = matrix[attr]; if (def) { const { actions, filter } = def; if (actions && !actions.includes(action)) { continue; } const filter2 = typeof filter === 'function' ? filter({ action, data: { [attr]: 'void', // 这里里面应该不会用到更新的值,这样写不要紧…… }, filter: { id } }, this.context!) as ED[T]['Filter']: filter; if (!filter2 || this.checkFilterContains(entity, filter2, { id }, true)) { result.push(attr); } } } return result; } checkOperation( entity: T, operation: { action: ED[T]['Action'], data?: ED[T]['Operation']['data'], filter?: ED[T]['Filter'], }, checkerTypes?: CheckerType[], cacheInsensative?: true, ) { // 缓存同一id对同一action的判断,避免性能低下 const { action, data, filter } = operation; let id: string | undefined; let ts: number | undefined; const checkTypeString = checkerTypes ? checkerTypes.join('-') : '$$all'; if (!cacheInsensative) { if (filter && !data && typeof filter.id === 'string') { id = filter.id; ts = this.cacheStore.getLastUpdateTs(); if (this.entityActionAuthDict[ts]?.[entity]?.[id]?.[checkTypeString]?.hasOwnProperty(action)) { return this.entityActionAuthDict[ts]![entity]![id]![checkTypeString]![action]!; } } } let rollback = this.begin(true); try { this.cacheStore!.check(entity, operation, this.context!, checkerTypes); rollback && rollback(); if (checkerTypes?.includes('relation')) { rollback = this.begin(true); this.baseRelationAuth.checkRelationSync(entity, operation as Omit, this.context!); rollback && rollback(); } if (id && ts) { set(this.entityActionAuthDict, `${ts}.${entity as string}.${id}.${checkTypeString}.${action}`, true); } return true; } catch (err) { rollback && rollback(); if (err instanceof OakRowUnexistedException) { // 有外键缺失,尝试发一下请求 this.fetchRows(err.getRows()); if (id && ts) { set(this.entityActionAuthDict, `${ts}.${entity as string}.${id}.${checkTypeString}.${action}`, false); } return false; } /** * 这里逻辑有问题,tryExecute这个动作前,是需要把父亲结点上的动作先执行掉才有意义。 * 后面再改,现在先容错 by Xc 20240712 */ /* if (!(err instanceof OakUserException)) { throw err; } */ if (id && ts) { set(this.entityActionAuthDict, `${ts}.${entity as string}.${id}.${checkTypeString}.${action}`, err); } return err as OakUserException; } } redoOperation(opers: Array<{ entity: keyof ED; operation: ED[keyof ED]['Operation']; }>) { assert(this.context); opers.forEach( (oper) => { const { entity, operation } = oper; this.cacheStore!.operate(entity, operation, this.context!, { checkerTypes: ['logical'], // 这里不能检查data,不然在数据没填完前会有大量异常 dontCollect: true, }); } ); return; } fetchRows(missedRows: Array<{ entity: keyof ED, selection: ED[keyof ED]['Selection'] }>) { if (!this.executing) { if (process.env.NODE_ENV === 'development') { console.warn('缓存被动去获取数据,请查看页面行为并加以优化', missedRows); } this.exec('fetchRows', missedRows, async (result, opRecords) => { // missedRows理论上一定要取到,不能为空集。否则就是程序员有遗漏 for (const record of opRecords!) { const { d } = record as SelectOpResult; assert(Object.keys(d).length > 0, '在通过fetchRow取不一致数据时返回了空数据,请拿该程序员祭天。'); for (const mr of missedRows) { assert(Object.keys(d![mr.entity]!).length > 0, `在通过fetchRow取不一致数据时返回了空数据,请拿该程序员祭天。entity是${mr.entity as string}`); } } }) } } private getInner( entity: T, selection: ED[T]['Selection']): Partial[] { const rollback = this.begin(true); try { const result = this.cacheStore!.select( entity, selection, this.context!, { dontCollect: true, includedDeleted: true, warnWhenAttributeMiss: !this.executing && process.env.NODE_ENV === 'development', } ); rollback && rollback(); return result; } catch (err) { rollback && rollback(); if (err instanceof OakRowUnexistedException) { // 现在只有外键缺失会抛出RowUnexisted异常,前台在modi中连接了其它表的外键时会出现 this.fetchRows(err.getRows()); return []; } throw err; } } /** * 把select的结果merge到sr中,因为select有可能存在aggr数据,在这里必须要使用合并后的结果 * sr的数据结构不好规范化描述,参见common-aspect中的select接口 * @param entity * @param rows * @param sr */ mergeSelectResult(entity: T, rows: Partial[], sr: Record) { const mergeSingleRow = (e: keyof ED, r: Partial, sr2: Record) => { for (const k in sr2) { if (k.endsWith('$$aggr')) { Object.assign(r, { [k]: sr2[k], }); } else if (r[k]) { const rel = this.judgeRelation(e, k); if (rel === 2) { mergeSingleRow(k, r[k]!, sr2[k]); } else if (typeof rel === 'string') { mergeSingleRow(rel, r[k]!, sr2[k]); } else { assert(rel instanceof Array); assert((r[k] as any) instanceof Array) const { data } = sr2[k]; this.mergeSelectResult(rel[0], r[k]!, data); } } } }; const unwanted: Partial[] = []; rows.forEach( (row) => { const { id } = row; if (sr[id!]) { mergeSingleRow(entity, row, sr[id!]); } else { unwanted.push(row); } } ); pull(rows, ...unwanted); // assert(rows.length === Object.keys(sr).length); } get( entity: T, selection: ED[T]['Selection'], sr?: Record, ) { const rows = this.getInner(entity, selection); if (sr) { this.mergeSelectResult(entity, rows, sr); } return rows; } judgeRelation(entity: keyof ED, attr: string) { return this.cacheStore!.judgeRelation(entity, attr); } bindOnSync(callback: (opRecords: OpRecord[]) => void) { this.syncEventsCallbacks.push(callback); } unbindOnSync(callback: (opRecords: OpRecord[]) => void) { pull(this.syncEventsCallbacks, callback); } getCachedData() { return this.cacheStore!.getCurrentData(); } getFullData() { return this.connector.getFullData(); } makeBridgeUrl(url: string, headers?: Record) { return this.connector.makeBridgeUrl(url, headers); } begin(allowInTxn?: boolean) { if (this.context) { assert(allowInTxn); return; } this.context = this.contextBuilder!(); this.context.begin(); return () => { this.context!.rollback(); this.context = undefined; }; } checkFilterContains( entity: T, contained: NonNullable, filter: NonNullable, dataCompare?: true) { assert(this.context); return checkFilterContains>(entity, this.context!, contained, filter, dataCompare); } private entityGraph?: { data: Array<{ name: string }>; links: Array<{ source: string; target: string; value: number; }> }; private buildEntityGraph() { const schema = this.getSchema(); // 构建出一张图来 const data: Array<{ name: string }> = []; const links: Array<{ source: string; target: string; value: number; }> = []; const nodeOutSet: Record = {}; const nodeInSet: Record = {}; const ExcludeEntities = ['modi', 'modiEntity', 'oper', 'operEntity', 'relation', 'relationAuth', 'actionAuth', 'userRelation']; for (const entity in schema) { if (ExcludeEntities.includes(entity)) { continue; } const { attributes } = schema[entity]; for (const attr in attributes) { const { ref } = attributes[attr]; if (ref instanceof Array) { ref.forEach( (reff) => { if (reff === entity || ExcludeEntities.includes(reff) || nodeOutSet[entity]?.includes(reff)) { return; } if (nodeInSet[reff]) { nodeInSet[reff].push(entity); } else { nodeInSet[reff] = [entity]; } if (nodeOutSet[entity]) { nodeOutSet[entity].push(reff); } else { nodeOutSet[entity] = [reff]; } } ); } else if (ref && ref !== entity && !ExcludeEntities.includes(ref) && !nodeOutSet[entity]?.includes(ref)) { if (nodeInSet[ref]) { nodeInSet[ref].push(entity); } else { nodeInSet[ref] = [entity]; } if (nodeOutSet[entity]) { nodeOutSet[entity].push(ref); } else { nodeOutSet[entity] = [ref]; } } } } // 把完全独立的对象剥离 const entities = union(Object.keys(nodeOutSet), Object.keys(nodeInSet)); entities.forEach( (entity) => data.push({ name: entity }) ); // link上的value代表其长度。出入度越多的结点,其关联的边的value越大,以便于上层用引力布局渲染 for (const entity in nodeOutSet) { const fromValue = nodeOutSet[entity].length + nodeInSet[entity]?.length || 0; for (const target of nodeOutSet[entity]) { const toValue = nodeOutSet[target]?.length || 0 + nodeInSet[target]?.length || 0; links.push({ source: entity, target, value: fromValue + toValue, }); } } this.entityGraph = { data, links, }; } getEntityGraph() { const { data, links } = this.entityGraph!; return { data, links, }; } async getRelationIdByName(entity: keyof ED, name: string, entityId?: string) { const filter: ED['relation']['Filter'] = { entity: entity as string, name, }; if (entityId) { filter.$or = [ { entityId, }, { entityId: { $exists: false, }, } ]; } else { filter.entityId = { $exists: false, }; } const { data: relations } = await this.refresh('relation', { data: { id: 1, entity: 1, entityId: 1, name: 1, display: 1, actionAuth$relation: { $entity: 'actionAuth', data: { id: 1, pathId: 1, deActions: 1, path: { id: 1, destEntity: 1, value: 1, sourceEntity: 1, recursive: 1, } }, }, }, filter, }); if (relations.length === 2) { return relations.find( ele => ele.entityId )!.id; } return relations[0].id; } }