// oak-backend-base/lib/Synchronizer.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const crypto_1 = require("crypto");
const types_1 = require("oak-domain/lib/types");
const relationPath_1 = require("oak-domain/lib/utils/relationPath");
const assert_1 = tslib_1.__importDefault(require("assert"));
const path_1 = require("path");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const filter_1 = require("oak-domain/lib/store/filter");
const uuid_1 = require("oak-domain/lib/utils/uuid");
const lodash_2 = require("lodash");
const OAK_SYNC_HEADER_ENTITY = 'oak-sync-entity';
const OAK_SYNC_HEADER_ENTITY_ID = 'oak-sync-entity-id';
const OAK_SYNC_HEADER_TIMESTAMP = 'oak-sync-timestamp';
const OAK_SYNC_HEADER_NONCE = 'oak-sync-nonce';
const OAK_SYNC_HEADER_SIGN = 'oak-sync-sign';
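// These headers travel with every push request: the sender's identity (entity / entity id),
// a timestamp and nonce for replay protection, and a signature over the JSON body.
// The receiving endpoint (see getSelfEndpoint below) verifies all of them before applying opers.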
function generateSignStr(body, ts, nonce) {
return `${body}\n${ts}\n${nonce}`;
}
async function sign(privateKey, body) {
const ts = Date.now();
const nonce = await (0, uuid_1.generateNewIdAsync)();
const sign2 = (0, crypto_1.createSign)('SHA256');
sign2.update(generateSignStr(body, `${ts}`, nonce));
sign2.end();
const signature = sign2.sign(privateKey).toString('hex');
return {
ts,
nonce,
signature,
};
}
function verify(publicKey, body, ts, nonce, signature) {
const verify2 = (0, crypto_1.createVerify)('SHA256');
verify2.update(generateSignStr(body, ts, nonce));
verify2.end();
return verify2.verify(publicKey, signature, 'hex');
}
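// A minimal round-trip sketch of the sign/verify pair above (illustrative, not part of
// the module; the RSA key generation is an assumption -- any key type accepted by
// crypto.createSign works). Run inside an async function, since sign() is async:
//
//   const { generateKeyPairSync } = require('crypto');
//   const { privateKey, publicKey } = generateKeyPairSync('rsa', { modulusLength: 2048 });
//   const body = JSON.stringify([{ id: 'oper-1' }]);
//   const { ts, nonce, signature } = await sign(privateKey, body);
//   verify(publicKey, body, `${ts}`, nonce, signature);       // true
//   verify(publicKey, body + ' ', `${ts}`, nonce, signature); // false -- any change to the body breaks the signature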
async function fetchWithTimeout(url, options, timeout = 5000) {
if (typeof AbortController === 'undefined' || timeout === 0) {
return fetch(url, options);
}
const controller = new AbortController();
const signal = controller.signal;
// Set up the timeout
const timeoutId = setTimeout(() => {
controller.abort();
}, timeout);
// Issue the fetch request, passing the abort signal
return fetch(url, Object.assign({}, options, { signal }))
.then(response => {
clearTimeout(timeoutId); // the request succeeded; clear the timeout
return response;
})
.catch(error => {
clearTimeout(timeoutId); // the request failed; clear the timeout
if (error.name === 'AbortError') {
throw new types_1.OakRequestTimeoutException();
}
throw error; // rethrow other errors
});
}
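// Usage sketch (the URL is hypothetical): resolves like fetch, but rejects with
// OakRequestTimeoutException if no response arrives within the time limit.
// Passing timeout = 0 disables the timeout entirely (see the guard above):
//
//   const res = await fetchWithTimeout('https://remote.example.com/endpoint/sync', { method: 'post' }, 3000);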
class Synchronizer {
config;
schema;
remotePullInfoMap = {};
channelDict = {};
contextBuilder;
pushAccessMap = {};
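/**
 * Flush one channel's queued opers to its remote endpoint in a single signed POST.
 * On success, fire each oper's onSynchronized callback and clear its trigger data;
 * on failure, schedule the channel's onFailed callback to run after rollback, then rethrow.
 */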
async startChannel2(context, channel) {
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel;
// todo encryption
const opers = queue.map(ele => ele.oper);
if (process.env.NODE_ENV === 'development') {
console.log('Syncing opers to remote node', JSON.stringify(opers.map(ele => ({
id: ele.id,
seq: ele.$$seq$$,
}))), 'txnId:', context.getCurrentTxnId());
}
const finalApi = (0, path_1.join)(api, selfEncryptInfo.id);
channel.queue = [];
try {
const body = JSON.stringify(opers);
const { ts, nonce, signature } = await sign(selfEncryptInfo.privateKey, body);
const res = await fetchWithTimeout(finalApi, {
method: 'post',
headers: {
'Content-Type': 'application/json',
[OAK_SYNC_HEADER_ENTITY]: entity,
[OAK_SYNC_HEADER_ENTITY_ID]: entityId,
[OAK_SYNC_HEADER_TIMESTAMP]: `${ts}`,
[OAK_SYNC_HEADER_NONCE]: nonce,
[OAK_SYNC_HEADER_SIGN]: signature,
},
body,
}, timeout);
if (res.status !== 200) {
throw new Error(`Sync request to api "${finalApi}" returned status ${res.status} instead of 200.`);
}
const json = await res.json();
if (json.exception) {
throw new Error(`The remote service reported an exception during sync: "${json.exception}".`);
}
}
catch (err) {
if (onFailed) {
context.on('rollback', async () => {
const context2 = this.contextBuilder();
await context2.begin();
try {
await onFailed({
remoteEntity: entity,
remoteEntityId: entityId,
data: queue.map((ele) => ({
entity: ele.oper.targetEntity,
rowIds: ele.oper.filter.id.$in,
action: ele.oper.action,
data: ele.oper.data,
})),
reason: err,
}, context2);
await context2.commit();
}
catch (err) {
await context2.rollback();
}
});
}
throw err;
}
// A 200 response means the push succeeded
for (const ele of queue) {
const { oper, onSynchronized } = ele;
if (onSynchronized) {
const operEntityArr = await context.select('operEntity', {
data: {
id: 1,
entity: 1,
entityId: 1
},
filter: {
operId: oper.id,
}
}, {});
const entityIds = operEntityArr.map(ele => ele.entityId);
await onSynchronized({
action: oper.action,
data: oper.data,
rowIds: entityIds,
remoteEntity: entity,
remoteEntityId: entityId,
}, context);
}
}
const operIds = queue.map(ele => ele.oper.id);
// Clear the corresponding triggerData internally
await context.operate('oper', {
id: await (0, uuid_1.generateNewIdAsync)(),
action: 'update',
data: {
[types_1.TriggerDataAttribute]: null,
[types_1.TriggerUuidAttribute]: null,
},
filter: {
id: {
$in: operIds,
}
}
}, {});
}
/** Start synchronizing the opers queued on these channels. Note that even if one channel fails here, it must not prevent this transaction from committing (other channels may have succeeded). */
async startAllChannel(context) {
return await Promise.all(Object.keys(this.channelDict).map(async (k) => {
const channel = this.channelDict[k];
if (channel.queue.length > 0) {
channel.queue.sort((o1, o2) => o1.oper.$$seq$$ - o2.oper.$$seq$$);
try {
// await is required here: without it, a rejected promise would escape this catch block
return await this.startChannel2(context, channel);
}
catch (err) {
const msg = `startChannel failed to push data on channel "${k}": ${err.message}`;
console.error(err);
return new types_1.OakPartialSuccess(msg);
}
}
}));
}
pushOperToChannel(oper, userId, url, endpoint, remoteEntity, remoteEntityId, selfEncryptInfo, onSynchronized, onFailed, timeout) {
if (!this.channelDict[userId]) {
// This information is cached on the channel; dynamic updates are not supported yet
this.channelDict[userId] = {
api: (0, path_1.join)(url, 'endpoint', endpoint),
queue: [],
entity: remoteEntity,
entityId: remoteEntityId,
selfEncryptInfo,
onFailed,
timeout,
};
}
else {
// Take the opportunity to refresh the encryption info
this.channelDict[userId].selfEncryptInfo = selfEncryptInfo;
(0, assert_1.default)(this.channelDict[userId].onFailed === onFailed);
}
const channel = this.channelDict[userId];
(0, assert_1.default)(channel.api === (0, path_1.join)(url, 'endpoint', endpoint));
(0, assert_1.default)(channel.entity === remoteEntity);
(0, assert_1.default)(channel.entityId === remoteEntityId);
if (channel.queue.find(ele => ele.oper.id === oper.id)) {
console.error('Found the same oper already queued for push on channel.queue');
}
channel.queue.push({
oper,
onSynchronized,
});
}
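/**
 * Build the oper payload to push: for a batch create, keep only the rows in rowIds;
 * in all cases, strip the cross-transaction trigger attributes from the data.
 */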
refineOperData(oper, rowIds) {
const { action, id, targetEntity, data, $$seq$$, filter } = oper;
const data2 = (action === 'create' && data instanceof Array) ? data.filter(ele => rowIds.includes(ele.id)) : data;
// Strip cross-transaction trigger attributes from the data
if (data2 instanceof Array) {
data2.forEach((d) => {
(0, lodash_2.unset)(d, types_1.TriggerDataAttribute);
(0, lodash_2.unset)(d, types_1.TriggerUuidAttribute);
});
}
else {
(0, lodash_2.unset)(data2, types_1.TriggerDataAttribute);
(0, lodash_2.unset)(data2, types_1.TriggerUuidAttribute);
}
return {
id, action, targetEntity, data: data2, $$seq$$, filter,
};
}
async dispatchOperToChannels(oper, context) {
const { operatorId, targetEntity, filter, action, data, operEntity$oper } = oper;
const entityIds = operEntity$oper?.map(ele => ele.entityId);
(0, assert_1.default)(entityIds && entityIds.length > 0);
const pushEntityNodes = this.pushAccessMap[targetEntity];
let pushed = false;
if (pushEntityNodes && pushEntityNodes.length > 0) {
// Each pushEntityNode corresponds to one configured remoteEntity
await Promise.all(pushEntityNodes.map(async (node) => {
const { projection, groupByUsers, getRemotePushInfo: getRemoteAccessInfo, groupBySelfEntity, endpoint, actions, onSynchronized, onFailed, timeout } = node;
// A definition should never be missing actions
if (!actions || actions.includes(action)) {
const rows = await context.select(targetEntity, {
data: {
id: 1,
...projection,
},
filter: {
id: {
$in: entityIds,
},
},
}, { dontCollect: true, includedDeleted: true });
// userId is the remote user to send to, but the user who performed this operation (its original producer) must be filtered out
const userSendDict = groupByUsers(rows);
const selfEntityIdDict = groupBySelfEntity(rows);
const encryptInfoDict = {};
const pushToUserIdFn = async (userId) => {
const { entity, entityId, rowIds } = userSendDict[userId];
const selfEntityIds = rowIds.map((rowId) => selfEntityIdDict[rowId]);
const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
(0, assert_1.default)(uniqSelfEntityIds.length === 1, 'Opers pushed to the same userId cannot relate to rows of multiple different selfEntities');
const selfEntityId = uniqSelfEntityIds[0];
if (!encryptInfoDict[selfEntityId]) {
encryptInfoDict[selfEntityId] = await this.config.self.getSelfEncryptInfo(context, selfEntityId);
}
const selfEncryptInfo = encryptInfoDict[selfEntityId];
// The oper to push to the remote node
const oper2 = this.refineOperData(oper, rowIds);
const { url } = await getRemoteAccessInfo(context, {
userId,
remoteEntityId: entityId,
});
this.pushOperToChannel(oper2, userId, url, endpoint, entity, entityId, selfEncryptInfo, onSynchronized, onFailed, timeout);
};
for (const userId in userSendDict) {
if (userId !== operatorId || !oper.bornAt) {
await pushToUserIdFn(userId);
pushed = true;
}
}
}
}));
}
// If the oper did not need to be pushed anywhere, its configured push path did not match the target rows' path (a dynamic pointer)
return pushed;
}
/**
* To preserve the push order of opers, all opers that need pushing are read from the database in sequence.
* As long as every process drains the pending opers in order, pushes cannot be reordered; processes synchronize with each other through the database lock.
* @param context
*/
async trySynchronizeOpers(context) {
let result = undefined;
// For now everything runs as root (this may not be correct in the future)
await context.initialize();
context.openRootMode();
let dirtyOpers = await context.select('oper', {
data: {
id: 1,
},
filter: {
[types_1.TriggerUuidAttribute]: {
$exists: true,
},
}
}, { dontCollect: true });
if (dirtyOpers.length > 0) {
// This step takes the lock, guaranteeing that only one process performs the push; the pusher clears $$triggerData$$ before committing
const ids = dirtyOpers.map(ele => ele.id);
dirtyOpers = await context.select('oper', {
data: {
id: 1,
action: 1,
data: 1,
targetEntity: 1,
operatorId: 1,
[types_1.TriggerDataAttribute]: 1,
[types_1.TriggerUuidAttribute]: 1,
bornAt: 1,
$$createAt$$: 1,
$$seq$$: 1,
filter: 1,
operEntity$oper: {
$entity: 'operEntity',
data: {
entityId: 1,
operId: 1,
entity: 1,
id: 1,
}
}
},
filter: {
id: { $in: ids },
},
}, { dontCollect: true, forUpdate: true });
dirtyOpers = dirtyOpers.filter(ele => !!ele[types_1.TriggerUuidAttribute]);
if (dirtyOpers.length > 0) {
for (const c in this.channelDict) {
(0, assert_1.default)(this.channelDict[c].queue.length === 0);
}
const pushedIds = [];
const unPushedIds = [];
await Promise.all(dirtyOpers.map(async (oper) => {
const result = await this.dispatchOperToChannels(oper, context);
if (result) {
pushedIds.push(oper.id);
}
else {
unPushedIds.push(oper.id);
}
}));
if (unPushedIds.length > 0) {
// Clear the corresponding triggerData internally
await context.operate('oper', {
id: await (0, uuid_1.generateNewIdAsync)(),
action: 'update',
data: {
[types_1.TriggerDataAttribute]: null,
[types_1.TriggerUuidAttribute]: null,
},
filter: {
id: {
$in: unPushedIds,
}
}
}, {});
}
if (pushedIds.length > 0) {
result = await this.startAllChannel(context);
}
}
}
if (result) {
const exceptions = result.filter(ele => ele instanceof Error);
if (exceptions.length > 0) {
const notPartialSuccess = exceptions.find(ele => !(ele instanceof types_1.OakPartialSuccess));
if (notPartialSuccess) {
throw notPartialSuccess;
}
else {
console.warn('PartialSuccess occurred: some opers failed to synchronize, but the transaction still commits');
}
}
}
}
makeCreateOperTrigger() {
const { config } = this;
const { remotes, self } = config;
// Based on the remotes definitions, build a map from each entity to the remote-node info it must sync to
remotes.forEach((remote) => {
const { getPushInfo, pushEntities: pushEntityDefs, endpoint, pathToUser, relationName: rnRemote, onFailed, timeout } = remote;
if (pushEntityDefs) {
const pushEntities = [];
const endpoint2 = (0, path_1.join)(endpoint || 'sync', self.entity);
for (const def of pushEntityDefs) {
const { pathToRemoteEntity, pathToSelfEntity, relationName, recursive, entity, actions, onSynchronized } = def;
pushEntities.push(entity);
const relationName2 = relationName || rnRemote;
const path2 = pathToUser ? `${pathToRemoteEntity}.${pathToUser}` : pathToRemoteEntity;
(0, assert_1.default)(!recursive);
// pass relationName2 so the remote-level relationName fallback (rnRemote) actually applies
const { projection, getData } = relationName2 ? (0, relationPath_1.destructRelationPath)(this.schema, entity, path2, {
relation: {
name: relationName2,
}
}, recursive) : (0, relationPath_1.destructDirectUserPath)(this.schema, entity, path2);
const toSelfEntity = (0, relationPath_1.destructDirectPath)(this.schema, entity, pathToSelfEntity);
const groupByUsers = (rows) => {
const userRowDict = {};
rows.forEach((row) => {
const goals = getData(row);
if (goals) {
goals.forEach(({ entity, entityId, userId }) => {
(0, assert_1.default)(userId);
if (userRowDict[userId]) {
// Logically, the same userId must always map to the same entity and entityId; this entity/entityId identifies the remote party
(0, assert_1.default)(userRowDict[userId].entity === entity && userRowDict[userId].entityId === entityId);
userRowDict[userId].rowIds.push(row.id);
}
else {
userRowDict[userId] = {
entity,
entityId,
rowIds: [row.id],
};
}
});
}
});
return userRowDict;
};
const projectionMerged = (0, lodash_2.merge)(projection, toSelfEntity.projection);
const groupBySelfEntity = (rows) => {
const selfEntityIdDict = {};
for (const row of rows) {
const selfEntityInfo = toSelfEntity.getData(row, pathToSelfEntity);
if (selfEntityInfo) {
const selfEntityIds = selfEntityInfo.map((info) => {
(0, assert_1.default)(info.entity === this.config.self.entity);
return info.data.id;
});
const uniqSelfEntityIds = (0, lodash_2.uniq)(selfEntityIds);
(0, assert_1.default)(uniqSelfEntityIds.length === 1, 'A single data row cannot relate to two selfEntity rows');
selfEntityIdDict[row.id] = uniqSelfEntityIds[0];
}
}
return selfEntityIdDict;
};
if (!this.pushAccessMap[entity]) {
this.pushAccessMap[entity] = [{
projection: projectionMerged,
groupByUsers,
groupBySelfEntity,
getRemotePushInfo: getPushInfo,
endpoint: endpoint2,
entity,
actions,
onSynchronized,
onFailed,
timeout,
}];
}
else {
this.pushAccessMap[entity].push({
projection,
groupByUsers,
groupBySelfEntity,
getRemotePushInfo: getPushInfo,
endpoint: endpoint2,
entity,
actions,
onSynchronized,
onFailed,
timeout,
});
}
}
}
});
const pushEntities = Object.keys(this.pushAccessMap);
// When an operation occurs on a push-related entity, the operation must be pushed to the remote nodes
const createOperTrigger = {
name: 'push oper to remote node',
entity: 'oper',
action: 'create',
when: 'commit',
strict: 'makeSure',
singleton: true,
grouped: true,
cleanTriggerDataBySelf: true,
check: (operation) => {
const { data } = operation;
const { targetEntity, action } = data;
return pushEntities.includes(data.targetEntity)
&& !!this.pushAccessMap[targetEntity].find(({ actions }) => !actions || actions.includes(action));
},
fn: async ({ ids }, context) => {
return this.trySynchronizeOpers(context);
}
};
return createOperTrigger;
}
constructor(config, schema, contextBuilder) {
this.config = config;
this.schema = schema;
this.contextBuilder = contextBuilder;
}
/**
* Generate the corresponding commit triggers from the sync definitions
* @returns
*/
getSyncTriggers() {
return [this.makeCreateOperTrigger()];
}
getSelfEndpoint() {
return {
name: this.config.self.endpoint || 'sync',
method: 'post',
params: ['entity', 'entityId'],
fn: async (context, params, headers, req, body) => {
// body carries the array of opers sent over by the remote
const { entity, entityId } = params;
const { [OAK_SYNC_HEADER_ENTITY]: meEntity, [OAK_SYNC_HEADER_ENTITY_ID]: meEntityId, [OAK_SYNC_HEADER_NONCE]: syncNonce, [OAK_SYNC_HEADER_TIMESTAMP]: syncTs, [OAK_SYNC_HEADER_SIGN]: syncSign } = headers;
if (process.env.NODE_ENV === 'development') {
console.log('Received sync data from remote', entity, JSON.stringify(body));
}
// todo Cache this for now; updates to the sync info itself are not handled
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity][entityId]) {
const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity);
const pullEntityDict = {};
if (pullEntities) {
pullEntities.forEach((def) => pullEntityDict[def.entity] = def);
}
const closeFn = context.openRootMode();
this.remotePullInfoMap[entity][entityId] = {
pullInfo: await getPullInfo(context, {
selfId: meEntityId,
remoteEntityId: entityId,
}),
pullEntityDict,
clockDriftDuration,
};
closeFn();
}
const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId];
const { userId, algorithm, publicKey, cxtInfo } = pullInfo;
(0, assert_1.default)(userId);
context.setCurrentUserId(userId);
if (cxtInfo) {
await context.initialize(cxtInfo);
}
const syncTimestamp = parseInt(syncTs, 10);
if (clockDriftDuration !== 0) {
if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) {
throw new types_1.OakClockDriftException('Clock drift during sync is too large');
}
}
if (!verify(publicKey, JSON.stringify(body), syncTs, syncNonce, syncSign)) {
throw new types_1.OakSignatureVerificationException('Sync signature verification failed');
}
const opers = body;
const ids = opers.map(ele => ele.id);
const existsIds = (await context.select('oper', {
data: {
id: 1,
},
filter: {
id: {
$in: ids,
},
}
}, {})).map(ele => ele.id);
const staleOpers = opers.filter((ele) => existsIds.includes(ele.id));
const freshOpers = opers.filter((ele) => !existsIds.includes(ele.id));
if (process.env.NODE_ENV !== 'production') {
const maxStaleSeq = Math.max(...staleOpers.map(ele => ele.$$seq$$));
for (const oper of freshOpers) {
(0, assert_1.default)(oper.$$seq$$ > maxStaleSeq, 'Found seqs that were not synchronized in order');
}
}
// Check that the opers already applied are complete
const staleIds = staleOpers.map(ele => ele.id);
if (staleIds.length > 0) {
const opersExisted = await context.select('oper', {
data: {
id: 1,
},
filter: {
id: {
$in: staleIds,
}
}
}, { dontCollect: true });
if (opersExisted.length < staleIds.length) {
const missed = (0, lodash_1.difference)(staleIds, opersExisted.map(ele => ele.id));
// todo If the remote's business logic is strict, opers that arrive out of order here should be unrelated ones and could simply be executed directly. by Xc
throw new Error(`Missing oper data discovered during sync: "${missed}".`);
}
}
// Apply all freshOpers; throw on failure
for (const freshOper of freshOpers) {
// freshOpers were produced in $$seq$$ order
const { id, targetEntity, action, data, $$seq$$, filter } = freshOper;
const ids = (0, filter_1.getRelevantIds)(filter);
(0, assert_1.default)(ids.length > 0);
if (pullEntityDict && pullEntityDict[targetEntity]) {
const { process } = pullEntityDict[targetEntity];
if (process) {
await process(action, data, context);
}
}
const operation = {
id,
data,
action,
filter: {
id: ids.length === 1 ? ids[0] : {
$in: ids,
},
},
bornAt: $$seq$$,
};
await context.operate(targetEntity, operation, {});
}
if (process.env.NODE_ENV === 'development') {
console.log(`Sync succeeded: ${staleIds.length} duplicate opers, ${freshOpers.length} fresh opers.`);
}
return {};
}
};
}
tryCreateSyncProcess() {
}
}
exports.default = Synchronizer;
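// A wiring sketch inferred from how the class is consumed above. The config field
// names follow their usage in makeCreateOperTrigger/getSelfEndpoint; the concrete
// application objects (storageSchema, BackendRuntimeContext, getSelfEncryptInfo,
// getPushInfo, getPullInfo, and the path strings) are assumptions, not part of this file:
//
//   const synchronizer = new Synchronizer({
//       self: { entity: 'node', endpoint: 'sync', getSelfEncryptInfo },
//       remotes: [{
//           entity: 'node',
//           endpoint: 'sync',
//           getPushInfo,   // resolves the remote user's push URL
//           getPullInfo,   // resolves the remote's public key for signature verification
//           pushEntities: [{ entity: 'message', pathToRemoteEntity: '...', pathToSelfEntity: '...' }],
//           timeout: 5000,
//       }],
//   }, storageSchema, () => new BackendRuntimeContext());
//
//   // Register the commit trigger that pushes opers, and expose the pull endpoint:
//   const triggers = synchronizer.getSyncTriggers();
//   const endpoint = synchronizer.getSelfEndpoint();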