diff --git a/src/Synchronizer.ts b/src/Synchronizer.ts index b31d9f4..4dab02f 100644 --- a/src/Synchronizer.ts +++ b/src/Synchronizer.ts @@ -3,10 +3,11 @@ import { VolatileTrigger } from 'oak-domain/lib/types/Trigger'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { destructRelationPath, destructDirectPath } from 'oak-domain/lib/utils/relationPath'; import { BackendRuntimeContext } from 'oak-frontend-base'; -import { RemoteAccessInfo, SyncConfigWrapper, Algorithm } from './types/Sync'; +import { RemotePushInfo, SyncConfigWrapper, Algorithm, RemotePullInfo, SelfEncryptInfo } from './types/Sync'; import { assert } from 'console'; import { uniq } from 'oak-domain/lib/utils/lodash'; +const OAK_SYNC_HEADER_ITEM = 'oak-sync-remote-id'; type Channel = { remoteMaxTimestamp?: number; // 远端已经接受的最大时间戳 @@ -21,7 +22,7 @@ type Channel = { }; -async function pushRequestOnChannel(channel: Channel, algorithm: string, privateKey: string) { +async function pushRequestOnChannel(channel: Channel, selfEncryptInfo: SelfEncryptInfo) { const { queue, api } = channel; channel.queue = []; channel.lastPushTimestamp = Date.now(); @@ -29,10 +30,12 @@ async function pushRequestOnChannel(chan const opers = queue.map(ele => ele.oper); try { + // todo 加密 const res = await fetch(api, { method: 'post', headers: { 'Content-Type': 'application/json', + [OAK_SYNC_HEADER_ITEM]: selfEncryptInfo.id, }, body: JSON.stringify(opers), }); @@ -64,28 +67,28 @@ async function pushRequestOnChannel(chan export default class Synchronizer> { private config: SyncConfigWrapper; private schema: StorageSchema; - private privateKey?: string; - private algorithm?: string; + private selfEncryptInfo?: SelfEncryptInfo; + private remotePullInfoMap: Record> = {}; - private remoteAccessChannel: Record> = {}; + private remotePushChannel: Record> = {}; // 将产生的oper推送到远端Node。注意要尽量在本地阻止重复推送 private async pushOper( oper: Partial, userIds: string[], - getRemoteAccessInfo: (userId: string) => Promise, + getRemoteAccessInfo: (userId: string) => Promise, endpoint?: string) { await Promise.all( userIds.map( async (userId) => { - if (!this.remoteAccessChannel[userId]) { + if (!this.remotePushChannel[userId]) { const { url } = await getRemoteAccessInfo(userId); - this.remoteAccessChannel[userId] = { + this.remotePushChannel[userId] = { api: `${url}/${endpoint || 'sync'}`, queue: [], }; } - const channel = this.remoteAccessChannel[userId]; + const channel = this.remotePushChannel[userId]; if (channel.remoteMaxTimestamp && oper.bornAt! < channel.remoteMaxTimestamp) { // 说明已经同步过了 return; @@ -102,9 +105,8 @@ export default class Synchronizer { - assert(this.privateKey); - assert(this.algorithm); - await pushRequestOnChannel(channel, this.algorithm!, this.privateKey!); + assert(this.selfEncryptInfo); + await pushRequestOnChannel(channel, this.selfEncryptInfo!); }, 1000); // 1秒钟集中同步一次 } @@ -112,14 +114,10 @@ export default class Synchronizer[]) => string[]; // 从取得的行中获得userId的逻辑 - getRemoteAccessInfo: (userId: string) => Promise; // 根据userId获得相应push远端的信息 + getRemotePushInfo: (userId: string) => Promise; // 根据userId获得相应push远端的信息 endpoint?: string; // 远端接收endpoint的url }>> = {}; remotes.forEach( (remote) => { - const { getRemoteAccessInfo, syncEntities, endpoint } = remote; + const { getRemotePushInfo, syncEntities, endpoint } = remote; const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push'); const pushEntities = pushEntityDefs.map(ele => ele.entity); pushEntities.forEach( @@ -167,7 +165,7 @@ export default class Synchronizer { + async ({ projection, getUserIds, getRemotePushInfo: getRemoteAccessInfo, endpoint }) => { const rows = await context.select(targetEntity!, { data: { id: 1, @@ -284,8 +282,55 @@ export default class Synchronizer { // body中是传过来的oper数组信息 - const opers = body as ED['oper']['Schema'][]; + const { entity } = params; + const {[OAK_SYNC_HEADER_ITEM]: id} = headers; + if (!this.remotePullInfoMap[entity]) { + this.remotePullInfoMap[entity] = {}; + } + if (!this.remotePullInfoMap[entity]![id as string]) { + const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity)!; + this.remotePullInfoMap[entity]![id as string] = await getRemotePullInfo(id as string); + } + + const pullInfo = this.remotePullInfoMap[entity][id as string]; + const { userId, algorithm, publicKey } = pullInfo; + // todo 解密 + // 如果本次同步中有bornAt比本用户操作的最大的bornAt要小,则说明是重复更新,直接返回 + const [ maxOper ] = await context.select('oper', { + data: { + id: 1, + bornAt: 1, + }, + filter: { + operatorId: userId, + }, + sorter: [ + { + $attr: { + bornAt: 1, + }, + $direction: 'desc', + }, + ], + indexFrom: 0, + count: 1, + }, { dontCollect: true }); + + const opers = body as ED['oper']['Schema'][]; + const legalOpers = maxOper ? opers.filter( + ele => ele.bornAt > maxOper.bornAt + ) : opers; + + if (legalOpers.length > 0) { + + } + else { + assert(maxOper); + return { + timestamp: maxOper.bornAt, + }; + } } }; } diff --git a/src/types/Sync.ts b/src/types/Sync.ts index 1ece91a..bfdcb03 100644 --- a/src/types/Sync.ts +++ b/src/types/Sync.ts @@ -4,14 +4,20 @@ import { BackendRuntimeContext } from 'oak-frontend-base'; export type Algorithm = 'rsa' | 'ec' | 'ed25519'; -export type RemoteAccessInfo = { +export type RemotePushInfo = { url: string; - publicKey: string; userId: string; - algorithm: Algorithm; }; -type SelfEncryptInfo = { +export type RemotePullInfo = { + id: string; + publicKey: string; + algorithm: Algorithm; + userId: string; +}; + +export type SelfEncryptInfo = { + id: string; privateKey: string; algorithm: Algorithm; }; @@ -25,16 +31,19 @@ export interface SyncEntityDef { - endpoint?: string; // 对方结点同步数据的endpoint,默认为/sync - syncEntities: Array>; + entity: keyof ED; // 对方结点所关联的entity名称 + endpoint?: string; // 对方结点同步数据的endpoint,默认为/sync/:entity + syncEntities: Array>; // 在这个entity上需要同步的entities }; interface SyncRemoteConfigWrapper extends SyncRemoteConfigBase { - getRemoteAccessInfo: (userId: string) => Promise; + getRemotePushInfo: (userId: string) => Promise; + getRemotePullInfo: (id: string) => Promise; }; interface SyncRemoteConfig> extends SyncRemoteConfigBase { - getRemoteAccessInfo: (userId: string, context: Cxt) => Promise; + getRemotePushInfo: (userId: string, context: Cxt) => Promise; + getRemotePullInfo: (id: string, context: Cxt) => Promise; }; interface SyncSelfConfigBase {