修正了node对更新的正确处理

This commit is contained in:
Xu Chang 2022-04-18 14:23:21 +08:00
parent 2688607d6b
commit 9fab25843a
3 changed files with 235 additions and 59 deletions

View File

@ -1,10 +1,10 @@
import { EntityDict, OperationResult } from 'oak-domain/lib/types/Entity';
import { EntityDict, OperationResult, OpRecord } from 'oak-domain/lib/types/Entity';
import { StorageSchema } from "oak-domain/lib/types/Storage";
import { TriggerExecutor } from 'oak-domain/lib/store/TriggerExecutor';
import { Checker, Context } from 'oak-domain/lib/types';
import { Checker, Context, Trigger } from 'oak-domain/lib/types';
import { TreeStore } from 'oak-memory-tree-store';
export class CacheStore<ED extends EntityDict> extends TreeStore<ED> {
export class CacheStore<ED extends EntityDict> extends TreeStore<ED> {
private executor: TriggerExecutor<ED>;
constructor(storageSchema: StorageSchema<ED>, initialData?: {
@ -48,13 +48,13 @@ export class CacheStore<ED extends EntityDict> extends TreeStore<ED> {
context: Context<ED>,
params?: Object
) {
const autoCommit = !context.getCurrentTxnId();
if (autoCommit) {
await context.begin();
}
let result;
try {
result = await super.select(entity, selection, context, params);
}

View File

@ -1,11 +1,12 @@
import { StorageSchema, DeduceSelection, EntityDict, OperateParams, OpRecord, Aspect, Checker, RowStore, Context, OperationResult } from 'oak-domain/lib/types';
import { StorageSchema, DeduceSelection, EntityDict, OperateParams, OpRecord, Aspect, Checker, RowStore, Context, OperationResult, Trigger, SelectOpResult, UpdateOpResult } from 'oak-domain/lib/types';
import { Action, Feature } from '../types/Feature';
import { assign } from 'lodash';
import { assign, pull, uniq } from 'lodash';
import { CacheStore } from '../cacheStore/CacheStore';
export class Cache<ED extends EntityDict, Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>> extends Feature<ED, Cxt, AD> {
cacheStore: CacheStore<ED>;
createContext: (store: RowStore<ED>) => Cxt;
private syncEventsCallbacks: Array<(opRecords: OpRecord<ED>[]) => Promise<void>>;
constructor(storageSchema: StorageSchema<ED>, createContext: (store: RowStore<ED>) => Cxt, checkers?: Array<Checker<ED, keyof ED>>) {
const cacheStore = new CacheStore(storageSchema);
@ -17,6 +18,7 @@ export class Cache<ED extends EntityDict, Cxt extends Context<ED>, AD extends Re
super();
this.cacheStore = cacheStore;
this.createContext = createContext;
this.syncEventsCallbacks = [];
}
@ -40,6 +42,12 @@ export class Cache<ED extends EntityDict, Cxt extends Context<ED>, AD extends Re
throw err;
}
await context.commit();
// 唤起同步注册的回调
const result = this.syncEventsCallbacks.map(
ele => ele(records)
);
await Promise.all(result);
}
@Action
@ -73,4 +81,14 @@ export class Cache<ED extends EntityDict, Cxt extends Context<ED>, AD extends Re
judgeRelation(entity: keyof ED, attr: string) {
return this.cacheStore.judgeRelation(entity, attr);
}
bindOnSync(callback: (opRecords: OpRecord<ED>[]) => Promise<void>) {
this.syncEventsCallbacks.push(callback);
}
unbindOnSync(callback: (opRecords: OpRecord<ED>[]) => Promise<void>) {
pull(this.syncEventsCallbacks, callback);
}
}

View File

@ -1,6 +1,6 @@
import { set, cloneDeep, pull, unset } from 'lodash';
import { AttrFilter, DeduceCreateOperation, DeduceFilter, DeduceOperation, DeduceSelection, DeduceUpdateOperation, EntityDict, EntityShape, SelectRowShape } from 'oak-domain/lib/types/Entity';
import { Aspect, Context } from 'oak-domain/lib/types';
import { AttrFilter, CreateOpResult, DeduceCreateOperation, DeduceFilter, DeduceOperation, DeduceSelection, DeduceUpdateOperation, EntityDict, EntityShape, OpRecord, SelectOpResult, SelectRowShape, UpdateOpResult } from 'oak-domain/lib/types/Entity';
import { Aspect, Context, Trigger } from 'oak-domain/lib/types';
import { combineFilters } from 'oak-domain/lib/store/filter';
import { Action, Feature } from '../types/Feature';
import { Cache } from './cache';
@ -11,25 +11,32 @@ import { judgeRelation } from 'oak-domain/lib/store/relation';
import { StorageSchema } from 'oak-domain/lib/types/Storage';
import { Pagination } from '../types/Pagination';
export class Node<ED extends EntityDict, T extends keyof ED> {
export class Node<ED extends EntityDict, T extends keyof ED, Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>> {
protected entity: T;
protected fullPath: string;
protected schema: StorageSchema<ED>;
protected projection?: ED[T]['Selection']['data']; // 只在Page层有
protected parent?: Node<ED, keyof ED>;
protected parent?: Node<ED, keyof ED, Cxt, AD>;
protected action?: ED[T]['Action'];
protected dirty: boolean;
protected updateData?: DeduceUpdateOperation<ED[T]['OpSchema']>['data'];
protected cache: Cache<ED, Cxt, AD>;
protected needReGetValue: boolean;
protected refreshing: boolean;
private refreshFn?: (opRecords: OpRecord<ED>[]) => Promise<void>;
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED>, action?: ED[T]['Action']) {
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, cache: Cache<ED, Cxt, AD>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED, Cxt, AD>, action?: ED[T]['Action']) {
this.entity = entity;
this.fullPath = fullPath;
this.schema = schema;
this.cache = cache;
this.projection = projection;
this.parent = parent;
this.action = action;
this.dirty = false;
this.needReGetValue = true;
this.refreshing = false;
}
getSubEntity(path: string): {
@ -87,6 +94,22 @@ export class Node<ED extends EntityDict, T extends keyof ED> {
getProjection() {
return this.projection!;
}
registerValueSentry(refreshFn: (opRecords: OpRecord<ED>[]) => Promise<void>) {
/**
* singleNode可以进一步优化refresh
* by Xc 20220417
*/
this.refreshFn = refreshFn;
this.cache.bindOnSync(refreshFn);
}
unregisterValueSentry() {
if (this.refreshFn) {
this.cache.unbindOnSync(this.refreshFn);
this.refreshFn = undefined;
}
}
}
const DEFAULT_PAGINATION: Pagination = {
@ -96,25 +119,29 @@ const DEFAULT_PAGINATION: Pagination = {
more: true,
}
class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
class ListNode<ED extends EntityDict, T extends keyof ED, Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>> extends Node<ED, T, Cxt, AD>{
private ids: string[];
protected children: SingleNode<ED, T>[];
protected children: SingleNode<ED, T, Cxt, AD>[];
protected value: Array<Partial<ED[T]['Schema']>>;
private filters: DeduceFilter<ED[T]['Schema']>[];
private sorter?: ED[T]['Selection']['sorter'];
private pagination: Pagination;
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED>, pagination?: Pagination, filters?: DeduceFilter<ED[T]['Schema']>[],
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, cache: Cache<ED, Cxt, AD>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED, Cxt, AD>, pagination?: Pagination, filters?: DeduceFilter<ED[T]['Schema']>[],
sorter?: ED[T]['Selection']['sorter'], action?: ED[T]['Action']) {
super(entity, fullPath, schema, projection, parent, action);
super(entity, fullPath, schema, cache, projection, parent, action);
this.ids = [];
this.value = [];
this.children = [];
this.filters = filters || [];
this.sorter = sorter || [];
this.pagination = pagination || DEFAULT_PAGINATION;
if (projection) {
this.registerValueSentry((records) => this.onRecordSynchoronized(records));
}
}
async composeOperation(): Promise<DeduceOperation<ED[T]['Schema']> | DeduceOperation<ED[T]['Schema']>[] | undefined> {
@ -138,7 +165,7 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
return actions;
}
addChild(path: string, node: SingleNode<ED, T>) {
addChild(path: string, node: SingleNode<ED, T, Cxt, AD>) {
const { children } = this;
const idx = parseInt(path, 10);
assert(!children[idx]);
@ -156,7 +183,7 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
const idx = parseInt(path, 10);
let node = this.children[idx];
if (create && !node) {
node = new SingleNode(this.entity, `${this.fullPath}.${idx}`, this.schema, undefined, this, this.ids[idx]);
node = new SingleNode(this.entity, `${this.fullPath}.${idx}`, this.schema, this.cache, undefined, this as any, this.ids[idx]);
node.setValue(this.value[idx]);
this.addChild(path, node);
}
@ -171,11 +198,12 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
this.filters = filters;
}
async refresh<Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>>(cache: Cache<ED, Cxt, AD>) {
async refresh() {
const { filters, sorter, pagination, entity, projection } = this;
assert(projection);
const { step } = pagination;
const { ids } = await cache.refresh(entity, {
this.refreshing = true;
const { ids } = await this.cache.refresh(entity, {
data: projection as any,
filter: filters.length > 0 ? combineFilters(filters) : undefined,
sorter,
@ -186,6 +214,7 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
this.ids = ids;
this.pagination.indexFrom = 0;
this.pagination.more = ids.length === step;
this.refreshing = false;
}
updateChildrenValue() {
@ -194,24 +223,28 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
);
}
async getValue<Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>>(cache: Cache<ED, Cxt, AD>) {
async reGetValue() {
const { entity, ids, projection } = this;
if (ids.length) {
const filter: Partial<AttrFilter<ED[T]['Schema']>> = {
id: {
$in: ids,
},
} as any; // 这里也没搞懂用EntityShape能过ED[T]['Schema']就不过
const rows = await cache.get({
assert(projection);
if (ids.length > 0) {
const rows = await this.cache.get({
entity,
selection: {
data: projection as any,
filter,
}
filter: {
id: {
$in: ids,
}
} as any,
},
});
this.value = ids.map(
id => rows.find(ele => ele.id === id)!
(id) => rows.find(
(ele) => ele.id === id
)
).filter(
ele => !!ele
) as any;
}
else {
@ -219,7 +252,83 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
}
this.updateChildrenValue();
}
async onRecordSynchoronized(records: OpRecord<ED>[]) {
if (this.refreshing) {
this.needReGetValue = true;
return;
}
for (const record of records) {
const { a } = record;
switch (a) {
case 'c': {
const { e, d } = record as CreateOpResult<ED, T>;
if (e === this.entity) {
const { id } = d;
const filter = combineFilters([{ id } as any, ...this.filters]);
const { ids } = await this.cache.operate(e, {
data: {
id: 1,
} as any,
filter,
action: 'select',
}, false, { obscure: true });
if (ids!.length > 0) {
// todo 这里更严格应该还要考虑sorter但前端可能没有完整的供sort用的cache数据
this.ids.push(id);
this.needReGetValue = true;
}
}
break;
}
case 'r':
case 'u': {
const { e, f } = record as UpdateOpResult<ED, T>;
if (e === this.entity) {
// todo 这里更严格应该考虑f对当前value有无影响同上面一样这里可能没有完整的供f用的cache数据
this.needReGetValue = true;
}
}
case 's': {
const { d } = record as SelectOpResult<ED>;
for (const e in d) {
if (e as string === this.entity) {
for (const id in d[e]) {
if (this.ids.includes(id)) {
this.needReGetValue = true;
break;
}
else {
const filter = combineFilters([{ id } as any, ...this.filters]);
const { ids } = await this.cache.operate(e, {
data: {
id: 1,
} as any,
filter,
action: 'select',
}, false, { obscure: true });
if (ids!.length > 0) {
// todo 这里更严格应该还要考虑sorter但前端可能没有完整的供sort用的cache数据
this.ids.push(id);
this.needReGetValue = true;
break;
}
}
}
}
break;
}
}
}
}
}
async getValue() {
if (this.needReGetValue) {
await this.reGetValue();
this.needReGetValue = false;
}
return this.value;
}
@ -248,29 +357,34 @@ class ListNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
}
class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
class SingleNode<ED extends EntityDict, T extends keyof ED, Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>> extends Node<ED, T, Cxt, AD>{
private id?: string;
private value?: Partial<ED[T]['Schema']>;
private children: {
[K in keyof ED[T]['Schema']]?: SingleNode<ED, keyof ED> | ListNode<ED, keyof ED>;
[K in keyof ED[T]['Schema']]?: SingleNode<ED, keyof ED, Cxt, AD> | ListNode<ED, keyof ED, Cxt, AD>;
};
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED>, id?: string, action?: ED[T]['Action']) {
super(entity, fullPath, schema, projection, parent, action);
constructor(entity: T, fullPath: string, schema: StorageSchema<ED>, cache: Cache<ED, Cxt, AD>, projection?: ED[T]['Selection']['data'],
parent?: Node<ED, keyof ED, Cxt, AD>, id?: string, action?: ED[T]['Action']) {
super(entity, fullPath, schema, cache, projection, parent, action);
this.id = id;
this.children = {};
if (projection) {
this.registerValueSentry((record) => this.onRecordSynchoronized(record));
}
}
async refresh<Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>>(cache: Cache<ED, Cxt, AD>) {
async refresh() {
assert(this.projection);
if (this.action !== 'create') {
await cache.refresh(this.entity, {
this.refreshing = true;
await this.cache.refresh(this.entity, {
data: this.projection,
filter: {
id: this.id,
},
} as any);
this.refreshing = false;
}
}
@ -299,7 +413,7 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
return action;
}
addChild(path: string, node: Node<ED, keyof ED>) {
addChild(path: string, node: Node<ED, keyof ED, Cxt, AD>) {
const { children } = this;
assert(!children[path]);
assign(children, {
@ -321,7 +435,7 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
if (relation === 2) {
// 基于entityId的多对一
if (this.value && this.value.entityId) {
node = new SingleNode(path as any, `${this.fullPath}.${path}`, this.schema, undefined, this, this.value.entityId);
node = new SingleNode(path as keyof ED, `${this.fullPath}.${path}`, this.schema, this.cache, undefined, this as any, this.value.entityId);
}
else {
// 新建对象并关联
@ -342,12 +456,12 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
id: this.id!,
}
} as any); */
node = new SingleNode(path as any, `${this.fullPath}.${path}`, this.schema, undefined, this, id, 'create');
node = new SingleNode(path as any, `${this.fullPath}.${path}`, this.schema, this.cache, undefined, this as any, id, 'create');
}
}
else if (typeof relation === 'string') {
if (this.value && this.value[`${path}Id`]) {
node = new SingleNode(relation as any, `${this.fullPath}.${path}`, this.schema, undefined, this, this.value![`${path}Id`]);
node = new SingleNode(relation as any, `${this.fullPath}.${path}`, this.schema, this.cache, undefined, this as any, this.value![`${path}Id`]);
}
else {
// 新建对象并关联
@ -367,12 +481,12 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
id: this.id!,
}
} as any); */
node = new SingleNode(path as any, `${this.fullPath}.${path}`, this.schema, undefined, this, id, 'create');
node = new SingleNode(path as any, `${this.fullPath}.${path}`, this.schema, this.cache, undefined, this as any, id, 'create');
}
}
else {
assert(relation instanceof Array);
node = new ListNode(relation[0] as any, `${this.fullPath}.${path}`, this.schema, undefined, this);
node = new ListNode(relation[0] as any, `${this.fullPath}.${path}`, this.schema, this.cache, undefined, this as any);
}
node.setValue(this.value && this.value[path] as any);
this.addChild(path as string, node);
@ -394,14 +508,14 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
}
}
async getValue<Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>>(cache: Cache<ED, Cxt, AD>) {
async reGetValue() {
const { entity, id, projection } = this;
assert(projection);
if (id) {
const filter: Partial<AttrFilter<ED[T]["Schema"]>> = {
id,
} as any;
const value = await cache.get({
const value = await this.cache.get({
entity,
selection: {
data: projection as any,
@ -411,6 +525,13 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
this.value = value[0] as Partial<ED[T]['Schema']>;
this.updateChildrenValues();
}
}
async getValue() {
if (this.needReGetValue) {
await this.reGetValue();
this.needReGetValue = false;
}
return this.value;
}
@ -428,13 +549,49 @@ class SingleNode<ED extends EntityDict, T extends keyof ED> extends Node<ED, T>{
this.children[attr]?.resetUpdateData();
}
}
async onRecordSynchoronized(records: OpRecord<ED>[]) {
if (this.refreshing) {
this.needReGetValue = true;
return;
}
for (const record of records) {
const { a } = record;
switch (a) {
case 'c': {
}
case 'r':
case 'u': {
const { e, f } = record as UpdateOpResult<ED, T>;
if (e === this.entity) {
// todo 这里更严格应该考虑f对当前filter有无影响同上面一样这里可能没有完整的供f用的cache数据
this.needReGetValue = true;
}
}
case 's': {
const { d } = record as SelectOpResult<ED>;
for (const e in d) {
if (e as string === this.entity) {
for (const id in d[e]) {
if (this.id === id) {
this.needReGetValue = true;
break;
}
}
}
}
break;
}
}
}
}
}
export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD extends Record<string, Aspect<ED, Cxt>>> extends Feature<ED, Cxt, AD> {
private cache: Cache<ED, Cxt, AD>;
private schema?: StorageSchema<ED>;
private root: Record<string, SingleNode<ED, keyof ED> | ListNode<ED, keyof ED>>;
private root: Record<string, SingleNode<ED, keyof ED, Cxt, AD> | ListNode<ED, keyof ED, Cxt, AD>>;
constructor(cache: Cache<ED, Cxt, AD>) {
super();
@ -446,7 +603,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
entity?: T, isList?: boolean, isPicker?: boolean, projection?: ED[T]['Selection']['data'], id?: string,
pagination?: Pagination, filters?: DeduceFilter<ED[T]['Schema']>[],
sorter?: ED[T]['Selection']['sorter']) {
let node: ListNode<ED, T> | SingleNode<ED, T>;
let node: ListNode<ED, T, Cxt, AD> | SingleNode<ED, T, Cxt, AD>;
const parentNode = parent ? await this.findNode(parent) : undefined;
const fullPath = parent ? `${parent}.${path}` : `${path}`;
const subEntity = parentNode && parentNode.getSubEntity(path);
@ -454,7 +611,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
const isList2 = subEntity ? subEntity.isList : isList!;
if (isPicker || isList2) {
node = new ListNode<ED, T>(entity2 as T, fullPath, this.schema!, projection, parentNode, pagination, filters, sorter);
node = new ListNode<ED, T, Cxt, AD>(entity2 as T, fullPath, this.schema!, this.cache, projection, parentNode, pagination, filters, sorter);
}
else {
/* let id2: string = id || v4({ random: await getRandomValues(16) });
@ -467,13 +624,13 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
} as FormCreateData<ED[T]['OpSchema']>,
}, context);
} */
node = new SingleNode<ED, T>(entity2 as T, fullPath, this.schema!, projection, parentNode, id, !id ? 'create' : undefined);
node = new SingleNode<ED, T, Cxt, AD>(entity2 as T, fullPath, this.schema!, this.cache, projection, parentNode, id, !id ? 'create' : undefined);
}
if (parentNode) {
parentNode.addChild(path, node as any);
}
else {
this.root[path] = node;
this.root[path] = node as any;
}
return entity2;
@ -482,6 +639,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
async destroyNode(path: string) {
const node = await this.findNode(path);
if (node) {
node.unregisterValueSentry();
const childPath = path.slice(path.lastIndexOf('.') + 1);
const parent = node.getParent();
if (parent instanceof SingleNode) {
@ -662,7 +820,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
async get(path: string) {
const node = await this.findNode(path);
let value = await node.getValue<Cxt, AD>(this.cache);
let value = await node.getValue();
if (node.isDirty()) {
const operation = await node.composeOperation();
const projection = node.getProjection();
@ -670,9 +828,9 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
}
if (value instanceof Array) {
return value!;
return value;
}
return [value!];
return [value];
}
async isDirty(path: string) {
@ -735,7 +893,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
@Action
async refresh(path: string) {
const node = await this.findNode(path);
await node.refresh(this.cache);
await node.refresh();
}
@Action
@ -744,7 +902,7 @@ export class RunningNode<ED extends EntityDict, Cxt extends Context<ED>, AD exte
if (node instanceof ListNode) {
node.setFilters(filters);
if (refresh) {
await node.refresh(this.cache);
await node.refresh();
}
}
}