继续完善synch

This commit is contained in:
Xu Chang 2024-02-06 18:40:25 +08:00
parent fd93173d7e
commit 2ef489e58a
2 changed files with 108 additions and 7 deletions

View File

@ -138,7 +138,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.synchronizer = new Synchronizer({
self: {
entity: self.entity,
// entity: self.entity,
getSelfEncryptInfo: async() => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
@ -155,7 +155,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
},
remotes: remotes.map(
(r) => ({
entity: r.entity,
// entity: r.entity,
syncEntities: r.syncEntities,
getRemoteAccessInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);

View File

@ -8,19 +8,118 @@ import { assert } from 'console';
import { uniq } from 'oak-domain/lib/utils/lodash';
type Channel<ED extends EntityDict & BaseEntityDict> = {
remoteMaxTimestamp?: number; // 远端已经接受的最大时间戳
queue: Array<{
resolve: () => void;
reject: (err: any) => void;
oper: Partial<ED['oper']['Schema']>;
}>; // 要推送的oper队列
api: string; // 推送的api
lastPushTimestamp?: number; // 最后一次推送的时间戳
handler?: ReturnType<typeof setTimeout>; // 推送定时器
};
async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(channel: Channel<ED>, algorithm: string, privateKey: string) {
const { queue, api } = channel;
channel.queue = [];
channel.lastPushTimestamp = Date.now();
channel.handler = undefined;
const opers = queue.map(ele => ele.oper);
try {
const res = await fetch(api, {
method: 'post',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(opers),
});
if (res.status !== 200) {
throw new Error(`访问api「${api}」的结果不是200。「${res.status}`);
}
const json = await res.json();
const { timestamp, error } = json;
if (error) {
throw new Error(`访问api「${api}」的结果出错,是${error}`);
}
if (!channel.remoteMaxTimestamp || channel.remoteMaxTimestamp < timestamp) {
channel.remoteMaxTimestamp = timestamp;
}
queue.forEach(
(ele) => ele.resolve()
);
}
catch (err: any) {
queue.forEach(
({ reject }) => reject(err)
);
}
}
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config: SyncConfigWrapper<ED>;
private schema: StorageSchema<ED>;
private privateKey?: string;
private algorithm?: string;
private remoteAccessChannel: Record<string, Channel<ED>> = {};
// 将产生的oper推送到远端Node。注意要尽量在本地阻止重复推送
private async pushOper(
oper: Partial<ED['oper']['Schema']>,
userIds: string[],
getRemoteAccessInfo: (userId: string) => Promise<RemoteAccessInfo>,
privateKey: string,
algorithm: Algorithm,
endpoint?: string) {
await Promise.all(
userIds.map(
async (userId) => {
if (!this.remoteAccessChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remoteAccessChannel[userId] = {
api: `${url}/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remoteAccessChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt! < channel.remoteMaxTimestamp) {
// 说明已经同步过了
return;
}
const waiter = new Promise<void>(
(resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
}
);
if (!channel.handler) {
channel.handler = setTimeout(async () => {
assert(this.privateKey);
assert(this.algorithm);
await pushRequestOnChannel(channel, this.algorithm!, this.privateKey!);
}, 1000); // 1秒钟集中同步一次
}
await waiter;
}
)
);
}
private async loadPublicKey() {
const { privateKey, algorithm } = await this.config.self.getSelfEncryptInfo();
this.privateKey = privateKey;
this.algorithm = algorithm;
}
private makeCreateOperTrigger() {
@ -115,6 +214,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
entityId: 1,
},
},
$$createAt$$: 1,
},
filter: {
id: ids[0],
@ -125,8 +225,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
ele => ele.entityId!
);
const { privateKey, algorithm } = await self.getSelfEncryptInfo();
const pushNodes = pushAccessMap[targetEntity!];
if (pushNodes) {
await Promise.all(
@ -150,7 +248,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
);
if (userIds.length > 0) {
await this.pushOper(oper, userIds, getRemoteAccessInfo, privateKey, algorithm, endpoint);
await this.pushOper(oper, userIds, getRemoteAccessInfo, endpoint);
}
return undefined;
}
@ -169,6 +267,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>) {
this.config = config;
this.schema = schema;
this.loadPublicKey();
}
/**
@ -184,6 +283,8 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
name: this.config.self.endpoint || 'sync',
method: 'post',
fn: async (context, params, headers, req, body) => {
// body中是传过来的oper数组信息
const opers = body as ED['oper']['Schema'][];
}
};