新增Synchronizer功能,未编译(不影响其他人使用)

This commit is contained in:
Xu Chang 2024-02-05 15:59:55 +08:00
parent 05b8df221f
commit 0da7a7bedb
3 changed files with 227 additions and 17 deletions

View File

@ -18,6 +18,8 @@ import { Server as SocketIoServer, Namespace } from 'socket.io';
import DataSubscriber from './cluster/DataSubscriber';
import { getClusterInfo } from './cluster/env';
import Synchronizer from './Synchronizer';
import { SyncConfig } from './types/Sync';
export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> extends GeneralAppLoader<ED, Cxt> {
@ -25,6 +27,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
private aspectDict: Record<string, Aspect<ED, Cxt>>;
private externalDependencies: string[];
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>;
private requireSth(filePath: string): any {
@ -117,7 +120,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
return context;
}
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace) {
constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore<ED, Cxt>) => Promise<Cxt>, ns?: Namespace, nsServer?: Namespace, syncConfig?: SyncConfig<ED, Cxt>) {
super(path);
const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json'));
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -126,34 +129,77 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore<ED, Cxt>(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer);
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer);
}
if (syncConfig) {
const {
self, remotes
} = syncConfig;
this.synchronizer = new Synchronizer({
self: {
entity: self.entity,
getSelfEncryptInfo: async() => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await self.getSelfEncryptInfo(context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
},
remotes: remotes.map(
(r) => ({
entity: r.entity,
syncEntities: r.syncEntities,
getRemoteAccessInfo: async (id) => {
const context = await contextBuilder()(this.dbStore);
await context.begin();
try {
const result = await r.getRemoteAccessInfo(id, context);
await context.commit();
return result;
}
catch (err) {
await context.rollback();
throw err;
}
}
})
)
}, this.dbStore.getSchema());
}
// 注入在提交前向dataSubscribe
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
this.contextBuilder = (scene) => async (store) => {
const context = await contextBuilder(scene)(store);
const originCommit = context.commit;
context.commit = async () => {
const { eventOperationMap, opRecords } = context;
await originCommit.call(context);
// 注入在提交后向dataSubscribe发送订阅的事件
if (this.dataSubscriber) {
Object.keys(eventOperationMap).forEach(
(event) => {
const ids = eventOperationMap[event];
const opRecordsToPublish = (opRecords as CreateOpResult<ED, keyof ED>[]).filter(
(ele) => !!ele.id && ids.includes(ele.id)
);
assert(opRecordsToPublish.length === ids.length, '要推送的事件的operation数量不足请检查确保');
this.dataSubscriber!.publishEvent(event, opRecordsToPublish, context.getSubscriberId());
}
)
};
);
}
};
return context;
}
}
else {
this.contextBuilder = contextBuilder;
return context;
}
}
@ -183,6 +229,10 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
adCheckers.forEach(
(checker) => this.dbStore.registerChecker(checker)
);
if (this.synchronizer) {
// 同步数据到远端结点通过commit trigger来完成
}
}
async mount(initialize?: true) {
@ -316,6 +366,11 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
transformEndpointItem(router, item);
}
}
if (this.synchronizer) {
const syncEp = this.synchronizer.getSelfEndpoint();
transformEndpointItem(syncEp.name, syncEp);
}
return endPointRouters;
}

96
src/Synchronizer.ts Normal file
View File

@ -0,0 +1,96 @@
import { EntityDict, StorageSchema, EndpointItem } from 'oak-domain/lib/types';
import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
import { SyncConfigWrapper, SyncEntityDef } from './types/Sync';
import { assert } from 'console';
export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
private config: SyncConfigWrapper<ED>;
private schema: StorageSchema<ED>;
private analyzeConfig() {
const { config, schema } = this;
const { remotes } = config;
const analyzeSyncEntityDef = (defs: SyncEntityDef<ED, keyof ED>[]) => {
const pushEntityDefs = defs.filter(ele => ele.direction === 'push');
const pushEntities = pushEntityDefs.map(ele => ele.entity);
// push相关联的entity在发生操作时需要将操作推送到远端
const createOperTrigger: VolatileTrigger<ED, 'oper', Cxt> = {
name: 'push oper to remote node',
entity: 'oper',
action: 'create',
when: 'commit',
strict: 'makeSure',
check: (operation: ED['oper']['Create']) => {
const { data } = operation;
return pushEntities.includes((<ED['oper']['CreateSingle']['data']>data).targetEntity!);
},
fn: async ({ ids }, context) => {
assert(ids.length === 1);
const [oper] = await context.select('oper', {
data: {
id: 1,
action: 1,
data: 1,
targetEntity: 1,
operEntity$oper: {
$entity: 'operEntity',
data: {
id: 1,
entity: 1,
entityId: 1,
},
},
},
filter: {
id: ids[0],
}
}, { dontCollect: true });
const def = pushEntityDefs.find(
ele => ele.entity === oper.targetEntity!
)!;
const { entity, path, relationName, direction } = def;
// 要找到对应的所有需要推送的node对象信息
return 1;
}
}
};
remotes.forEach(
(remote) => analyzeSyncEntityDef(remote.syncEntities)
);
}
constructor(config: SyncConfigWrapper<ED>, schema: StorageSchema<ED>) {
this.config = config;
this.schema = schema;
}
/**
* sync的定义 commit triggers
* @returns
*/
getSyncTriggers(): Array<VolatileTrigger<ED, keyof ED, Cxt>> {
return [];
}
getSelfEndpoint(): EndpointItem<ED, Cxt> {
return {
name: this.config.self.endpoint || 'sync',
method: 'post',
fn: async (context, params, headers, req, body) => {
}
};
}
}

59
src/types/Sync.ts Normal file
View File

@ -0,0 +1,59 @@
import { EntityDict } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { BackendRuntimeContext } from 'oak-frontend-base';
type RemoteAccessInfo = {
url: string;
publicKey: string;
userId: string;
algorithm: 'rsa' | 'ec' | 'ed25519';
};
type SelfEncryptInfo = {
privateKey: string;
algorithm: 'rsa' | 'ec' | 'ed25519';
};
export interface SyncEntityDef<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T; // 需要同步的entity
path: string; // 此entity到需要同步到的根对象的路径
relationName?: string; // 要同步的user与根对象的relation名称为空说明是userId)
direction: 'pull' | 'push'; // pull说明是从远端拉过来push说明是从本地推过去
};
interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T;
endpoint?: string; // 对方结点同步数据的endpoint默认为/sync
syncEntities: Array<SyncEntityDef<ED, keyof ED>>;
};
interface SyncRemoteConfigWrapper<ED extends EntityDict & BaseEntityDict, T extends keyof ED> extends SyncRemoteConfigBase<ED, T> {
getRemoteAccessInfo: (id: string) => Promise<RemoteAccessInfo>;
};
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, T extends keyof ED, Cxt extends BackendRuntimeContext<ED>> extends SyncRemoteConfigBase<ED, T> {
getRemoteAccessInfo: (id: string, context: Cxt) => Promise<RemoteAccessInfo>;
};
interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict, T extends keyof ED> {
entity: T;
endpoint?: string; // 本结点同步数据的endpoint默认为/sync
};
interface SyncSelfConfigWrapper<ED extends EntityDict & BaseEntityDict, T extends keyof ED> extends SyncSelfConfigBase<ED, T> {
getSelfEncryptInfo: () => Promise<SelfEncryptInfo>;
};
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, T extends keyof ED, Cxt extends BackendRuntimeContext<ED>> extends SyncSelfConfigBase<ED, T>{
getSelfEncryptInfo: (context: Cxt) => Promise<SelfEncryptInfo>;
};
export interface SyncConfig<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeContext<ED>> {
self: SyncSelfConfig<ED, keyof ED, Cxt>;
remotes: Array<SyncRemoteConfig<ED, keyof ED, Cxt>>;
};
export interface SyncConfigWrapper<ED extends EntityDict & BaseEntityDict> {
self: SyncSelfConfigWrapper<ED, keyof ED>;
remotes: Array<SyncRemoteConfigWrapper<ED, keyof ED>>;
};