继续完善sychronizer

This commit is contained in:
Xu Chang 2024-02-06 19:44:18 +08:00
parent 2ef489e58a
commit 9421e54990
2 changed files with 85 additions and 31 deletions

View File

@ -3,10 +3,11 @@ import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { destructRelationPath, destructDirectPath } from 'oak-domain/lib/utils/relationPath';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { RemoteAccessInfo, SyncConfigWrapper, Algorithm } from './types/Sync';
import { RemotePushInfo, SyncConfigWrapper, Algorithm, RemotePullInfo, SelfEncryptInfo } from './types/Sync';
import { assert } from 'console';
import { uniq } from 'oak-domain/lib/utils/lodash';
const OAK_SYNC_HEADER_ITEM = 'oak-sync-remote-id';
type Channel<ED extends EntityDict & BaseEntityDict> = {
remoteMaxTimestamp?: number; // 远端已经接受的最大时间戳
@ -21,7 +22,7 @@ type Channel<ED extends EntityDict & BaseEntityDict> = {
};
async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(channel: Channel<ED>, algorithm: string, privateKey: string) {
async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(channel: Channel<ED>, selfEncryptInfo: SelfEncryptInfo) {
const { queue, api } = channel;
channel.queue = [];
channel.lastPushTimestamp = Date.now();
@ -29,10 +30,12 @@ async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(chan
const opers = queue.map(ele => ele.oper);
try {
// todo 加密
const res = await fetch(api, {
method: 'post',
headers: {
'Content-Type': 'application/json',
[OAK_SYNC_HEADER_ITEM]: selfEncryptInfo.id,
},
body: JSON.stringify(opers),
});
@ -64,28 +67,28 @@ async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(chan
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config: SyncConfigWrapper<ED>;
private schema: StorageSchema<ED>;
private privateKey?: string;
private algorithm?: string;
private selfEncryptInfo?: SelfEncryptInfo;
private remotePullInfoMap: Record<string, Record<string, RemotePullInfo>> = {};
private remoteAccessChannel: Record<string, Channel<ED>> = {};
private remotePushChannel: Record<string, Channel<ED>> = {};
// 将产生的oper推送到远端Node。注意要尽量在本地阻止重复推送
private async pushOper(
oper: Partial<ED['oper']['Schema']>,
userIds: string[],
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>,
getRemoteAccessInfo: (userId: string) => Promise<RemotePushInfo>,
endpoint?: string) {
await Promise.all(
userIds.map(
async (userId) => {
if (!this.remoteAccessChannel[userId]) {
if (!this.remotePushChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remoteAccessChannel[userId] = {
this.remotePushChannel[userId] = {
api: `${url}/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remoteAccessChannel[userId];
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt! < channel.remoteMaxTimestamp) {
// 说明已经同步过了
return;
@ -102,9 +105,8 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
);
if (!channel.handler) {
channel.handler = setTimeout(async () => {
assert(this.privateKey);
assert(this.algorithm);
await pushRequestOnChannel(channel, this.algorithm!, this.privateKey!);
assert(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo!);
}, 1000); // 1秒钟集中同步一次
}
@ -112,14 +114,10 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
}
)
);
}
private async loadPublicKey() {
const { privateKey, algorithm } = await this.config.self.getSelfEncryptInfo();
this.privateKey = privateKey;
this.algorithm = algorithm;
this.selfEncryptInfo = await this.config.self.getSelfEncryptInfo();
}
private makeCreateOperTrigger() {
@ -130,12 +128,12 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
const pushAccessMap: Record<string, Array<{
projection: ED[keyof ED]['Selection']['data']; // 从entity上取到相关user需要的projection
getUserIds: (rows: Partial<ED[keyof ED]['Schema']>[]) => string[]; // 从取得的行中获得userId的逻辑
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>; // 根据userId获得相应push远端的信息
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>; // 根据userId获得相应push远端的信息
endpoint?: string; // 远端接收endpoint的url
}>> = {};
remotes.forEach(
(remote) => {
const { getRemoteAccessInfo, syncEntities, endpoint } = remote;
const { getRemotePushInfo, syncEntities, endpoint } = remote;
const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
pushEntities.forEach(
@ -167,7 +165,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
pushAccessMap[entity as string] = [{
projection,
getUserIds,
getRemoteAccessInfo,
getRemotePushInfo,
endpoint,
}];
}
@ -175,7 +173,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
pushAccessMap[entity as string].push({
projection,
getUserIds,
getRemoteAccessInfo,
getRemotePushInfo,
endpoint,
});
}
@ -229,7 +227,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
if (pushNodes) {
await Promise.all(
pushNodes.map(
async ({ projection, getUserIds, getRemoteAccessInfo, endpoint }) => {
async ({ projection, getUserIds, getRemotePushInfo: getRemoteAccessInfo, endpoint }) => {
const rows = await context.select(targetEntity!, {
data: {
id: 1,
@ -284,8 +282,55 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
method: 'post',
fn: async (context, params, headers, req, body) => {
// body中是传过来的oper数组信息
const opers = body as ED['oper']['Schema'][];
const { entity } = params;
const {[OAK_SYNC_HEADER_ITEM]: id} = headers;
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity]![id as string]) {
const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity)!;
this.remotePullInfoMap[entity]![id as string] = await getRemotePullInfo(id as string);
}
const pullInfo = this.remotePullInfoMap[entity][id as string];
const { userId, algorithm, publicKey } = pullInfo;
// todo 解密
// 如果本次同步中有bornAt比本用户操作的最大的bornAt要小则说明是重复更新直接返回
const [ maxOper ] = await context.select('oper', {
data: {
id: 1,
bornAt: 1,
},
filter: {
operatorId: userId,
},
sorter: [
{
$attr: {
bornAt: 1,
},
$direction: 'desc',
},
],
indexFrom: 0,
count: 1,
}, { dontCollect: true });
const opers = body as ED['oper']['Schema'][];
const legalOpers = maxOper ? opers.filter(
ele => ele.bornAt > maxOper.bornAt
) : opers;
if (legalOpers.length > 0) {
}
else {
assert(maxOper);
return {
timestamp: maxOper.bornAt,
};
}
}
};
}

View File

@ -4,14 +4,20 @@ import { BackendRuntimeContext } from 'oak-frontend-base';
export type Algorithm = 'rsa' | 'ec' | 'ed25519';
export type RemoteAccessInfo = {
export type RemotePushInfo = {
url: string;
publicKey: string;
userId: string;
algorithm: Algorithm;
};
type SelfEncryptInfo = {
export type RemotePullInfo = {
id: string;
publicKey: string;
algorithm: Algorithm;
userId: string;
};
export type SelfEncryptInfo = {
id: string;
privateKey: string;
algorithm: Algorithm;
};
@ -25,16 +31,19 @@ export interface SyncEntityDef<ED extends EntityDict & BaseEntityDict, T extends
};
interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string; // 对方结点同步数据的endpoint默认为/sync
syncEntities: Array<SyncEntityDef<ED, keyof ED>>;
entity: keyof ED; // 对方结点所关联的entity名称
endpoint?: string; // 对方结点同步数据的endpoint默认为/sync/:entity
syncEntities: Array<SyncEntityDef<ED, keyof ED>>; // 在这个entity上需要同步的entities
};
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncRemoteConfigBase<ED> {
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>;
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string) => Promise<RemotePullInfo>;
};
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED> {
getRemoteAccessInfo: (userId: string, context: Cxt) => Promise<RemoteAccessInfo>;
getRemotePushInfo: (userId: string, context: Cxt) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string, context: Cxt) => Promise<RemotePullInfo>;
};
interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> {