Implemented data synchronization, but exception handling is not yet cleaned up

Xu Chang 2024-02-24 16:23:27 +08:00
parent df89d5705b
commit f5e4d010ee
8 changed files with 492 additions and 268 deletions
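
For context, the new getConfiguration() reads configuration/mysql.json plus the default export of lib/configuration/sync.js, instead of receiving syncConfig as a constructor argument. The following is a minimal, hypothetical sketch of such a sync configuration module; entity names, relation paths and URLs are placeholders, and only the field names mirror what this commit actually reads (self.entitySelf, getSelfEncryptInfo, and per-remote entity, endpoint, pathToUser, getPushInfo, getPullInfo, pushEntities, pullEntities). The authoritative type is SyncConfig<ED, Cxt> in oak-domain/lib/types/Sync; the push/pull info accessors are sketched again after the lib/types/Sync.d.ts diff below.

    // Hypothetical src/configuration/sync.ts — its compiled output is what getConfiguration()
    // loads from lib/configuration/sync.js via the module's default export.
    const syncConfig = {
        self: {
            entitySelf: 'node',                    // placeholder entity representing "this" node
            // assumed to receive a transaction-scoped context, like getPushInfo/getPullInfo
            getSelfEncryptInfo: async (context: unknown) => ({ id: 'self-node-id' /* plus key material */ }),
        },
        remotes: [
            {
                entity: 'node',                    // placeholder entity describing remote nodes
                endpoint: 'sync',                  // opers are POSTed to {url}/endpoint/{endpoint}/{entitySelf} on the peer
                pathToUser: 'owner',               // placeholder suffix appended to each push path
                getPushInfo: async (id: string, context: unknown) => ({ url: 'https://peer.example.com' }),
                getPullInfo: async (id: string, context: unknown) => ({ userId: 'local-user-for-remote' }),
                pushEntities: [
                    {
                        entity: 'order',           // placeholder entity to push
                        path: 'node',              // placeholder relation path from the entity to the remote node
                        actions: ['create', 'update'],  // only these actions are pushed
                        onSynchronized: async (info: { action: string; data: unknown; result: unknown[] }, context: unknown) => {
                            // result carries { userId, rowIds, error? } per target user; handle push failures here
                        },
                    },
                ],
                pullEntities: [
                    {
                        entity: 'order',
                        process: async (action: string, data: unknown, context: unknown) => {
                            // hook invoked before a pulled oper is applied locally
                        },
                    },
                ],
            },
        ],
    };

    export default syncConfig;
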

lib/AppLoader.d.ts vendored
View File

@ -7,7 +7,6 @@ import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import Synchronizer from './Synchronizer';
import { SyncConfig } from './types/Sync';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
protected dbStore: DbStore<ED, Cxt>;
private aspectDict;
@ -17,7 +16,11 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth;
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace, syncConfig?: SyncConfig<ED, Cxt>);
/**
* configuration
*/
private getConfiguration;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
initTriggers(): void;
mount(initialize?: true): Promise<void>;

View File

@ -94,9 +94,22 @@ class AppLoader extends types_1.AppLoader {
context.headers = headers;
return context;
}
constructor(path, contextBuilder, ns, nsServer, syncConfig) {
/**
* All configuration read at backend startup is loaded here in one place
*/
getConfiguration() {
const dbConfigFile = (0, path_1.join)(this.path, 'configuration', 'mysql.json');
const dbConfig = require(dbConfigFile);
const syncConfigFile = (0, path_1.join)(this.path, 'lib', 'configuration', 'sync.js');
const syncConfig = (0, fs_1.existsSync)(syncConfigFile) && require(syncConfigFile).default;
return {
dbConfig: dbConfig,
syncConfig: syncConfig,
};
}
constructor(path, contextBuilder, ns, nsServer) {
super(path);
const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json'));
const { dbConfig, syncConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`);
this.externalDependencies = require((0, env_1.OAK_EXTERNAL_LIBS_FILEPATH)((0, path_1.join)(path, 'lib')));
@ -107,6 +120,7 @@ class AppLoader extends types_1.AppLoader {
}
if (syncConfig) {
const { self, remotes } = syncConfig;
const { getSelfEncryptInfo, ...restSelf } = self;
this.synchronizer = new Synchronizer_1.default({
self: {
// entity: self.entity,
@ -122,38 +136,41 @@ class AppLoader extends types_1.AppLoader {
await context.rollback();
throw err;
}
}
},
remotes: remotes.map((r) => ({
entity: r.entity,
syncEntities: r.syncEntities,
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getPushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getPullInfo(userId, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
}))
...restSelf
},
remotes: remotes.map((r) => {
const { getPushInfo, getPullInfo, ...rest } = r;
return {
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await getPushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await getPullInfo(userId, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
...rest,
};
})
}, this.dbStore.getSchema());
}
this.contextBuilder = (scene) => async (store) => {

View File

@ -12,7 +12,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
private pushOper;
private loadPublicKey;
private makeCreateOperTrigger;
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>);
constructor(config: SyncConfigWrapper<ED, Cxt>, schema: StorageSchema<ED>);
/**
* commit triggers that define the sync behavior
* @returns

View File

@ -1,8 +1,10 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const relationPath_1 = require("oak-domain/lib/utils/relationPath");
const console_1 = require("console");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const assert_1 = tslib_1.__importDefault(require("assert"));
const path_1 = require("path");
const filter_1 = require("oak-domain/lib/store/filter");
const OAK_SYNC_HEADER_ITEM = 'oak-sync-remote-id';
async function pushRequestOnChannel(channel, selfEncryptInfo) {
const { queue, api } = channel;
@ -12,6 +14,7 @@ async function pushRequestOnChannel(channel, selfEncryptInfo) {
const opers = queue.map(ele => ele.oper);
try {
// todo: encrypt
console.log('Pushing sync data to remote node', api, JSON.stringify(opers));
const res = await fetch(api, {
method: 'post',
headers: {
@ -44,36 +47,33 @@ class Synchronizer {
remotePullInfoMap = {};
remotePushChannel = {};
// Push generated opers to the remote node. Note: duplicate pushes should be prevented locally as much as possible
async pushOper(oper, userIds, getRemoteAccessInfo, endpoint) {
await Promise.all(userIds.map(async (userId) => {
if (!this.remotePushChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remotePushChannel[userId] = {
// todo: normalize
api: `${url}/endpoint/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt < channel.remoteMaxTimestamp) {
// already synchronized
return;
}
const waiter = new Promise((resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
async pushOper(oper, userId, url, endpoint) {
if (!this.remotePushChannel[userId]) {
this.remotePushChannel[userId] = {
// todo: normalize
api: (0, path_1.join)(url, 'endpoint', endpoint),
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt < channel.remoteMaxTimestamp) {
// already synchronized
return;
}
const waiter = new Promise((resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
if (!channel.handler) {
channel.handler = setTimeout(async () => {
(0, console_1.assert)(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo);
}, 1000); // batch-sync once per second
}
await waiter;
}));
});
if (!channel.handler) {
channel.handler = setTimeout(async () => {
(0, assert_1.default)(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo);
}, 1000); // batch-sync once per second
}
await waiter;
}
async loadPublicKey() {
this.selfEncryptInfo = await this.config.self.getSelfEncryptInfo();
@ -84,38 +84,61 @@ class Synchronizer {
// Build a map from each entity to the info of the remote nodes it must sync to, based on the remotes definition
const pushAccessMap = {};
remotes.forEach((remote) => {
const { getRemotePushInfo, syncEntities, endpoint } = remote;
const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
pushEntities.forEach((entity) => {
const def = syncEntities.find(ele => ele.entity === entity);
const { path, relationName, recursive } = def;
const { projection, getData } = relationName ? (0, relationPath_1.destructRelationPath)(this.schema, entity, path, {
relation: {
name: relationName,
const { getRemotePushInfo, pushEntities: pushEntityDefs, endpoint, pathToUser, relationName: rnRemote, entitySelf } = remote;
if (pushEntityDefs) {
const pushEntities = [];
const endpoint2 = (0, path_1.join)(endpoint || 'sync', entitySelf || self.entitySelf);
for (const def of pushEntityDefs) {
const { path, relationName, recursive, entity, actions, onSynchronized } = def;
pushEntities.push(entity);
const relationName2 = relationName || rnRemote;
const path2 = pathToUser ? `${path}.${pathToUser}` : path;
const { projection, getData } = relationName2 ? (0, relationPath_1.destructRelationPath)(this.schema, entity, path2, {
relation: {
name: relationName,
}
}, recursive) : (0, relationPath_1.destructDirectPath)(this.schema, entity, path2, recursive);
const groupByUsers = (rows) => {
const userRowDict = {};
rows.filter((row) => {
const userIds = getData(row)?.map(ele => ele.userId);
if (userIds) {
userIds.forEach((userId) => {
if (userRowDict[userId]) {
userRowDict[userId].push(row.id);
}
else {
userRowDict[userId] = [row.id];
}
});
}
});
return userRowDict;
};
if (!pushAccessMap[entity]) {
pushAccessMap[entity] = [{
projection,
groupByUsers,
getRemotePushInfo,
endpoint: endpoint2,
entity,
actions,
onSynchronized
}];
}
}, recursive) : (0, relationPath_1.destructDirectPath)(this.schema, entity, path, recursive);
const getUserIds = (rows) => {
const urs = rows.map((row) => getData(row)).flat();
return (0, lodash_1.uniq)(urs.map(ele => ele.userId));
};
if (!pushAccessMap[entity]) {
pushAccessMap[entity] = [{
else {
pushAccessMap[entity].push({
projection,
getUserIds,
groupByUsers,
getRemotePushInfo,
endpoint,
}];
endpoint: endpoint2,
entity,
actions,
onSynchronized
});
}
}
else {
pushAccessMap[entity].push({
projection,
getUserIds,
getRemotePushInfo,
endpoint,
});
}
});
}
});
const pushEntities = Object.keys(pushAccessMap);
// When an operation happens on a push-related entity, the operation must be pushed to the remote
@ -130,7 +153,7 @@ class Synchronizer {
return pushEntities.includes(data.targetEntity);
},
fn: async ({ ids }, context) => {
(0, console_1.assert)(ids.length === 1);
(0, assert_1.default)(ids.length === 1);
const [oper] = await context.select('oper', {
data: {
id: 1,
@ -146,36 +169,91 @@ class Synchronizer {
entityId: 1,
},
},
bornAt: 1,
$$createAt$$: 1,
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
const { operatorId, targetEntity, operEntity$oper: operEntities } = oper;
const { operatorId, targetEntity, operEntity$oper: operEntities, action, data } = oper;
const entityIds = operEntities.map(ele => ele.entityId);
const pushNodes = pushAccessMap[targetEntity];
if (pushNodes) {
await Promise.all(pushNodes.map(async ({ projection, getUserIds, getRemotePushInfo: getRemoteAccessInfo, endpoint }) => {
const rows = await context.select(targetEntity, {
data: {
id: 1,
...projection,
},
filter: {
id: {
$in: entityIds,
const pushEntityNodes = pushAccessMap[targetEntity];
if (pushEntityNodes && pushEntityNodes.length > 0) {
// each pushEntityNode corresponds to one configured remote entity
await Promise.all(pushEntityNodes.map(async (node) => {
const { projection, groupByUsers, getRemotePushInfo: getRemoteAccessInfo, endpoint, entity, actions, onSynchronized } = node;
if (!actions || actions.includes(action)) {
const pushed = [];
const rows = await context.select(targetEntity, {
data: {
id: 1,
...projection,
},
},
}, { dontCollect: true });
// userId identifies the remote user to send to, excluding the user who performed this operation (its originator)
const userIds = getUserIds(rows).filter((ele) => ele !== operatorId);
if (userIds.length > 0) {
await this.pushOper(oper, userIds, getRemoteAccessInfo, endpoint);
filter: {
id: {
$in: entityIds,
},
},
}, { dontCollect: true, includedDeleted: true });
// userId identifies the remote user to send to, excluding the original producer of this operation
const userSendDict = groupByUsers(rows);
const pushToUserIdFn = async (userId) => {
const rowIds = userSendDict[userId];
// the oper pushed to the remote node
const oper2 = {
id: oper.id,
action: action,
data: (action === 'create' && data instanceof Array) ? data.filter(ele => rowIds.includes(ele.id)) : data,
filter: {
id: rowIds.length === 1 ? rowIds[0] : {
$in: rowIds,
}
},
bornAt: oper.bornAt,
targetEntity,
};
const { url } = await getRemoteAccessInfo(userId);
try {
await this.pushOper(oper2 /** unclear why this fails the type check */, userId, url, endpoint);
return {
userId,
rowIds,
};
}
catch (err) {
return {
userId,
rowIds,
error: err,
};
}
};
for (const userId in userSendDict) {
if (userId !== operatorId) {
pushed.push(pushToUserIdFn(userId));
}
}
if (pushed.length > 0) {
const result = await Promise.all(pushed);
if (onSynchronized) {
await onSynchronized({
action: action,
data: data,
result,
}, context);
}
else {
const errResult = result.find(ele => !!ele.error);
if (errResult) {
console.error('Error while syncing data', errResult.userId, errResult.rowIds, errResult.error);
throw errResult.error;
}
}
}
}
return undefined;
}));
return entityIds.length * pushNodes.length;
return entityIds.length * pushEntityNodes.length;
}
return 0;
}
@ -205,6 +283,7 @@ class Synchronizer {
// body carries the array of opers sent from the remote
const { entity } = params;
const { [OAK_SYNC_HEADER_ITEM]: id } = headers;
console.log('Received sync data from remote', entity, JSON.stringify(body));
try {
// todo: cache this for now; updates to the sync info itself are not yet considered
if (!this.remotePullInfoMap[entity]) {
@ -217,6 +296,8 @@ class Synchronizer {
const pullInfo = this.remotePullInfoMap[entity][id];
const { userId, algorithm, publicKey } = pullInfo;
// todo: decrypt
(0, assert_1.default)(userId);
context.setCurrentUserId(userId);
// If an oper in this sync has a bornAt smaller than the max bornAt among this user's operations, it is a duplicate update; return directly
const [maxHisOper] = await context.select('oper', {
data: {
@ -241,15 +322,16 @@ class Synchronizer {
const legalOpers = maxHisOper ? opers.filter(ele => ele.bornAt > maxHisOper.bornAt) : opers;
if (legalOpers.length > 0) {
for (const oper of legalOpers) {
const { id, targetEntity, action, data, bornAt, operEntity$oper: operEntities } = oper;
const ids = operEntities.map(ele => ele.id);
const { id, targetEntity, action, data, bornAt, filter } = oper;
const ids = (0, filter_1.getRelevantIds)(filter);
(0, assert_1.default)(ids.length > 0);
this.checkOperationConsistent(targetEntity, ids, bornAt);
const operation = {
id,
data,
action,
filter: {
id: {
id: ids.length === 1 ? ids[0] : {
$in: ids,
},
},
@ -263,7 +345,7 @@ class Synchronizer {
};
}
else {
(0, console_1.assert)(maxHisOper);
(0, assert_1.default)(maxHisOper);
return {
timestamp: maxHisOper.bornAt,
};

lib/types/Sync.d.ts vendored
View File

@ -1,15 +1,16 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { RemotePushInfo, RemotePullInfo, SelfEncryptInfo, SyncRemoteConfigBase, SyncSelfConfigBase, SyncConfig } from 'oak-domain/lib/types/Sync';
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncRemoteConfigBase<ED> {
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> {
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string) => Promise<RemotePullInfo>;
}
interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncSelfConfigBase<ED> {
getSelfEncryptInfo: () => Promise<SelfEncryptInfo>;
}
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict> {
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
self: SyncSelfConfigWrapper<ED>;
remotes: Array<SyncRemoteConfigWrapper<ED>>;
remotes: Array<SyncRemoteConfigWrapper<ED, Cxt>>;
}
export { RemotePushInfo, RemotePullInfo, SelfEncryptInfo, SyncConfig, };
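
The Synchronizer reads url from RemotePushInfo and userId, algorithm and publicKey from RemotePullInfo. As a rough sketch (parameter lists taken from how AppLoader wraps these calls in this commit; all values are placeholders), the app-level providers behind a remote entry might look like:

    // Hypothetical app-level providers; AppLoader wraps each call in its own transaction and
    // hands them to the Synchronizer as getRemotePushInfo/getRemotePullInfo. Only the fields
    // this commit reads are shown — see RemotePushInfo/RemotePullInfo in oak-domain/lib/types/Sync.
    async function getPushInfo(id: string, context: unknown) {
        // resolve where the remote node identified by `id` can be reached
        return { url: 'https://peer.example.com' };
    }

    async function getPullInfo(id: string, context: unknown) {
        // `id` is the oak-sync-remote-id header sent by the pushing node; map it to a local user
        return {
            userId: 'local-user-for-remote-node',  // applied via context.setCurrentUserId
            algorithm: 'rsa',                      // placeholder; decryption is still marked todo in this commit
            publicKey: '<public key PEM>',         // placeholder
        };
    }
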

View File

@ -120,9 +120,24 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
return context;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace, syncConfig?: SyncConfig<ED, Cxt>) {
/**
* configuration
*/
private getConfiguration() {
const dbConfigFile = join(this.path, 'configuration', 'mysql.json');
const dbConfig = require(dbConfigFile);
const syncConfigFile = join(this.path, 'lib', 'configuration', 'sync.js');
const syncConfig = existsSync(syncConfigFile) && require(syncConfigFile).default;
return {
dbConfig: dbConfig as MySQLConfiguration,
syncConfig: syncConfig as SyncConfig<ED, Cxt> | undefined,
};
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace) {
super(path);
const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json'));
const { dbConfig, syncConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = require(`${path}/lib/config/relation`)
this.externalDependencies = require(OAK_EXTERNAL_LIBS_FILEPATH(join(path, 'lib')));
@ -135,6 +150,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const {
self, remotes
} = syncConfig;
const { getSelfEncryptInfo, ...restSelf } = self;
this.synchronizer = new Synchronizer({
self: {
@ -151,39 +168,42 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
await context.rollback();
throw err;
}
}
},
...restSelf
},
remotes: remotes.map(
(r) => ({
entity: r.entity,
syncEntities: r.syncEntities,
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getPushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getPullInfo(userId, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
})
(r) => {
const { getPushInfo, getPullInfo, ...rest } = r;
return {
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await getPushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await getPullInfo(userId, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
...rest,
};
}
)
}, this.dbStore.getSchema());
}

View File

@ -1,11 +1,13 @@
import { EntityDict, StorageSchema, EndpointItem } from 'oak-domain/lib/types';
import { EntityDict, StorageSchema, EndpointItem, RemotePullInfo, SelfEncryptInfo, RemotePushInfo, PushEntityDef, PullEntityDef } from 'oak-domain/lib/types';
import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { destructRelationPath, destructDirectPath } from 'oak-domain/lib/utils/relationPath';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { RemotePushInfo, SyncConfigWrapper, RemotePullInfo, SelfEncryptInfo } from './types/Sync';
import { assert } from 'console';
import assert from 'assert';
import { join } from 'path';
import { uniq } from 'oak-domain/lib/utils/lodash';
import { SyncConfigWrapper } from './types/Sync';
import { getRelevantIds } from 'oak-domain/lib/store/filter';
const OAK_SYNC_HEADER_ITEM = 'oak-sync-remote-id';
@ -31,6 +33,7 @@ async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(chan
const opers = queue.map(ele => ele.oper);
try {
// todo: encrypt
console.log('Pushing sync data to remote node', api, JSON.stringify(opers));
const res = await fetch(api, {
method: 'post',
headers: {
@ -65,56 +68,52 @@ async function pushRequestOnChannel<ED extends EntityDict & BaseEntityDict>(chan
}
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config: SyncConfigWrapper<ED>;
private config: SyncConfigWrapper<ED, Cxt>;
private schema: StorageSchema<ED>;
private selfEncryptInfo?: SelfEncryptInfo;
private remotePullInfoMap: Record<string, Record<string, RemotePullInfo>> = {};
private remotePullInfoMap: Record<string, Record<string, {
pullInfo: RemotePullInfo,
pullEntityDict: Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
}>> = {};
private remotePushChannel: Record<string, Channel<ED>> = {};
// Push generated opers to the remote node. Note: duplicate pushes should be prevented locally as much as possible
private async pushOper(
oper: Partial<ED['oper']['Schema']>,
userIds: string[],
getRemoteAccessInfo: (userId: string) => Promise<RemotePushInfo>,
endpoint?: string) {
await Promise.all(
userIds.map(
async (userId) => {
if (!this.remotePushChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remotePushChannel[userId] = {
// todo: normalize
api: `${url}/endpoint/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt as number < channel.remoteMaxTimestamp) {
// already synchronized
return;
}
userId: string,
url: string,
endpoint: string) {
if (!this.remotePushChannel[userId]) {
this.remotePushChannel[userId] = {
// todo: normalize
api: join(url, 'endpoint', endpoint),
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt as number < channel.remoteMaxTimestamp) {
// already synchronized
return;
}
const waiter = new Promise<void>(
(resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
}
);
if (!channel.handler) {
channel.handler = setTimeout(async () => {
assert(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo!);
}, 1000); // batch-sync once per second
}
await waiter;
}
)
const waiter = new Promise<void>(
(resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
}
);
if (!channel.handler) {
channel.handler = setTimeout(async () => {
assert(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo!);
}, 1000); // batch-sync once per second
}
await waiter;
}
private async loadPublicKey() {
@ -127,59 +126,81 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
// Build a map from each entity to the info of the remote nodes it must sync to, based on the remotes definition
const pushAccessMap: Record<string, Array<{
projection: ED[keyof ED]['Selection']['data']; // projection on the entity needed to reach the related users
getUserIds: (rows: Partial<ED[keyof ED]['Schema']>[]) => string[]; // logic for extracting userIds from the selected rows
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>; // get the corresponding remote push info by userId
endpoint?: string; // url of the remote receiving endpoint
projection: ED[keyof ED]['Selection']['data']; // projection on the entity needed to reach the related users
groupByUsers: (row: Partial<ED[keyof ED]['Schema']>[]) => Record<string, string[]>; // group row ids by the userIds associated with the related rows
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>; // get the corresponding remote push info by userId
endpoint: string; // url of the remote receiving endpoint
actions?: string[];
onSynchronized: PushEntityDef<ED, keyof ED, Cxt>['onSynchronized'];
entity: keyof ED;
}>> = {};
remotes.forEach(
(remote) => {
const { getRemotePushInfo, syncEntities, endpoint } = remote;
const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
pushEntities.forEach(
(entity) => {
const def = syncEntities.find(ele => ele.entity === entity)!;
const { path, relationName, recursive } = def;
const { getRemotePushInfo, pushEntities: pushEntityDefs, endpoint, pathToUser, relationName: rnRemote, entitySelf } = remote;
if (pushEntityDefs) {
const pushEntities = [] as Array<keyof ED>;
const endpoint2 = join(endpoint || 'sync', entitySelf as string || self.entitySelf as string);
for (const def of pushEntityDefs) {
const { path, relationName, recursive, entity, actions, onSynchronized } = def;
pushEntities.push(entity);
const relationName2 = relationName || rnRemote;
const path2 = pathToUser ? `${path}.${pathToUser}` : path;
const {
projection,
getData
} = relationName ? destructRelationPath(this.schema, entity, path, {
} = relationName2 ? destructRelationPath(this.schema, entity, path2, {
relation: {
name: relationName,
}
}, recursive) : destructDirectPath(this.schema, entity, path, recursive);
}, recursive) : destructDirectPath(this.schema, entity, path2, recursive);
const getUserIds = (rows: Partial<ED[keyof ED]['Schema']>[]) => {
const urs = rows.map(
(row) => getData(row)
).flat();
return uniq(
urs.map(
ele => ele.userId!
)
const groupByUsers = (rows: Partial<ED[keyof ED]['Schema']>[]) => {
const userRowDict: Record<string, string[]> = {};
rows.filter(
(row) => {
const userIds = getData(row)?.map(ele => ele.userId);
if (userIds) {
userIds.forEach(
(userId) => {
if (userRowDict[userId]) {
userRowDict[userId].push(row.id!);
}
else {
userRowDict[userId] = [row.id!];
}
}
)
}
}
);
return userRowDict;
};
if (!pushAccessMap[entity as string]) {
pushAccessMap[entity as string] = [{
projection,
getUserIds,
groupByUsers,
getRemotePushInfo,
endpoint,
endpoint: endpoint2,
entity,
actions,
onSynchronized
}];
}
else {
pushAccessMap[entity as string].push({
projection,
getUserIds,
groupByUsers,
getRemotePushInfo,
endpoint,
endpoint: endpoint2,
entity,
actions,
onSynchronized
});
}
}
)
}
}
);
@ -213,48 +234,107 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
entityId: 1,
},
},
bornAt: 1,
$$createAt$$: 1,
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
const { operatorId, targetEntity, operEntity$oper: operEntities } = oper;
const { operatorId, targetEntity, operEntity$oper: operEntities, action, data } = oper;
const entityIds = operEntities!.map(
ele => ele.entityId!
);
const pushNodes = pushAccessMap[targetEntity!];
if (pushNodes) {
const pushEntityNodes = pushAccessMap[targetEntity!];
if (pushEntityNodes && pushEntityNodes.length > 0) {
// each pushEntityNode corresponds to one configured remote entity
await Promise.all(
pushNodes.map(
async ({ projection, getUserIds, getRemotePushInfo: getRemoteAccessInfo, endpoint }) => {
const rows = await context.select(targetEntity!, {
data: {
id: 1,
...projection,
},
filter: {
id: {
$in: entityIds,
pushEntityNodes.map(
async (node) => {
const { projection, groupByUsers, getRemotePushInfo: getRemoteAccessInfo, endpoint, entity, actions, onSynchronized } = node;
if (!actions || actions.includes(action!)) {
const pushed = [] as Promise<{
userId: string,
rowIds: string[],
error?: Error,
}>[];
const rows = await context.select(targetEntity!, {
data: {
id: 1,
...projection,
},
},
}, { dontCollect: true });
filter: {
id: {
$in: entityIds,
},
},
}, { dontCollect: true, includedDeleted: true });
// userId identifies the remote user to send to, excluding the original producer of this operation
const userSendDict = groupByUsers(rows);
const pushToUserIdFn = async (userId: string) => {
const rowIds = userSendDict[userId];
// the oper pushed to the remote node
const oper2 = {
id: oper.id!,
action: action!,
data: (action === 'create' && data instanceof Array) ? data.filter(ele => rowIds.includes(ele.id)) : data!,
filter: {
id: rowIds.length === 1 ? rowIds[0] : {
$in: rowIds,
}
},
bornAt: oper.bornAt!,
targetEntity,
};
const { url } = await getRemoteAccessInfo(userId);
try {
await this.pushOper(oper2 as any /** unclear why this fails the type check */, userId, url, endpoint);
return {
userId,
rowIds,
};
}
catch (err: any) {
return {
userId,
rowIds,
error: err as Error,
};
}
};
for (const userId in userSendDict) {
if (userId !== operatorId) {
pushed.push(pushToUserIdFn(userId));
}
}
// userId identifies the remote user to send to, excluding the user who performed this operation (its originator)
const userIds = getUserIds(rows).filter(
(ele) => ele !== operatorId
);
if (userIds.length > 0) {
await this.pushOper(oper, userIds, getRemoteAccessInfo, endpoint);
if (pushed.length > 0) {
const result = await Promise.all(pushed);
if (onSynchronized) {
await onSynchronized({
action: action!,
data: data!,
result,
}, context);
}
else {
const errResult = result.find(
ele => !!ele.error
);
if (errResult) {
console.error('Error while syncing data', errResult.userId, errResult.rowIds, errResult.error);
throw errResult.error;
}
}
}
}
return undefined;
}
)
);
return entityIds.length * pushNodes.length;
return entityIds.length * pushEntityNodes.length;
}
return 0;
}
@ -263,7 +343,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
return createOperTrigger;
}
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>) {
constructor(config: SyncConfigWrapper<ED, Cxt>, schema: StorageSchema<ED>) {
this.config = config;
this.schema = schema;
this.loadPublicKey();
@ -289,24 +369,36 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
fn: async (context, params, headers, req, body) => {
// body carries the array of opers sent from the remote
const { entity } = params;
const {[OAK_SYNC_HEADER_ITEM]: id} = headers;
const { [OAK_SYNC_HEADER_ITEM]: id } = headers;
console.log('Received sync data from remote', entity, JSON.stringify(body));
try {
// todo: cache this for now; updates to the sync info itself are not yet considered
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity]![id as string]) {
const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity)!;
this.remotePullInfoMap[entity]![id as string] = await getRemotePullInfo(id as string);
const { getRemotePullInfo, pullEntities } = this.config.remotes.find(ele => ele.entity === entity)!;
const pullEntityDict = {} as Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
if (pullEntities) {
pullEntities.forEach(
(def) => pullEntityDict[def.entity as string] = def
);
}
this.remotePullInfoMap[entity]![id as string] = {
pullInfo: await getRemotePullInfo(id as string),
pullEntityDict,
};
}
const pullInfo = this.remotePullInfoMap[entity][id as string];
const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][id as string]!;
const { userId, algorithm, publicKey } = pullInfo;
// todo: decrypt
assert(userId);
context.setCurrentUserId(userId);
// If an oper in this sync has a bornAt smaller than the max bornAt among this user's operations, it is a duplicate update; return directly
const [ maxHisOper ] = await context.select('oper', {
const [maxHisOper] = await context.select('oper', {
data: {
id: 1,
bornAt: 1,
@ -325,24 +417,32 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
indexFrom: 0,
count: 1,
}, { dontCollect: true });
const opers = body as ED['oper']['Schema'][];
const legalOpers = maxHisOper ? opers.filter(
ele => ele.bornAt > maxHisOper.bornAt!
) : opers;
if (legalOpers.length > 0) {
if (legalOpers.length > 0) {
for (const oper of legalOpers) {
const { id, targetEntity, action, data, bornAt, operEntity$oper: operEntities } = oper;
const ids = operEntities!.map(ele => ele.id);
const { id, targetEntity, action, data, bornAt, filter } = oper;
const ids = getRelevantIds(filter!);
assert(ids.length > 0);
this.checkOperationConsistent(targetEntity, ids, bornAt as number);
if (pullEntityDict && pullEntityDict[targetEntity]) {
const { process } = pullEntityDict[targetEntity];
if (process) {
await process(action!, data, context);
}
}
const operation: ED[keyof ED]['Operation'] = {
id,
data,
action,
filter: {
id: {
id: ids.length === 1 ? ids[0] : {
$in: ids,
},
},

View File

@ -1,8 +1,9 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base/lib/context/BackendRuntimeContext';
import { RemotePushInfo, RemotePullInfo, SelfEncryptInfo, SyncRemoteConfigBase, SyncSelfConfigBase, SyncConfig } from 'oak-domain/lib/types/Sync';
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncRemoteConfigBase<ED> {
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> {
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string) => Promise<RemotePullInfo>;
};
@ -11,9 +12,9 @@ interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict> extends
getSelfEncryptInfo: () => Promise<SelfEncryptInfo>;
};
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict> {
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
self: SyncSelfConfigWrapper<ED>;
remotes: Array<SyncRemoteConfigWrapper<ED>>;
remotes: Array<SyncRemoteConfigWrapper<ED, Cxt>>;
};
export {