639 lines
29 KiB
JavaScript
639 lines
29 KiB
JavaScript
"use strict";
|
||
Object.defineProperty(exports, "__esModule", { value: true });
|
||
const tslib_1 = require("tslib");
|
||
const crypto_1 = require("crypto");
|
||
const types_1 = require("oak-domain/lib/types");
|
||
const relationPath_1 = require("oak-domain/lib/utils/relationPath");
|
||
const assert_1 = tslib_1.__importDefault(require("assert"));
|
||
const path_1 = require("path");
|
||
const lodash_1 = require("oak-domain/lib/utils/lodash");
|
||
const filter_1 = require("oak-domain/lib/store/filter");
|
||
const uuid_1 = require("oak-domain/lib/utils/uuid");
|
||
const lodash_2 = require("lodash");
|
||
const OAK_SYNC_HEADER_ENTITY = 'oak-sync-entity';
|
||
const OAK_SYNC_HEADER_ENTITY_ID = 'oak-sync-entity-id';
|
||
const OAK_SYNC_HEADER_TIMESTAMP = 'oak-sync-timestamp';
|
||
const OAK_SYNC_HEADER_NONCE = 'oak-sync-nonce';
|
||
const OAK_SYNC_HEADER_SIGN = 'oak-sync-sign';
|
||
function generateSignStr(body, ts, nonce) {
|
||
return `${body}\n${ts}\n${nonce}`;
|
||
}
|
||
async function sign(privateKey, body) {
|
||
const ts = Date.now();
|
||
const nonce = await (0, uuid_1.generateNewIdAsync)();
|
||
const sign2 = (0, crypto_1.createSign)('SHA256');
|
||
sign2.update(generateSignStr(body, `${ts}`, nonce));
|
||
sign2.end();
|
||
const signature = sign2.sign(privateKey).toString('hex');
|
||
return {
|
||
ts,
|
||
nonce,
|
||
signature,
|
||
};
|
||
}
|
||
function verify(publicKey, body, ts, nonce, signature) {
|
||
const verify2 = (0, crypto_1.createVerify)('SHA256');
|
||
verify2.update(generateSignStr(body, ts, nonce));
|
||
verify2.end();
|
||
return verify2.verify(publicKey, signature, 'hex');
|
||
}
|
||
async function fetchWithTimeout(url, options, timeout = 5000) {
|
||
if (typeof AbortController === 'undefined' || timeout === 0) {
|
||
return fetch(url, options);
|
||
}
|
||
const controller = new AbortController();
|
||
const signal = controller.signal;
|
||
// 设置超时
|
||
const timeoutId = setTimeout(() => {
|
||
controller.abort();
|
||
}, timeout);
|
||
// 发起 fetch 请求并传递 signal
|
||
return fetch(url, Object.assign({}, options, { signal }))
|
||
.then(response => {
|
||
clearTimeout(timeoutId); // 如果请求成功,清除超时
|
||
return response;
|
||
})
|
||
.catch(error => {
|
||
clearTimeout(timeoutId); // 如果请求失败,清除超时
|
||
if (error.name === 'AbortError') {
|
||
throw new types_1.OakRequestTimeoutException();
|
||
}
|
||
throw error; // 其他错误
|
||
});
|
||
}
|
||
class Synchronizer {
|
||
config;
|
||
schema;
|
||
remotePullInfoMap = {};
|
||
channelDict = {};
|
||
contextBuilder;
|
||
pushAccessMap = {};
|
||
async startChannel2(context, channel) {
|
||
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel;
|
||
// todo 加密
|
||
const opers = queue.map(ele => ele.oper);
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({
|
||
id: ele.id,
|
||
seq: ele.$$seq$$,
|
||
}))), 'txnId:', context.getCurrentTxnId());
|
||
}
|
||
const finalApi = (0, path_1.join)(api, selfEncryptInfo.id);
|
||
channel.queue = [];
|
||
try {
|
||
const body = JSON.stringify(opers);
|
||
const { ts, nonce, signature } = await sign(selfEncryptInfo.privateKey, body);
|
||
const res = await fetchWithTimeout(finalApi, {
|
||
method: 'post',
|
||
headers: {
|
||
'Content-Type': 'application/json',
|
||
[OAK_SYNC_HEADER_ENTITY]: entity,
|
||
[OAK_SYNC_HEADER_ENTITY_ID]: entityId,
|
||
[OAK_SYNC_HEADER_TIMESTAMP]: `${ts}`,
|
||
[OAK_SYNC_HEADER_NONCE]: nonce,
|
||
[OAK_SYNC_HEADER_SIGN]: signature,
|
||
},
|
||
body,
|
||
}, timeout);
|
||
if (res.status !== 200) {
|
||
throw new Error(`sync数据时,访问api「${finalApi}」的结果不是200。「${res.status}」`);
|
||
}
|
||
const json = await res.json();
|
||
if (json.exception) {
|
||
throw new Error(`sync数据时,远端服务报异常「${json.exception}」`);
|
||
}
|
||
}
|
||
catch (err) {
|
||
if (onFailed) {
|
||
context.on('rollback', async () => {
|
||
const context2 = this.contextBuilder();
|
||
await context2.begin();
|
||
try {
|
||
await onFailed({
|
||
remoteEntity: entity,
|
||
remoteEntityId: entityId,
|
||
data: queue.map((ele) => ({
|
||
entity: ele.oper.targetEntity,
|
||
rowIds: ele.oper.filter.id.$in,
|
||
action: ele.oper.action,
|
||
data: ele.oper.data,
|
||
})),
|
||
reason: err,
|
||
}, context2);
|
||
await context2.commit();
|
||
}
|
||
catch (err) {
|
||
await context2.rollback();
|
||
}
|
||
});
|
||
}
|
||
throw err;
|
||
}
|
||
// 如果是200,则已经成功
|
||
for (const ele of queue) {
|
||
const { oper, onSynchronized } = ele;
|
||
if (onSynchronized) {
|
||
const operEntityArr = await context.select('operEntity', {
|
||
data: {
|
||
id: 1,
|
||
entity: 1,
|
||
entityId: 1
|
||
},
|
||
filter: {
|
||
operId: oper.id,
|
||
}
|
||
}, {});
|
||
const entityIds = operEntityArr.map(ele => ele.entityId);
|
||
return onSynchronized({
|
||
action: oper.action,
|
||
data: oper.data,
|
||
rowIds: entityIds,
|
||
remoteEntity: entity,
|
||
remoteEntityId: entityId,
|
||
}, context);
|
||
}
|
||
}
|
||
/**
|
||
* 自主将triggerData属性清零
|
||
*/
|
||
const operIds = queue.map(ele => ele.oper.id);
|
||
await context.operate('oper', {
|
||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||
action: 'update',
|
||
data: {
|
||
[types_1.TriggerDataAttribute]: null,
|
||
[types_1.TriggerUuidAttribute]: null,
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: operIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
/**开始同步这些channel上的oper。注意,这时候即使某个channel上失败了,也不应影响本事务提交(其它的channel成功了) */
|
||
async startAllChannel(context) {
|
||
return await Promise.all(Object.keys(this.channelDict).map(async (k) => {
|
||
const channel = this.channelDict[k];
|
||
if (channel.queue.length > 0) {
|
||
channel.queue.sort((o1, o2) => o1.oper.$$seq$$ - o2.oper.$$seq$$);
|
||
try {
|
||
return this.startChannel2(context, channel);
|
||
}
|
||
catch (err) {
|
||
const msg = `startChannel推送数据出错,channel是「${k}」,异常是「${err.message}」`;
|
||
console.error(err);
|
||
return new types_1.OakPartialSuccess(msg);
|
||
}
|
||
}
|
||
}));
|
||
}
|
||
pushOperToChannel(oper, userId, url, endpoint, remoteEntity, remoteEntityId, selfEncryptInfo, onSynchronized, onFailed, timeout) {
|
||
if (!this.channelDict[userId]) {
|
||
// channel上缓存这些信息,暂不支持动态更新
|
||
this.channelDict[userId] = {
|
||
api: (0, path_1.join)(url, 'endpoint', endpoint),
|
||
queue: [],
|
||
entity: remoteEntity,
|
||
entityId: remoteEntityId,
|
||
selfEncryptInfo,
|
||
onFailed,
|
||
timeout,
|
||
};
|
||
}
|
||
else {
|
||
// 趁机更新一下加密信息
|
||
this.channelDict[userId].selfEncryptInfo = selfEncryptInfo;
|
||
(0, assert_1.default)(this.channelDict[userId].onFailed === onFailed);
|
||
}
|
||
const channel = this.channelDict[userId];
|
||
(0, assert_1.default)(channel.api === (0, path_1.join)(url, 'endpoint', endpoint));
|
||
(0, assert_1.default)(channel.entity === remoteEntity);
|
||
(0, assert_1.default)(channel.entityId === remoteEntityId);
|
||
if (channel.queue.find(ele => ele.oper.id === oper.id)) {
|
||
console.error('channel.queue找到相同的需推送的oper');
|
||
}
|
||
channel.queue.push({
|
||
oper,
|
||
onSynchronized,
|
||
});
|
||
}
|
||
refineOperData(oper, rowIds) {
|
||
const { action, id, targetEntity, data, $$seq$$, filter } = oper;
|
||
const data2 = (action === 'create' && data instanceof Array) ? data.filter(ele => rowIds.includes(ele.id)) : data;
|
||
// 过滤掉数据中的跨事务trigger信息
|
||
if (data2 instanceof Array) {
|
||
data2.forEach((d) => {
|
||
(0, lodash_2.unset)(d, types_1.TriggerDataAttribute);
|
||
(0, lodash_2.unset)(d, types_1.TriggerUuidAttribute);
|
||
});
|
||
}
|
||
else {
|
||
(0, lodash_2.unset)(data2, types_1.TriggerDataAttribute);
|
||
(0, lodash_2.unset)(data2, types_1.TriggerUuidAttribute);
|
||
}
|
||
return {
|
||
id, action, targetEntity, data: data2, $$seq$$, filter,
|
||
};
|
||
}
|
||
async dispatchOperToChannels(oper, context) {
|
||
const { operatorId, targetEntity, filter, action, data, operEntity$oper } = oper;
|
||
const entityIds = operEntity$oper?.map(ele => ele.entityId);
|
||
(0, assert_1.default)(entityIds && entityIds.length > 0);
|
||
const pushEntityNodes = this.pushAccessMap[targetEntity];
|
||
let pushed = false;
|
||
if (pushEntityNodes && pushEntityNodes.length > 0) {
|
||
// 每个pushEntityNode代表配置的一个remoteEntity
|
||
await Promise.all(pushEntityNodes.map(async (node) => {
|
||
const { projection, groupByUsers, getRemotePushInfo: getRemoteAccessInfo, groupBySelfEntity, endpoint, actions, onSynchronized, onFailed, timeout } = node;
|
||
// 定义中应该不可能没有actions
|
||
if (!actions || actions.includes(action)) {
|
||
const rows = await context.select(targetEntity, {
|
||
data: {
|
||
id: 1,
|
||
...projection,
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: entityIds,
|
||
},
|
||
},
|
||
}, { dontCollect: true, includedDeleted: true });
|
||
// userId就是需要发送给远端的user,但是要将本次操作的user过滤掉(操作的原本产生者)
|
||
const userSendDict = groupByUsers(rows);
|
||
const selfEntityIdDict = groupBySelfEntity(rows);
|
||
const encryptInfoDict = {};
|
||
const pushToUserIdFn = async (userId) => {
|
||
const { entity, entityId, rowIds } = userSendDict[userId];
|
||
const selfEntityIds = rowIds.map((rowId) => selfEntityIdDict[rowId]);
|
||
const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
|
||
(0, assert_1.default)(uniqSelfEntityIds.length === 1, '推向同一个userId的oper不可能关联在多个不同的selfEntity行上');
|
||
const selfEntityId = uniqSelfEntityIds[0];
|
||
if (!encryptInfoDict[selfEntityId]) {
|
||
encryptInfoDict[selfEntityId] = await this.config.self.getSelfEncryptInfo(context, selfEntityId);
|
||
}
|
||
const selfEncryptInfo = encryptInfoDict[selfEntityId];
|
||
// 推送到远端结点的oper
|
||
const oper2 = this.refineOperData(oper, rowIds);
|
||
const { url } = await getRemoteAccessInfo(context, {
|
||
userId,
|
||
remoteEntityId: entityId,
|
||
});
|
||
this.pushOperToChannel(oper2, userId, url, endpoint, entity, entityId, selfEncryptInfo, onSynchronized, onFailed, timeout);
|
||
};
|
||
for (const userId in userSendDict) {
|
||
if (userId !== operatorId || !oper.bornAt) {
|
||
await pushToUserIdFn(userId);
|
||
pushed = true;
|
||
}
|
||
}
|
||
}
|
||
}));
|
||
}
|
||
// 如果oper一个也不用推送,说明其定义的推送path和对象行的path不匹配(动态指针)
|
||
return pushed;
|
||
}
|
||
/**
|
||
* 为了保证推送的oper序,采用从database中顺序读取所有需要推送的oper来进行推送
|
||
* 每个进程都保证把当前所有的oper顺序处理掉,就不会有乱序的问题,大家通过database上的锁来完成同步
|
||
* @param context
|
||
*/
|
||
async trySynchronizeOpers(context) {
|
||
let result = undefined;
|
||
// 暂时全用root身份去执行(未来不一定对)
|
||
await context.initialize();
|
||
context.openRootMode();
|
||
let dirtyOpers = await context.select('oper', {
|
||
data: {
|
||
id: 1,
|
||
},
|
||
filter: {
|
||
[types_1.TriggerDataAttribute]: {
|
||
$exists: true,
|
||
},
|
||
}
|
||
}, { dontCollect: true });
|
||
if (dirtyOpers.length > 0) {
|
||
// 这一步是加锁,保证只有一个进程完成推送,推送者提交前会将$$triggerData$$清零
|
||
const ids = dirtyOpers.map(ele => ele.id);
|
||
dirtyOpers = await context.select('oper', {
|
||
data: {
|
||
id: 1,
|
||
action: 1,
|
||
data: 1,
|
||
targetEntity: 1,
|
||
operatorId: 1,
|
||
[types_1.TriggerDataAttribute]: 1,
|
||
bornAt: 1,
|
||
$$createAt$$: 1,
|
||
$$seq$$: 1,
|
||
filter: 1,
|
||
operEntity$oper: {
|
||
$entity: 'operEntity',
|
||
data: {
|
||
entityId: 1,
|
||
operId: 1,
|
||
entity: 1,
|
||
id: 1,
|
||
}
|
||
}
|
||
},
|
||
filter: {
|
||
id: { $in: ids },
|
||
},
|
||
}, { dontCollect: true, forUpdate: true });
|
||
dirtyOpers = dirtyOpers.filter(ele => !!ele[types_1.TriggerDataAttribute]);
|
||
if (dirtyOpers.length > 0) {
|
||
for (const c in this.channelDict) {
|
||
(0, assert_1.default)(this.channelDict[c].queue.length === 0);
|
||
}
|
||
const pushedIds = [];
|
||
const unPushedIds = [];
|
||
await Promise.all(dirtyOpers.map(async (oper) => {
|
||
const result = await this.dispatchOperToChannels(oper, context);
|
||
if (result) {
|
||
pushedIds.push(oper.id);
|
||
}
|
||
else {
|
||
unPushedIds.push(oper.id);
|
||
}
|
||
}));
|
||
if (unPushedIds.length > 0) {
|
||
await context.operate('oper', {
|
||
id: await (0, uuid_1.generateNewIdAsync)(),
|
||
action: 'update',
|
||
data: {
|
||
[types_1.TriggerDataAttribute]: null,
|
||
[types_1.TriggerUuidAttribute]: null,
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: unPushedIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
if (pushedIds.length > 0) {
|
||
result = await this.startAllChannel(context);
|
||
}
|
||
}
|
||
}
|
||
if (result) {
|
||
const exception = result.find(ele => ele instanceof Error);
|
||
if (exception) {
|
||
throw exception;
|
||
}
|
||
}
|
||
}
|
||
makeCreateOperTrigger() {
|
||
const { config } = this;
|
||
const { remotes, self } = config;
|
||
// 根据remotes定义,建立从entity到需要同步的远端结点信息的Map
|
||
remotes.forEach((remote) => {
|
||
const { getPushInfo, pushEntities: pushEntityDefs, endpoint, pathToUser, relationName: rnRemote, onFailed, timeout } = remote;
|
||
if (pushEntityDefs) {
|
||
const pushEntities = [];
|
||
const endpoint2 = (0, path_1.join)(endpoint || 'sync', self.entity);
|
||
for (const def of pushEntityDefs) {
|
||
const { pathToRemoteEntity, pathToSelfEntity, relationName, recursive, entity, actions, onSynchronized } = def;
|
||
pushEntities.push(entity);
|
||
const relationName2 = relationName || rnRemote;
|
||
const path2 = pathToUser ? `${pathToRemoteEntity}.${pathToUser}` : pathToRemoteEntity;
|
||
(0, assert_1.default)(!recursive);
|
||
const { projection, getData } = relationName2 ? (0, relationPath_1.destructRelationPath)(this.schema, entity, path2, {
|
||
relation: {
|
||
name: relationName,
|
||
}
|
||
}, recursive) : (0, relationPath_1.destructDirectUserPath)(this.schema, entity, path2);
|
||
const toSelfEntity = (0, relationPath_1.destructDirectPath)(this.schema, entity, pathToSelfEntity);
|
||
const groupByUsers = (rows) => {
|
||
const userRowDict = {};
|
||
rows.forEach((row) => {
|
||
const goals = getData(row);
|
||
if (goals) {
|
||
goals.forEach(({ entity, entityId, userId }) => {
|
||
(0, assert_1.default)(userId);
|
||
if (userRowDict[userId]) {
|
||
// 逻辑上来说同一个userId,其关联的entity和entityId必然相同,这个entity/entityId代表了对方
|
||
(0, assert_1.default)(userRowDict[userId].entity === entity && userRowDict[userId].entityId === entityId);
|
||
userRowDict[userId].rowIds.push(row.id);
|
||
}
|
||
else {
|
||
userRowDict[userId] = {
|
||
entity,
|
||
entityId,
|
||
rowIds: [row.id],
|
||
};
|
||
}
|
||
});
|
||
}
|
||
});
|
||
return userRowDict;
|
||
};
|
||
const projectionMerged = (0, lodash_2.merge)(projection, toSelfEntity.projection);
|
||
const groupBySelfEntity = (rows) => {
|
||
const selfEntityIdDict = {};
|
||
for (const row of rows) {
|
||
const selfEntityInfo = toSelfEntity.getData(row, pathToSelfEntity);
|
||
if (selfEntityInfo) {
|
||
const selfEntityIds = selfEntityInfo.map((info) => {
|
||
(0, assert_1.default)(info.entity === this.config.self.entity);
|
||
return info.data.id;
|
||
});
|
||
const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
|
||
(0, assert_1.default)(uniqSelfEntityIds.length === 1, '同一行数据不可能关联在两行selfEntity上');
|
||
selfEntityIdDict[row.id] = uniqSelfEntityIds[0];
|
||
}
|
||
}
|
||
return selfEntityIdDict;
|
||
};
|
||
if (!this.pushAccessMap[entity]) {
|
||
this.pushAccessMap[entity] = [{
|
||
projection: projectionMerged,
|
||
groupByUsers,
|
||
groupBySelfEntity,
|
||
getRemotePushInfo: getPushInfo,
|
||
endpoint: endpoint2,
|
||
entity,
|
||
actions,
|
||
onSynchronized,
|
||
onFailed,
|
||
timeout,
|
||
}];
|
||
}
|
||
else {
|
||
this.pushAccessMap[entity].push({
|
||
projection,
|
||
groupByUsers,
|
||
groupBySelfEntity,
|
||
getRemotePushInfo: getPushInfo,
|
||
endpoint: endpoint2,
|
||
entity,
|
||
actions,
|
||
onSynchronized,
|
||
onFailed,
|
||
timeout,
|
||
});
|
||
}
|
||
}
|
||
}
|
||
});
|
||
const pushEntities = Object.keys(this.pushAccessMap);
|
||
// push相关联的entity,在发生操作时,需要将operation推送到远端
|
||
const createOperTrigger = {
|
||
name: 'push oper to remote node',
|
||
entity: 'oper',
|
||
action: 'create',
|
||
when: 'commit',
|
||
strict: 'makeSure',
|
||
singleton: true,
|
||
grouped: true,
|
||
check: (operation) => {
|
||
const { data } = operation;
|
||
const { targetEntity, action } = data;
|
||
return pushEntities.includes(data.targetEntity)
|
||
&& !!this.pushAccessMap[targetEntity].find(({ actions }) => !actions || actions.includes(action));
|
||
},
|
||
fn: async ({ ids }, context) => {
|
||
return this.trySynchronizeOpers(context);
|
||
}
|
||
};
|
||
return createOperTrigger;
|
||
}
|
||
constructor(config, schema, contextBuilder) {
|
||
this.config = config;
|
||
this.schema = schema;
|
||
this.contextBuilder = contextBuilder;
|
||
}
|
||
/**
|
||
* 根据sync的定义,生成对应的 commit triggers
|
||
* @returns
|
||
*/
|
||
getSyncTriggers() {
|
||
return [this.makeCreateOperTrigger()];
|
||
}
|
||
getSelfEndpoint() {
|
||
return {
|
||
name: this.config.self.endpoint || 'sync',
|
||
method: 'post',
|
||
params: ['entity', 'entityId'],
|
||
fn: async (context, params, headers, req, body) => {
|
||
// body中是传过来的oper数组信息
|
||
const { entity, entityId } = params;
|
||
const { [OAK_SYNC_HEADER_ENTITY]: meEntity, [OAK_SYNC_HEADER_ENTITY_ID]: meEntityId, [OAK_SYNC_HEADER_NONCE]: syncNonce, [OAK_SYNC_HEADER_TIMESTAMP]: syncTs, [OAK_SYNC_HEADER_SIGN]: syncSign } = headers;
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log('接收到来自远端的sync数据', entity, JSON.stringify(body));
|
||
}
|
||
// todo 这里先缓存,不考虑本身同步相关信息的更新
|
||
if (!this.remotePullInfoMap[entity]) {
|
||
this.remotePullInfoMap[entity] = {};
|
||
}
|
||
if (!this.remotePullInfoMap[entity][entityId]) {
|
||
const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity);
|
||
const pullEntityDict = {};
|
||
if (pullEntities) {
|
||
pullEntities.forEach((def) => pullEntityDict[def.entity] = def);
|
||
}
|
||
const closeFn = context.openRootMode();
|
||
this.remotePullInfoMap[entity][entityId] = {
|
||
pullInfo: await getPullInfo(context, {
|
||
selfId: meEntityId,
|
||
remoteEntityId: entityId,
|
||
}),
|
||
pullEntityDict,
|
||
clockDriftDuration,
|
||
};
|
||
closeFn();
|
||
}
|
||
const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId];
|
||
const { userId, algorithm, publicKey, cxtInfo } = pullInfo;
|
||
(0, assert_1.default)(userId);
|
||
context.setCurrentUserId(userId);
|
||
if (cxtInfo) {
|
||
await context.initialize(cxtInfo);
|
||
}
|
||
const syncTimestamp = parseInt(syncTs, 10);
|
||
if (clockDriftDuration !== 0) {
|
||
if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) {
|
||
throw new types_1.OakClockDriftException('同步时钟漂移过长');
|
||
}
|
||
}
|
||
if (!verify(publicKey, JSON.stringify(body), syncTs, syncNonce, syncSign)) {
|
||
throw new Error('sync验签失败');
|
||
}
|
||
const opers = body;
|
||
const ids = opers.map(ele => ele.id);
|
||
const existsIds = (await context.select('oper', {
|
||
data: {
|
||
id: 1,
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: ids,
|
||
},
|
||
}
|
||
}, {})).map(ele => ele.id);
|
||
const staleOpers = opers.filter((ele) => existsIds.includes(ele.id));
|
||
const freshOpers = opers.filter((ele) => !existsIds.includes(ele.id));
|
||
if (process.env.NODE_ENV !== 'production') {
|
||
const maxStaleSeq = Math.max(...staleOpers.map(ele => ele.$$seq$$));
|
||
for (const oper of freshOpers) {
|
||
(0, assert_1.default)(oper.$$seq$$ > maxStaleSeq, '发现了seq没有按序进行同步');
|
||
}
|
||
}
|
||
// 检查已经应用过的opers是否完整
|
||
const staleIds = staleOpers.map(ele => ele.id);
|
||
if (staleIds.length > 0) {
|
||
const opersExisted = await context.select('oper', {
|
||
data: {
|
||
id: 1,
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: staleIds,
|
||
}
|
||
}
|
||
}, { dontCollect: true });
|
||
if (opersExisted.length < staleIds.length) {
|
||
const missed = (0, lodash_1.difference)(staleIds, opersExisted.map(ele => ele.id));
|
||
// todo 这里如果远端业务逻辑严格,发生乱序应是无关的oper,直接执行就好 by Xc
|
||
throw new Error(`在sync过程中发现有丢失的oper数据「${missed}」`);
|
||
}
|
||
}
|
||
// 应用所有的freshOpers,失败则报错
|
||
for (const freshOper of freshOpers) {
|
||
// freshOpers是按$$seq$$序产生的
|
||
const { id, targetEntity, action, data, $$seq$$, filter } = freshOper;
|
||
const ids = (0, filter_1.getRelevantIds)(filter);
|
||
(0, assert_1.default)(ids.length > 0);
|
||
if (pullEntityDict && pullEntityDict[targetEntity]) {
|
||
const { process } = pullEntityDict[targetEntity];
|
||
if (process) {
|
||
await process(action, data, context);
|
||
}
|
||
}
|
||
const operation = {
|
||
id,
|
||
data,
|
||
action,
|
||
filter: {
|
||
id: ids.length === 1 ? ids[0] : {
|
||
$in: ids,
|
||
},
|
||
},
|
||
bornAt: $$seq$$,
|
||
};
|
||
await context.operate(targetEntity, operation, {});
|
||
}
|
||
if (process.env.NODE_ENV === 'development') {
|
||
console.log(`同步成功,其中重复的oper ${staleIds.length}个,新的oper ${freshOpers.length}个。`);
|
||
}
|
||
return {};
|
||
}
|
||
};
|
||
}
|
||
tryCreateSyncProcess() {
|
||
}
|
||
}
|
||
exports.default = Synchronizer;
|