"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const crypto_1 = require("crypto");
const types_1 = require("oak-domain/lib/types");
const relationPath_1 = require("oak-domain/lib/utils/relationPath");
const assert_1 = tslib_1.__importDefault(require("assert"));
const path_1 = require("path");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const filter_1 = require("oak-domain/lib/store/filter");
const uuid_1 = require("oak-domain/lib/utils/uuid");
const lodash_2 = require("lodash");
// HTTP headers carried by every sync request.
const OAK_SYNC_HEADER_ENTITY = 'oak-sync-entity';
const OAK_SYNC_HEADER_ENTITY_ID = 'oak-sync-entity-id';
const OAK_SYNC_HEADER_TIMESTAMP = 'oak-sync-timestamp';
const OAK_SYNC_HEADER_NONCE = 'oak-sync-nonce';
const OAK_SYNC_HEADER_SIGN = 'oak-sync-sign';
/**
 * Build the string that is signed / verified: body, timestamp and nonce joined by newlines.
 * @param {string} body serialized request body
 * @param {string} ts timestamp as a string
 * @param {string} nonce one-time value
 * @returns {string}
 */
function generateSignStr(body, ts, nonce) {
    return `${body}\n${ts}\n${nonce}`;
}
/**
 * Sign a request body with SHA256 using the given private key.
 * @param privateKey key passed straight to `Sign.sign`
 * @param {string} body serialized request body
 * @returns {Promise<{ts: number, nonce: string, signature: string}>} signature is hex encoded
 */
async function sign(privateKey, body) {
    const ts = Date.now();
    const nonce = await (0, uuid_1.generateNewIdAsync)();
    const sign2 = (0, crypto_1.createSign)('SHA256');
    sign2.update(generateSignStr(body, `${ts}`, nonce));
    sign2.end();
    const signature = sign2.sign(privateKey).toString('hex');
    return {
        ts,
        nonce,
        signature,
    };
}
/**
 * Verify a hex encoded SHA256 signature produced by `sign`.
 * @returns {boolean} true when the signature matches
 */
function verify(publicKey, body, ts, nonce, signature) {
    const verify2 = (0, crypto_1.createVerify)('SHA256');
    verify2.update(generateSignStr(body, ts, nonce));
    verify2.end();
    return verify2.verify(publicKey, signature, 'hex');
}
/**
 * `fetch` with a timeout implemented through AbortController.
 * When AbortController is unavailable or `timeout` is 0 the request is issued without a timeout.
 * @param url request url
 * @param options fetch options; a `signal` in it is overridden when a timeout is active
 * @param {number} timeout milliseconds, defaults to 5000
 * @throws OakRequestTimeoutException when the request is aborted by the timer
 */
async function fetchWithTimeout(url, options, timeout = 5000) {
    if (typeof AbortController === 'undefined' || timeout === 0) {
        return fetch(url, options);
    }
    const controller = new AbortController();
    const timeoutId = setTimeout(() => {
        controller.abort();
    }, timeout);
    try {
        return await fetch(url, Object.assign({}, options, { signal: controller.signal }));
    }
    catch (error) {
        if (error.name === 'AbortError') {
            throw new types_1.OakRequestTimeoutException();
        }
        throw error;
    }
    finally {
        // the timer is useless once the request has settled, whatever the outcome
        clearTimeout(timeoutId);
    }
}
/**
 * Reset the cross-transaction trigger attributes on the given oper rows.
 * @param context transaction context used to run the update
 * @param {string[]} operIds ids of the oper rows to clean
 */
async function clearOperTriggerData(context, operIds) {
    await context.operate('oper', {
        id: await (0, uuid_1.generateNewIdAsync)(),
        action: 'update',
        data: {
            [types_1.TriggerDataAttribute]: null,
            [types_1.TriggerUuidAttribute]: null,
        },
        filter: {
            id: {
                $in: operIds,
            }
        }
    }, {});
}
/**
 * Pushes locally created opers to remote nodes (commit trigger on `oper`)
 * and exposes the endpoint that applies opers pushed by remote nodes.
 */
class Synchronizer {
    config;
    schema;
    // remote entity -> remote entityId -> cached pull info
    remotePullInfoMap = {};
    // userId -> channel (api, queue of pending opers, remote entity info, ...)
    channelDict = {};
    contextBuilder;
    // target entity -> list of push definitions
    pushAccessMap = {};
    /**
     * Push every oper queued on one channel to its remote api, then run the
     * `onSynchronized` callbacks and clear the trigger data of the pushed opers.
     * The channel queue is emptied before the request is sent.
     * @throws when the request fails, the status is not 200 or the remote reports an exception
     */
    async startChannel2(context, channel) {
        const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel;
        // TODO: encryption of the payload
        const opers = queue.map(ele => ele.oper);
        if (process.env.NODE_ENV === 'development') {
            console.log('向远端结点sync oper', JSON.stringify(opers.map(ele => ({
                id: ele.id,
                seq: ele.$$seq$$,
            }))), 'txnId:', context.getCurrentTxnId());
        }
        // NOTE(review): path.join collapses the `//` after the url scheme; the result apparently
        // still reaches fetch as a usable url — confirm before replacing with URL based joining.
        const finalApi = (0, path_1.join)(api, selfEncryptInfo.id);
        channel.queue = [];
        try {
            const body = JSON.stringify(opers);
            const { ts, nonce, signature } = await sign(selfEncryptInfo.privateKey, body);
            const res = await fetchWithTimeout(finalApi, {
                method: 'post',
                headers: {
                    'Content-Type': 'application/json',
                    [OAK_SYNC_HEADER_ENTITY]: entity,
                    [OAK_SYNC_HEADER_ENTITY_ID]: entityId,
                    [OAK_SYNC_HEADER_TIMESTAMP]: `${ts}`,
                    [OAK_SYNC_HEADER_NONCE]: nonce,
                    [OAK_SYNC_HEADER_SIGN]: signature,
                },
                body,
            }, timeout);
            if (res.status !== 200) {
                throw new Error(`sync数据时,访问api「${finalApi}」的结果不是200。「${res.status}」`);
            }
            const json = await res.json();
            if (json.exception) {
                throw new Error(`sync数据时,远端服务报异常「${json.exception}」`);
            }
        }
        catch (err) {
            if (onFailed) {
                // onFailed runs in its own transaction, and only if the current one is rolled back
                context.on('rollback', async () => {
                    const context2 = this.contextBuilder();
                    await context2.begin();
                    try {
                        await onFailed({
                            remoteEntity: entity,
                            remoteEntityId: entityId,
                            data: queue.map((ele) => ({
                                entity: ele.oper.targetEntity,
                                rowIds: ele.oper.filter.id.$in, // probably not useful for now
                                action: ele.oper.action,
                                data: ele.oper.data,
                            })),
                            reason: err,
                        }, context2);
                        await context2.commit();
                    }
                    catch (failedErr) {
                        // do not swallow silently: a failing onFailed callback must leave a trace
                        console.error(failedErr);
                        await context2.rollback();
                    }
                });
            }
            throw err;
        }
        // status 200 and no remote exception: the push succeeded
        for (const ele of queue) {
            const { oper, onSynchronized } = ele;
            if (onSynchronized) {
                const operEntityArr = await context.select('operEntity', {
                    data: {
                        id: 1,
                        entity: 1,
                        entityId: 1
                    },
                    filter: {
                        operId: oper.id,
                    }
                }, {});
                const entityIds = operEntityArr.map(row => row.entityId);
                await onSynchronized({
                    action: oper.action,
                    data: oper.data,
                    rowIds: entityIds,
                    remoteEntity: entity,
                    remoteEntityId: entityId,
                }, context);
            }
        }
        // clear the trigger data of the pushed opers ourselves
        await clearOperTriggerData(context, queue.map(ele => ele.oper.id));
    }
    /**
     * Start pushing on every channel that has queued opers.
     * A failure on one channel must not prevent the transaction from committing
     * (other channels may have succeeded), so a failure is returned as an
     * OakPartialSuccess instead of rejecting.
     * NOTE(review): since a failing channel no longer rejects, the 'rollback' hook registered
     * by startChannel2 (onFailed) only fires if the transaction is rolled back for another
     * reason — confirm this matches the expected onFailed contract.
     * @returns {Promise<Array>} one entry per channel: undefined or an OakPartialSuccess
     */
    async startAllChannel(context) {
        return await Promise.all(Object.keys(this.channelDict).map(async (k) => {
            const channel = this.channelDict[k];
            if (channel.queue.length > 0) {
                channel.queue.sort((o1, o2) => o1.oper.$$seq$$ - o2.oper.$$seq$$);
                try {
                    // `await` is required here: returning the bare promise would bypass the catch below
                    return await this.startChannel2(context, channel);
                }
                catch (err) {
                    const msg = `startChannel推送数据出错,channel是「${k}」,异常是「${err.message}」`;
                    console.error(err);
                    return new types_1.OakPartialSuccess(msg);
                }
            }
        }));
    }
    /**
     * Queue an oper on the channel of `userId`, creating the channel on first use.
     * Channel level information (api, remote entity, onFailed) is cached and asserted
     * to be stable; only the encrypt info is refreshed on later calls.
     */
    pushOperToChannel(oper, userId, url, endpoint, remoteEntity, remoteEntityId, selfEncryptInfo, onSynchronized, onFailed, timeout) {
        if (!this.channelDict[userId]) {
            // these values are cached on the channel; dynamic update is not supported yet
            this.channelDict[userId] = {
                api: (0, path_1.join)(url, 'endpoint', endpoint),
                queue: [],
                entity: remoteEntity,
                entityId: remoteEntityId,
                selfEncryptInfo,
                onFailed,
                timeout,
            };
        }
        else {
            // take the opportunity to refresh the encrypt info
            this.channelDict[userId].selfEncryptInfo = selfEncryptInfo;
            (0, assert_1.default)(this.channelDict[userId].onFailed === onFailed);
        }
        const channel = this.channelDict[userId];
        (0, assert_1.default)(channel.api === (0, path_1.join)(url, 'endpoint', endpoint));
        (0, assert_1.default)(channel.entity === remoteEntity);
        (0, assert_1.default)(channel.entityId === remoteEntityId);
        if (channel.queue.find(ele => ele.oper.id === oper.id)) {
            console.error('channel.queue找到相同的需推送的oper');
        }
        channel.queue.push({
            oper,
            onSynchronized,
        });
    }
    /**
     * Build the oper that is actually sent to a remote node.
     * For a multi-row create only the rows listed in `rowIds` are kept; the
     * cross-transaction trigger attributes are removed from the data.
     * NOTE(review): the trigger attributes are removed in place, i.e. the data objects of the
     * passed-in oper are mutated.
     */
    refineOperData(oper, rowIds) {
        const { action, id, targetEntity, data, $$seq$$, filter } = oper;
        const data2 = (action === 'create' && data instanceof Array) ? data.filter(ele => rowIds.includes(ele.id)) : data;
        // strip the cross-transaction trigger information from the data
        const rows = data2 instanceof Array ? data2 : [data2];
        rows.forEach((d) => {
            (0, lodash_2.unset)(d, types_1.TriggerDataAttribute);
            (0, lodash_2.unset)(d, types_1.TriggerUuidAttribute);
        });
        return {
            id,
            action,
            targetEntity,
            data: data2,
            $$seq$$,
            filter,
        };
    }
    /**
     * Queue one oper on every channel it has to be pushed to.
     * @returns {Promise<boolean>} false when the oper did not have to be pushed to anyone
     */
    async dispatchOperToChannels(oper, context) {
        const { operatorId, targetEntity, action, operEntity$oper } = oper;
        const entityIds = operEntity$oper?.map(ele => ele.entityId);
        (0, assert_1.default)(entityIds && entityIds.length > 0);
        const pushEntityNodes = this.pushAccessMap[targetEntity];
        let pushed = false;
        if (pushEntityNodes && pushEntityNodes.length > 0) {
            // every pushEntityNode stands for one configured remote entity
            await Promise.all(pushEntityNodes.map(async (node) => {
                const { projection, groupByUsers, getRemotePushInfo: getRemoteAccessInfo, groupBySelfEntity, endpoint, actions, onSynchronized, onFailed, timeout } = node;
                // a definition without actions should not happen
                if (actions && !actions.includes(action)) {
                    return;
                }
                const rows = await context.select(targetEntity, {
                    data: {
                        id: 1,
                        ...projection,
                    },
                    filter: {
                        id: {
                            $in: entityIds,
                        },
                    },
                }, { dontCollect: true, includedDeleted: true });
                // keys are the users the oper must be sent to
                const userSendDict = groupByUsers(rows);
                const selfEntityIdDict = groupBySelfEntity(rows);
                const encryptInfoDict = {};
                const pushToUserIdFn = async (userId) => {
                    const { entity, entityId, rowIds } = userSendDict[userId];
                    const selfEntityIds = rowIds.map((rowId) => selfEntityIdDict[rowId]);
                    const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
                    (0, assert_1.default)(uniqSelfEntityIds.length === 1, '推向同一个userId的oper不可能关联在多个不同的selfEntity行上');
                    const selfEntityId = uniqSelfEntityIds[0];
                    if (!encryptInfoDict[selfEntityId]) {
                        encryptInfoDict[selfEntityId] = await this.config.self.getSelfEncryptInfo(context, selfEntityId);
                    }
                    const selfEncryptInfo = encryptInfoDict[selfEntityId];
                    // the oper pushed to the remote node
                    const oper2 = this.refineOperData(oper, rowIds);
                    const { url } = await getRemoteAccessInfo(context, {
                        userId,
                        remoteEntityId: entityId,
                    });
                    this.pushOperToChannel(oper2, userId, url, endpoint, entity, entityId, selfEncryptInfo, onSynchronized, onFailed, timeout);
                };
                for (const userId in userSendDict) {
                    // skip the operator itself when the oper carries bornAt
                    if (userId !== operatorId || !oper.bornAt) {
                        await pushToUserIdFn(userId);
                        pushed = true;
                    }
                }
            }));
        }
        // nothing pushed: the configured push path does not match the path of the rows (dynamic pointer)
        return pushed;
    }
    /**
     * To keep the order of pushed opers, all opers waiting to be pushed are read from the
     * database in order. The rows are selected `forUpdate`, so processes synchronize on the
     * database lock; the pusher clears the trigger data before committing.
     * @param context
     */
    async trySynchronizeOpers(context) {
        let result = undefined;
        // everything runs in root mode for now (may not stay correct in the future)
        await context.initialize();
        context.openRootMode();
        let dirtyOpers = await context.select('oper', {
            data: {
                id: 1,
            },
            filter: {
                [types_1.TriggerUuidAttribute]: {
                    $exists: true,
                },
            }
        }, { dontCollect: true });
        if (dirtyOpers.length > 0) {
            // lock the rows so that a single process does the push
            const ids = dirtyOpers.map(ele => ele.id);
            dirtyOpers = await context.select('oper', {
                data: {
                    id: 1,
                    action: 1,
                    data: 1,
                    targetEntity: 1,
                    operatorId: 1,
                    [types_1.TriggerDataAttribute]: 1,
                    [types_1.TriggerUuidAttribute]: 1,
                    bornAt: 1,
                    $$createAt$$: 1,
                    $$seq$$: 1,
                    filter: 1,
                    operEntity$oper: {
                        $entity: 'operEntity',
                        data: {
                            entityId: 1,
                            operId: 1,
                            entity: 1,
                            id: 1,
                        }
                    }
                },
                filter: {
                    id: { $in: ids },
                },
            }, { dontCollect: true, forUpdate: true });
            // another process may have pushed some of them while we waited for the lock
            dirtyOpers = dirtyOpers.filter(ele => !!ele[types_1.TriggerUuidAttribute]);
            if (dirtyOpers.length > 0) {
                for (const c in this.channelDict) {
                    (0, assert_1.default)(this.channelDict[c].queue.length === 0);
                }
                const pushedIds = [];
                const unPushedIds = [];
                await Promise.all(dirtyOpers.map(async (oper) => {
                    const dispatched = await this.dispatchOperToChannels(oper, context);
                    if (dispatched) {
                        pushedIds.push(oper.id);
                    }
                    else {
                        unPushedIds.push(oper.id);
                    }
                }));
                if (unPushedIds.length > 0) {
                    // opers nobody needs: clear their trigger data ourselves
                    await clearOperTriggerData(context, unPushedIds);
                }
                if (pushedIds.length > 0) {
                    result = await this.startAllChannel(context);
                }
            }
        }
        if (result) {
            const exceptions = result.filter(ele => ele instanceof Error);
            if (exceptions.length > 0) {
                const notPartialSuccess = exceptions.find(ele => !(ele instanceof types_1.OakPartialSuccess));
                if (notPartialSuccess) {
                    throw notPartialSuccess;
                }
                else {
                    console.warn('出现了PartialSuccess,部分Oper同步失败,事务仍然提交');
                }
            }
        }
    }
    /**
     * Fill `pushAccessMap` from the `remotes` configuration and build the commit trigger
     * on `oper` creation that pushes opers to the remote nodes.
     * @returns the trigger definition
     */
    makeCreateOperTrigger() {
        const { config } = this;
        const { remotes, self } = config;
        // build the map from entity to the remote nodes it must be synchronized to
        remotes.forEach((remote) => {
            const { getPushInfo, pushEntities: pushEntityDefs, endpoint, pathToUser, relationName: rnRemote, onFailed, timeout } = remote;
            if (!pushEntityDefs) {
                return;
            }
            const endpoint2 = (0, path_1.join)(endpoint || 'sync', self.entity);
            for (const def of pushEntityDefs) {
                const { pathToRemoteEntity, pathToSelfEntity, relationName, recursive, entity, actions, onSynchronized } = def;
                // the definition level relationName wins over the remote level one
                const relationName2 = relationName || rnRemote;
                const path2 = pathToUser ? `${pathToRemoteEntity}.${pathToUser}` : pathToRemoteEntity;
                (0, assert_1.default)(!recursive);
                const { projection, getData } = relationName2 ?
                    (0, relationPath_1.destructRelationPath)(this.schema, entity, path2, {
                        relation: {
                            // use the resolved name so the remote level fallback is honoured
                            name: relationName2,
                        }
                    }, recursive) : (0, relationPath_1.destructDirectUserPath)(this.schema, entity, path2);
                const toSelfEntity = (0, relationPath_1.destructDirectPath)(this.schema, entity, pathToSelfEntity);
                const groupByUsers = (rows) => {
                    const userRowDict = {};
                    rows.forEach((row) => {
                        const goals = getData(row);
                        if (goals) {
                            goals.forEach(({ entity, entityId, userId }) => {
                                (0, assert_1.default)(userId);
                                if (userRowDict[userId]) {
                                    // one userId is expected to map to a single remote entity/entityId
                                    (0, assert_1.default)(userRowDict[userId].entity === entity && userRowDict[userId].entityId === entityId);
                                    userRowDict[userId].rowIds.push(row.id);
                                }
                                else {
                                    userRowDict[userId] = {
                                        entity,
                                        entityId,
                                        rowIds: [row.id],
                                    };
                                }
                            });
                        }
                    });
                    return userRowDict;
                };
                // lodash merge mutates and returns its first argument
                const projectionMerged = (0, lodash_2.merge)(projection, toSelfEntity.projection);
                const groupBySelfEntity = (rows) => {
                    const selfEntityIdDict = {};
                    for (const row of rows) {
                        const selfEntityInfo = toSelfEntity.getData(row, pathToSelfEntity);
                        if (selfEntityInfo) {
                            const selfEntityIds = selfEntityInfo.map((info) => {
                                (0, assert_1.default)(info.entity === this.config.self.entity);
                                return info.data.id;
                            });
                            const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
                            (0, assert_1.default)(uniqSelfEntityIds.length === 1, '同一行数据不可能关联在两行selfEntity上');
                            selfEntityIdDict[row.id] = uniqSelfEntityIds[0];
                        }
                    }
                    return selfEntityIdDict;
                };
                const pushNode = {
                    projection: projectionMerged,
                    groupByUsers,
                    groupBySelfEntity,
                    getRemotePushInfo: getPushInfo,
                    endpoint: endpoint2,
                    entity,
                    actions,
                    onSynchronized,
                    onFailed,
                    timeout,
                };
                if (!this.pushAccessMap[entity]) {
                    this.pushAccessMap[entity] = [pushNode];
                }
                else {
                    this.pushAccessMap[entity].push(pushNode);
                }
            }
        });
        const pushEntities = Object.keys(this.pushAccessMap);
        // operations on push related entities must be pushed to the remote nodes
        const createOperTrigger = {
            name: 'push oper to remote node',
            entity: 'oper',
            action: 'create',
            when: 'commit',
            strict: 'makeSure',
            singleton: true,
            grouped: true,
            cleanTriggerDataBySelf: true,
            check: (operation) => {
                const { data } = operation;
                const { targetEntity, action } = data;
                return pushEntities.includes(targetEntity)
                    && this.pushAccessMap[targetEntity].some(({ actions }) => !actions || actions.includes(action));
            },
            fn: async (_event, context) => {
                return this.trySynchronizeOpers(context);
            }
        };
        return createOperTrigger;
    }
    /**
     * @param config sync configuration (`self` and `remotes`)
     * @param schema storage schema, used to resolve relation paths
     * @param contextBuilder factory for a fresh context (used by the onFailed hook)
     */
    constructor(config, schema, contextBuilder) {
        this.config = config;
        this.schema = schema;
        this.contextBuilder = contextBuilder;
    }
    /**
     * Generate the commit triggers that follow from the sync definition.
     * @returns
     */
    getSyncTriggers() {
        return [this.makeCreateOperTrigger()];
    }
    /**
     * Build the endpoint through which remote nodes push their opers to this node.
     * The handler checks clock drift and signature, then applies every oper that is
     * not stored locally yet.
     */
    getSelfEndpoint() {
        return {
            name: this.config.self.endpoint || 'sync',
            method: 'post',
            params: ['entity', 'entityId'],
            fn: async (context, params, headers, req, body) => {
                // body carries the array of opers sent by the remote node
                const { entity, entityId } = params;
                const { [OAK_SYNC_HEADER_ENTITY_ID]: meEntityId, [OAK_SYNC_HEADER_NONCE]: syncNonce, [OAK_SYNC_HEADER_TIMESTAMP]: syncTs, [OAK_SYNC_HEADER_SIGN]: syncSign } = headers;
                if (process.env.NODE_ENV === 'development') {
                    console.log('接收到来自远端的sync数据', entity, JSON.stringify(body));
                }
                // TODO: the pull info is cached; updates of the sync information itself are ignored
                if (!this.remotePullInfoMap[entity]) {
                    this.remotePullInfoMap[entity] = {};
                }
                if (!this.remotePullInfoMap[entity][entityId]) {
                    const remote = this.config.remotes.find(ele => ele.entity === entity);
                    if (!remote) {
                        // explicit error instead of an opaque TypeError on destructuring undefined
                        throw new Error(`sync时找不到entity「${entity}」对应的remote配置`);
                    }
                    const { getPullInfo, pullEntities, clockDriftDuration } = remote;
                    const pullEntityDict = {};
                    if (pullEntities) {
                        pullEntities.forEach((def) => pullEntityDict[def.entity] = def);
                    }
                    const closeRootMode = context.openRootMode();
                    try {
                        this.remotePullInfoMap[entity][entityId] = {
                            pullInfo: await getPullInfo(context, {
                                selfId: meEntityId,
                                remoteEntityId: entityId,
                            }),
                            pullEntityDict,
                            clockDriftDuration,
                        };
                    }
                    finally {
                        // never leave root mode open on the request context, even if getPullInfo throws
                        closeRootMode();
                    }
                }
                const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId];
                const { userId, publicKey, cxtInfo } = pullInfo;
                (0, assert_1.default)(userId);
                context.setCurrentUserId(userId);
                if (cxtInfo) {
                    await context.initialize(cxtInfo);
                }
                const syncTimestamp = parseInt(syncTs, 10);
                // a clockDriftDuration of 0 disables the check; unset falls back to 10000ms
                // NOTE(review): only timestamps in the past are bounded; one ahead of the local clock passes
                if (clockDriftDuration !== 0) {
                    if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) {
                        throw new types_1.OakClockDriftException('同步时钟漂移过长');
                    }
                }
                // NOTE(review): the signature is checked against a re-serialization of the parsed body
                if (!verify(publicKey, JSON.stringify(body), syncTs, syncNonce, syncSign)) {
                    throw new types_1.OakSignatureVerificationException('同步验签失败');
                }
                const opers = body;
                const ids = opers.map(ele => ele.id);
                const existingRows = await context.select('oper', {
                    data: {
                        id: 1,
                    },
                    filter: {
                        id: {
                            $in: ids,
                        },
                    }
                }, {});
                const existingIds = new Set(existingRows.map(ele => ele.id));
                const staleOpers = opers.filter((ele) => existingIds.has(ele.id));
                const freshOpers = opers.filter((ele) => !existingIds.has(ele.id));
                if (process.env.NODE_ENV !== 'production') {
                    const maxStaleSeq = Math.max(...staleOpers.map(ele => ele.$$seq$$));
                    for (const oper of freshOpers) {
                        (0, assert_1.default)(oper.$$seq$$ > maxStaleSeq, '发现了seq没有按序进行同步');
                    }
                }
                // check that the already applied opers are all still present
                const staleIds = staleOpers.map(ele => ele.id);
                if (staleIds.length > 0) {
                    const opersExisted = await context.select('oper', {
                        data: {
                            id: 1,
                        },
                        filter: {
                            id: {
                                $in: staleIds,
                            }
                        }
                    }, { dontCollect: true });
                    if (opersExisted.length < staleIds.length) {
                        const missed = (0, lodash_1.difference)(staleIds, opersExisted.map(ele => ele.id));
                        // TODO: with strict remote business logic, out-of-order opers should be unrelated and could just be applied (by Xc)
                        throw new Error(`在sync过程中发现有丢失的oper数据「${missed}」`);
                    }
                }
                // apply every fresh oper; the first failure aborts
                for (const freshOper of freshOpers) {
                    // freshOpers come in $$seq$$ order
                    const { id, targetEntity, action, data, $$seq$$, filter } = freshOper;
                    const relevantIds = (0, filter_1.getRelevantIds)(filter);
                    (0, assert_1.default)(relevantIds.length > 0);
                    if (pullEntityDict && pullEntityDict[targetEntity]) {
                        // renamed locally so the global `process` is not shadowed
                        const { process: processFn } = pullEntityDict[targetEntity];
                        if (processFn) {
                            await processFn(action, data, context);
                        }
                    }
                    const operation = {
                        id,
                        data,
                        action,
                        filter: {
                            id: relevantIds.length === 1 ? relevantIds[0] : {
                                $in: relevantIds,
                            },
                        },
                        bornAt: $$seq$$,
                    };
                    await context.operate(targetEntity, operation, {});
                }
                if (process.env.NODE_ENV === 'development') {
                    console.log(`同步成功,其中重复的oper ${staleIds.length}个,新的oper ${freshOpers.length}个。`);
                }
                return {};
            }
        };
    }
    /** Placeholder, intentionally empty. */
    tryCreateSyncProcess() {
    }
}
exports.default = Synchronizer;