synchronizer的部分逻辑(未完全完成,但不影响测试)

This commit is contained in:
Xu Chang 2024-02-05 22:46:21 +08:00
parent 0da7a7bedb
commit fd93173d7e
3 changed files with 167 additions and 67 deletions

View File

@ -232,6 +232,10 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
if (this.synchronizer) {
// 同步数据到远端结点通过commit trigger来完成
const syncTriggers = this.synchronizer.getSyncTriggers();
syncTriggers.forEach(
(trigger) => this.registerTrigger(trigger)
);
}
}

View File

@ -1,73 +1,169 @@
import { EntityDict, StorageSchema, EndpointItem } from 'oak-domain/lib/types';
import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { destructRelationPath, destructDirectPath } from 'oak-domain/lib/utils/relationPath';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { SyncConfigWrapper, SyncEntityDef } from './types/Sync';
import { RemoteAccessInfo, SyncConfigWrapper, Algorithm } from './types/Sync';
import { assert } from 'console';
import { uniq } from 'oak-domain/lib/utils/lodash';
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config: SyncConfigWrapper<ED>;
private schema: StorageSchema<ED>;
private analyzeConfig() {
const { config, schema } = this;
const { remotes } = config;
// 将产生的oper推送到远端Node。注意要尽量在本地阻止重复推送
private async pushOper(
oper: Partial<ED['oper']['Schema']>,
userIds: string[],
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>,
privateKey: string,
algorithm: Algorithm,
endpoint?: string) {
const analyzeSyncEntityDef = (defs: SyncEntityDef<ED, keyof ED>[]) => {
const pushEntityDefs = defs.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
}
// push相关联的entity在发生操作时需要将操作推送到远端
const createOperTrigger: VolatileTrigger<ED, 'oper', Cxt> = {
name: 'push oper to remote node',
entity: 'oper',
action: 'create',
when: 'commit',
strict: 'makeSure',
check: (operation: ED['oper']['Create']) => {
const { data } = operation;
return pushEntities.includes((<ED['oper']['CreateSingle']['data']>data).targetEntity!);
},
fn: async ({ ids }, context) => {
assert(ids.length === 1);
const [oper] = await context.select('oper', {
data: {
id: 1,
action: 1,
data: 1,
targetEntity: 1,
operEntity$oper: {
$entity: 'operEntity',
data: {
id: 1,
entity: 1,
entityId: 1,
},
private makeCreateOperTrigger() {
const { config } = this;
const { remotes, self } = config;
// 根据remotes定义建立从entity到需要同步的远端结点信息的Map
const pushAccessMap: Record<string, Array<{
projection: ED[keyof ED]['Selection']['data']; // 从entity上取到相关user需要的projection
getUserIds: (rows: Partial<ED[keyof ED]['Schema']>[]) => string[]; // 从取得的行中获得userId的逻辑
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>; // 根据userId获得相应push远端的信息
endpoint?: string; // 远端接收endpoint的url
}>> = {};
remotes.forEach(
(remote) => {
const { getRemoteAccessInfo, syncEntities, endpoint } = remote;
const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
pushEntities.forEach(
(entity) => {
const def = syncEntities.find(ele => ele.entity === entity)!;
const { path, relationName, recursive } = def;
const {
projection,
getData
} = relationName ? destructRelationPath(this.schema, entity, path, {
relation: {
name: relationName,
}
}, recursive) : destructDirectPath(this.schema, entity, path, recursive);
const getUserIds = (rows: Partial<ED[keyof ED]['Schema']>[]) => {
const urs = rows.map(
(row) => getData(row)
).flat();
return uniq(
urs.map(
ele => ele.userId!
)
);
};
if (!pushAccessMap[entity as string]) {
pushAccessMap[entity as string] = [{
projection,
getUserIds,
getRemoteAccessInfo,
endpoint,
}];
}
else {
pushAccessMap[entity as string].push({
projection,
getUserIds,
getRemoteAccessInfo,
endpoint,
});
}
}
)
}
);
const pushEntities = Object.keys(pushAccessMap);
// push相关联的entity在发生操作时需要将operation推送到远端
const createOperTrigger: VolatileTrigger<ED, 'oper', Cxt> = {
name: 'push oper to remote node',
entity: 'oper',
action: 'create',
when: 'commit',
strict: 'makeSure',
check: (operation: ED['oper']['Create']) => {
const { data } = operation;
return pushEntities.includes((<ED['oper']['CreateSingle']['data']>data).targetEntity!);
},
fn: async ({ ids }, context) => {
assert(ids.length === 1);
const [oper] = await context.select('oper', {
data: {
id: 1,
action: 1,
data: 1,
targetEntity: 1,
operatorId: 1,
operEntity$oper: {
$entity: 'operEntity',
data: {
id: 1,
entity: 1,
entityId: 1,
},
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
const { operatorId, targetEntity, operEntity$oper: operEntities } = oper;
const entityIds = operEntities!.map(
ele => ele.entityId!
);
const def = pushEntityDefs.find(
ele => ele.entity === oper.targetEntity!
)!;
const { entity, path, relationName, direction } = def;
const { privateKey, algorithm } = await self.getSelfEncryptInfo();
// 要找到对应的所有需要推送的node对象信息
const pushNodes = pushAccessMap[targetEntity!];
if (pushNodes) {
await Promise.all(
pushNodes.map(
async ({ projection, getUserIds, getRemoteAccessInfo, endpoint }) => {
const rows = await context.select(targetEntity!, {
data: {
id: 1,
...projection,
},
filter: {
id: {
$in: entityIds,
},
},
}, { dontCollect: true });
// userId就是需要发送给远端的user但是要将本次操作的user过滤掉他是操作的产生者
const userIds = getUserIds(rows).filter(
(ele) => ele !== operatorId
);
return 1;
if (userIds.length > 0) {
await this.pushOper(oper, userIds, getRemoteAccessInfo, privateKey, algorithm, endpoint);
}
return undefined;
}
)
);
return entityIds.length * pushNodes.length;
}
return 0;
}
};
remotes.forEach(
(remote) => analyzeSyncEntityDef(remote.syncEntities)
);
return createOperTrigger;
}
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>) {
@ -79,9 +175,8 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
* sync的定义 commit triggers
* @returns
*/
getSyncTriggers(): Array<VolatileTrigger<ED, keyof ED, Cxt>> {
return [];
getSyncTriggers() {
return [this.makeCreateOperTrigger()] as Array<VolatileTrigger<ED, keyof ED, Cxt>>;
}
getSelfEndpoint(): EndpointItem<ED, Cxt> {

View File

@ -2,58 +2,59 @@ import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
type RemoteAccessInfo = {
export type Algorithm = 'rsa' | 'ec' | 'ed25519';
export type RemoteAccessInfo = {
url: string;
publicKey: string;
userId: string;
algorithm: 'rsa' | 'ec' | 'ed25519';
algorithm: Algorithm;
};
type SelfEncryptInfo = {
privateKey: string;
algorithm: 'rsa' | 'ec' | 'ed25519';
algorithm: Algorithm;
};
export interface SyncEntityDef<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T; // 需要同步的entity
path: string; // 此entity到需要同步到的根对象的路径
recursive?: boolean; // 表明path的最后一项是递归的
relationName?: string; // 要同步的user与根对象的relation名称为空说明是userId)
direction: 'pull' | 'push'; // pull说明是从远端拉过来push说明是从本地推过去
};
interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T;
interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string; // 对方结点同步数据的endpoint默认为/sync
syncEntities: Array<SyncEntityDef<ED, keyof ED>>;
};
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict, T extends keyof ED> extends SyncRemoteConfigBase<ED, T> {
getRemoteAccessInfo: (id: string) => Promise<RemoteAccessInfo>;
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncRemoteConfigBase<ED> {
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>;
};
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, T extends keyof ED, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED, T> {
getRemoteAccessInfo: (id: string, context: Cxt) => Promise<RemoteAccessInfo>;
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED> {
getRemoteAccessInfo: (userId: string, context: Cxt) => Promise<RemoteAccessInfo>;
};
interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T;
interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string; // 本结点同步数据的endpoint默认为/sync
};
interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict, T extends keyof ED> extends SyncSelfConfigBase<ED, T> {
interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncSelfConfigBase<ED> {
getSelfEncryptInfo: () => Promise<SelfEncryptInfo>;
};
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, T extends keyof ED, Cxt extends BackendRuntimeContext<ED>> extends SyncSelfConfigBase<ED, T>{
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncSelfConfigBase<ED>{
getSelfEncryptInfo: (context: Cxt) => Promise<SelfEncryptInfo>;
};
export interface SyncConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
self: SyncSelfConfig<ED, keyof ED, Cxt>;
remotes: Array<SyncRemoteConfig<ED, keyof ED, Cxt>>;
self: SyncSelfConfig<ED, Cxt>;
remotes: Array<SyncRemoteConfig<ED, Cxt>>;
};
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict> {
self: SyncSelfConfigWrapper<ED, keyof ED>;
remotes: Array<SyncRemoteConfigWrapper<ED, keyof ED>>;
self: SyncSelfConfigWrapper<ED>;
remotes: Array<SyncRemoteConfigWrapper<ED>>;
};