Synchronizer(未测试)

This commit is contained in:
Xu Chang 2024-02-07 16:42:20 +08:00
parent 9421e54990
commit 1cdbccee78
8 changed files with 548 additions and 63 deletions

7
lib/AppLoader.d.ts vendored
View File

@ -6,15 +6,18 @@ import { BackendRuntimeContext } from 'oak-frontend-base';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import Synchronizer from './Synchronizer';
import { SyncConfig } from './types/Sync';
export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
protected dbStore: DbStore<ED, Cxt>;
private aspectDict;
private externalDependencies;
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth;
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace);
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace, syncConfig?: SyncConfig<ED, Cxt>);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
initTriggers(): void;
mount(initialize?: true): Promise<void>;
@ -26,7 +29,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
}>;
initialize(dropIfExists?: boolean): Promise<void>;
getStore(): DbStore<ED, Cxt>;
getEndpoints(prefix: string): [string, "get" | "post" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt): Promise<Partial<ED[T]["Schema"]>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;

View File

@ -15,11 +15,13 @@ const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const DataSubscriber_1 = tslib_1.__importDefault(require("./cluster/DataSubscriber"));
const env_2 = require("./cluster/env");
const Synchronizer_1 = tslib_1.__importDefault(require("./Synchronizer"));
class AppLoader extends types_1.AppLoader {
dbStore;
aspectDict;
externalDependencies;
dataSubscriber;
synchronizer;
contextBuilder;
requireSth(filePath) {
const depFilePath = (0, path_1.join)(this.path, filePath);
@ -92,7 +94,7 @@ class AppLoader extends types_1.AppLoader {
context.headers = headers;
return context;
}
constructor(path, contextBuilder, ns, nsServer) {
constructor(path, contextBuilder, ns, nsServer, syncConfig) {
super(path);
const dbConfig = require((0, path_1.join)(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -102,26 +104,76 @@ class AppLoader extends types_1.AppLoader {
this.dbStore = new DbStore_1.DbStore(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber_1.default(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer);
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
}
if (syncConfig) {
const { self, remotes } = syncConfig;
this.synchronizer = new Synchronizer_1.default({
self: {
// entity: self.entity,
getSelfEncryptInfo: async () => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await self.getSelfEncryptInfo(context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
},
remotes: remotes.map((r) => ({
entity: r.entity,
syncEntities: r.syncEntities,
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getRemotePushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getRemotePullInfo(userId, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
}))
}, this.dbStore.getSchema());
}
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
// 注入在提交后向dataSubscribe发送订阅的事件
if (this.dataSubscriber) {
Object.keys(eventOperationMap).forEach((event) => {
const ids = eventOperationMap[event];
const opRecordsToPublish = opRecords.filter((ele) => !!ele.id && ids.includes(ele.id));
(0, assert_1.default)(opRecordsToPublish.length === ids.length, '要推送的事件的operation数量不足请检查确保');
this.dataSubscriber.publishEvent(event, opRecordsToPublish, context.getSubscriberId());
});
};
return context;
}
};
}
else {
this.contextBuilder = contextBuilder;
}
return context;
};
}
registerTrigger(trigger) {
this.dbStore.registerTrigger(trigger);
@ -135,6 +187,11 @@ class AppLoader extends types_1.AppLoader {
adTriggers.forEach((trigger) => this.registerTrigger(trigger));
checkers.forEach((checker) => this.dbStore.registerChecker(checker));
adCheckers.forEach((checker) => this.dbStore.registerChecker(checker));
if (this.synchronizer) {
// 同步数据到远端结点通过commit trigger来完成
const syncTriggers = this.synchronizer.getSyncTriggers();
syncTriggers.forEach((trigger) => this.registerTrigger(trigger));
}
}
async mount(initialize) {
const { path } = this;
@ -250,6 +307,10 @@ class AppLoader extends types_1.AppLoader {
transformEndpointItem(router, item);
}
}
if (this.synchronizer) {
const syncEp = this.synchronizer.getSelfEndpoint();
transformEndpointItem(syncEp.name, syncEp);
}
return endPointRouters;
}
operateInWatcher(entity, operation, context) {

23
lib/Synchronizer.d.ts vendored Normal file
View File

@ -0,0 +1,23 @@
import { EntityDict, StorageSchema, EndpointItem } from 'oak-domain/lib/types';
import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { SyncConfigWrapper } from './types/Sync';
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config;
private schema;
private selfEncryptInfo?;
private remotePullInfoMap;
private remotePushChannel;
private pushOper;
private loadPublicKey;
private makeCreateOperTrigger;
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>);
/**
* sync的定义 commit triggers
* @returns
*/
getSyncTriggers(): VolatileTrigger<ED, keyof ED, Cxt>[];
private checkOperationConsistent;
getSelfEndpoint(): EndpointItem<ED, Cxt>;
}

281
lib/Synchronizer.js Normal file
View File

@ -0,0 +1,281 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const relationPath_1 = require("oak-domain/lib/utils/relationPath");
const console_1 = require("console");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const OAK_SYNC_HEADER_ITEM = 'oak-sync-remote-id';
async function pushRequestOnChannel(channel, selfEncryptInfo) {
const { queue, api } = channel;
channel.queue = [];
channel.lastPushTimestamp = Date.now();
channel.handler = undefined;
const opers = queue.map(ele => ele.oper);
try {
// todo 加密
const res = await fetch(api, {
method: 'post',
headers: {
'Content-Type': 'application/json',
[OAK_SYNC_HEADER_ITEM]: selfEncryptInfo.id,
},
body: JSON.stringify(opers),
});
if (res.status !== 200) {
throw new Error(`访问api「${api}」的结果不是200。「${res.status}`);
}
const json = await res.json();
const { timestamp, error } = json;
if (error) {
throw new Error(`访问api「${api}」的结果出错,是${error}`);
}
if (!channel.remoteMaxTimestamp || channel.remoteMaxTimestamp < timestamp) {
channel.remoteMaxTimestamp = timestamp;
}
queue.forEach((ele) => ele.resolve());
}
catch (err) {
queue.forEach(({ reject }) => reject(err));
}
}
class Synchronizer {
config;
schema;
selfEncryptInfo;
remotePullInfoMap = {};
remotePushChannel = {};
// 将产生的oper推送到远端Node。注意要尽量在本地阻止重复推送
async pushOper(oper, userIds, getRemoteAccessInfo, endpoint) {
await Promise.all(userIds.map(async (userId) => {
if (!this.remotePushChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remotePushChannel[userId] = {
// todo 规范化
api: `${url}/endpoint/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt < channel.remoteMaxTimestamp) {
// 说明已经同步过了
return;
}
const waiter = new Promise((resolve, reject) => {
channel.queue.push({
oper,
resolve,
reject
});
});
if (!channel.handler) {
channel.handler = setTimeout(async () => {
(0, console_1.assert)(this.selfEncryptInfo);
await pushRequestOnChannel(channel, this.selfEncryptInfo);
}, 1000); // 1秒钟集中同步一次
}
await waiter;
}));
}
async loadPublicKey() {
this.selfEncryptInfo = await this.config.self.getSelfEncryptInfo();
}
makeCreateOperTrigger() {
const { config } = this;
const { remotes, self } = config;
// 根据remotes定义建立从entity到需要同步的远端结点信息的Map
const pushAccessMap = {};
remotes.forEach((remote) => {
const { getRemotePushInfo, syncEntities, endpoint } = remote;
const pushEntityDefs = syncEntities.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
pushEntities.forEach((entity) => {
const def = syncEntities.find(ele => ele.entity === entity);
const { path, relationName, recursive } = def;
const { projection, getData } = relationName ? (0, relationPath_1.destructRelationPath)(this.schema, entity, path, {
relation: {
name: relationName,
}
}, recursive) : (0, relationPath_1.destructDirectPath)(this.schema, entity, path, recursive);
const getUserIds = (rows) => {
const urs = rows.map((row) => getData(row)).flat();
return (0, lodash_1.uniq)(urs.map(ele => ele.userId));
};
if (!pushAccessMap[entity]) {
pushAccessMap[entity] = [{
projection,
getUserIds,
getRemotePushInfo,
endpoint,
}];
}
else {
pushAccessMap[entity].push({
projection,
getUserIds,
getRemotePushInfo,
endpoint,
});
}
});
});
const pushEntities = Object.keys(pushAccessMap);
// push相关联的entity在发生操作时需要将operation推送到远端
const createOperTrigger = {
name: 'push oper to remote node',
entity: 'oper',
action: 'create',
when: 'commit',
strict: 'makeSure',
check: (operation) => {
const { data } = operation;
return pushEntities.includes(data.targetEntity);
},
fn: async ({ ids }, context) => {
(0, console_1.assert)(ids.length === 1);
const [oper] = await context.select('oper', {
data: {
id: 1,
action: 1,
data: 1,
targetEntity: 1,
operatorId: 1,
operEntity$oper: {
$entity: 'operEntity',
data: {
id: 1,
entity: 1,
entityId: 1,
},
},
$$createAt$$: 1,
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
const { operatorId, targetEntity, operEntity$oper: operEntities } = oper;
const entityIds = operEntities.map(ele => ele.entityId);
const pushNodes = pushAccessMap[targetEntity];
if (pushNodes) {
await Promise.all(pushNodes.map(async ({ projection, getUserIds, getRemotePushInfo: getRemoteAccessInfo, endpoint }) => {
const rows = await context.select(targetEntity, {
data: {
id: 1,
...projection,
},
filter: {
id: {
$in: entityIds,
},
},
}, { dontCollect: true });
// userId就是需要发送给远端的user但是要将本次操作的user过滤掉他是操作的产生者
const userIds = getUserIds(rows).filter((ele) => ele !== operatorId);
if (userIds.length > 0) {
await this.pushOper(oper, userIds, getRemoteAccessInfo, endpoint);
}
return undefined;
}));
return entityIds.length * pushNodes.length;
}
return 0;
}
};
return createOperTrigger;
}
constructor(config, schema) {
this.config = config;
this.schema = schema;
this.loadPublicKey();
}
/**
* 根据sync的定义生成对应的 commit triggers
* @returns
*/
getSyncTriggers() {
return [this.makeCreateOperTrigger()];
}
async checkOperationConsistent(entity, ids, bornAt) {
}
getSelfEndpoint() {
return {
name: this.config.self.endpoint || 'sync',
method: 'post',
params: ['entity'],
fn: async (context, params, headers, req, body) => {
// body中是传过来的oper数组信息
const { entity } = params;
const { [OAK_SYNC_HEADER_ITEM]: id } = headers;
try {
// todo 这里先缓存,不考虑本身同步相关信息的更新
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity][id]) {
const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity);
this.remotePullInfoMap[entity][id] = await getRemotePullInfo(id);
}
const pullInfo = this.remotePullInfoMap[entity][id];
const { userId, algorithm, publicKey } = pullInfo;
// todo 解密
// 如果本次同步中有bornAt比本用户操作的最大的bornAt要小则说明是重复更新直接返回
const [maxHisOper] = await context.select('oper', {
data: {
id: 1,
bornAt: 1,
},
filter: {
operatorId: userId,
},
sorter: [
{
$attr: {
bornAt: 1,
},
$direction: 'desc',
},
],
indexFrom: 0,
count: 1,
}, { dontCollect: true });
const opers = body;
const legalOpers = maxHisOper ? opers.filter(ele => ele.bornAt > maxHisOper.bornAt) : opers;
if (legalOpers.length > 0) {
for (const oper of legalOpers) {
const { id, targetEntity, action, data, bornAt, operEntity$oper: operEntities } = oper;
const ids = operEntities.map(ele => ele.id);
this.checkOperationConsistent(targetEntity, ids, bornAt);
const operation = {
id,
data,
action,
filter: {
id: {
$in: ids,
},
},
bornAt: bornAt,
};
await context.operate(targetEntity, operation, {});
}
// 因为legalOpers就是排好序的所以直接返回最后一项的bornAt
return {
timestamp: legalOpers[legalOpers.length - 1].bornAt,
};
}
else {
(0, console_1.assert)(maxHisOper);
return {
timestamp: maxHisOper.bornAt,
};
}
}
catch (err) {
return {
error: JSON.stringify(err),
};
}
}
};
}
}
exports.default = Synchronizer;

57
lib/types/Sync.d.ts vendored Normal file
View File

@ -0,0 +1,57 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
export type Algorithm = 'rsa' | 'ec' | 'ed25519';
export type RemotePushInfo = {
url: string;
userId: string;
};
export type RemotePullInfo = {
id: string;
publicKey: string;
algorithm: Algorithm;
userId: string;
};
export type SelfEncryptInfo = {
id: string;
privateKey: string;
algorithm: Algorithm;
};
export interface SyncEntityDef<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T;
path: string;
recursive?: boolean;
relationName?: string;
direction: 'pull' | 'push';
}
interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict> {
entity: keyof ED;
endpoint?: string;
syncEntities: Array<SyncEntityDef<ED, keyof ED>>;
}
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncRemoteConfigBase<ED> {
getRemotePushInfo: (userId: string) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string) => Promise<RemotePullInfo>;
}
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED> {
getRemotePushInfo: (userId: string, context: Cxt) => Promise<RemotePushInfo>;
getRemotePullInfo: (id: string, context: Cxt) => Promise<RemotePullInfo>;
}
interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string;
}
interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict> extends SyncSelfConfigBase<ED> {
getSelfEncryptInfo: () => Promise<SelfEncryptInfo>;
}
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends SyncSelfConfigBase<ED> {
getSelfEncryptInfo: (context: Cxt) => Promise<SelfEncryptInfo>;
}
export interface SyncConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
self: SyncSelfConfig<ED, Cxt>;
remotes: Array<SyncRemoteConfig<ED, Cxt>>;
}
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict> {
self: SyncSelfConfigWrapper<ED>;
remotes: Array<SyncRemoteConfigWrapper<ED>>;
}
export {};

11
lib/types/Sync.js Normal file
View File

@ -0,0 +1,11 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
;
;
;
;
;
;
;
;
;

View File

@ -155,13 +155,26 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
},
remotes: remotes.map(
(r) => ({
// entity: r.entity,
entity: r.entity,
syncEntities: r.syncEntities,
getRemoteAccessInfo: async (id) => {
getRemotePushInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getRemoteAccessInfo(id, context);
const result = await r.getRemotePushInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
},
getRemotePullInfo: async (userId) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getRemotePullInfo(userId, context);
await context.commit();
return result;
}

View File

@ -3,7 +3,7 @@ import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { destructRelationPath, destructDirectPath } from 'oak-domain/lib/utils/relationPath';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { RemotePushInfo, SyncConfigWrapper, Algorithm, RemotePullInfo, SelfEncryptInfo } from './types/Sync';
import { RemotePushInfo, SyncConfigWrapper, RemotePullInfo, SelfEncryptInfo } from './types/Sync';
import { assert } from 'console';
import { uniq } from 'oak-domain/lib/utils/lodash';
@ -84,12 +84,13 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
if (!this.remotePushChannel[userId]) {
const { url } = await getRemoteAccessInfo(userId);
this.remotePushChannel[userId] = {
api: `${url}/${endpoint || 'sync'}`,
// todo 规范化
api: `${url}/endpoint/${endpoint || 'sync'}`,
queue: [],
};
}
const channel = this.remotePushChannel[userId];
if (channel.remoteMaxTimestamp && oper.bornAt! < channel.remoteMaxTimestamp) {
if (channel.remoteMaxTimestamp && oper.bornAt as number < channel.remoteMaxTimestamp) {
// 说明已经同步过了
return;
}
@ -276,59 +277,94 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
return [this.makeCreateOperTrigger()] as Array<VolatileTrigger<ED, keyof ED, Cxt>>;
}
private async checkOperationConsistent(entity: keyof ED, ids: string[], bornAt: number) {
}
getSelfEndpoint(): EndpointItem<ED, Cxt> {
return {
name: this.config.self.endpoint || 'sync',
method: 'post',
params: ['entity'],
fn: async (context, params, headers, req, body) => {
// body中是传过来的oper数组信息
const { entity } = params;
const {[OAK_SYNC_HEADER_ITEM]: id} = headers;
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity]![id as string]) {
const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity)!;
this.remotePullInfoMap[entity]![id as string] = await getRemotePullInfo(id as string);
}
const pullInfo = this.remotePullInfoMap[entity][id as string];
const { userId, algorithm, publicKey } = pullInfo;
// todo 解密
// 如果本次同步中有bornAt比本用户操作的最大的bornAt要小则说明是重复更新直接返回
const [ maxOper ] = await context.select('oper', {
data: {
id: 1,
bornAt: 1,
},
filter: {
operatorId: userId,
},
sorter: [
{
$attr: {
bornAt: 1,
},
$direction: 'desc',
try {
// todo 这里先缓存,不考虑本身同步相关信息的更新
if (!this.remotePullInfoMap[entity]) {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity]![id as string]) {
const { getRemotePullInfo } = this.config.remotes.find(ele => ele.entity === entity)!;
this.remotePullInfoMap[entity]![id as string] = await getRemotePullInfo(id as string);
}
const pullInfo = this.remotePullInfoMap[entity][id as string];
const { userId, algorithm, publicKey } = pullInfo;
// todo 解密
// 如果本次同步中有bornAt比本用户操作的最大的bornAt要小则说明是重复更新直接返回
const [ maxHisOper ] = await context.select('oper', {
data: {
id: 1,
bornAt: 1,
},
],
indexFrom: 0,
count: 1,
}, { dontCollect: true });
const opers = body as ED['oper']['Schema'][];
const legalOpers = maxOper ? opers.filter(
ele => ele.bornAt > maxOper.bornAt
) : opers;
if (legalOpers.length > 0) {
filter: {
operatorId: userId,
},
sorter: [
{
$attr: {
bornAt: 1,
},
$direction: 'desc',
},
],
indexFrom: 0,
count: 1,
}, { dontCollect: true });
const opers = body as ED['oper']['Schema'][];
const legalOpers = maxHisOper ? opers.filter(
ele => ele.bornAt > maxHisOper.bornAt!
) : opers;
if (legalOpers.length > 0) {
for (const oper of legalOpers) {
const { id, targetEntity, action, data, bornAt, operEntity$oper: operEntities } = oper;
const ids = operEntities!.map(ele => ele.id);
this.checkOperationConsistent(targetEntity, ids, bornAt as number);
const operation: ED[keyof ED]['Operation'] = {
id,
data,
action,
filter: {
id: {
$in: ids,
},
},
bornAt: bornAt as number,
};
await context.operate(targetEntity, operation, {});
}
// 因为legalOpers就是排好序的所以直接返回最后一项的bornAt
return {
timestamp: legalOpers[legalOpers.length - 1].bornAt,
};
}
else {
assert(maxHisOper);
return {
timestamp: maxHisOper.bornAt,
};
}
}
else {
assert(maxOper);
catch (err) {
return {
timestamp: maxOper.bornAt,
error: JSON.stringify(err),
};
}
}