diff --git a/src/AppLoader.ts b/src/AppLoader.ts index c3d87e9..0838a58 100644 --- a/src/AppLoader.ts +++ b/src/AppLoader.ts @@ -18,6 +18,8 @@ import { Server as SocketIoServer, Namespace } from 'socket.io'; import DataSubscriber from './cluster/DataSubscriber'; import { getClusterInfo } from './cluster/env'; +import Synchronizer from './Synchronizer'; +import { SyncConfig } from './types/Sync'; export class AppLoader> extends GeneralAppLoader { @@ -25,6 +27,7 @@ export class AppLoader>; private externalDependencies: string[]; protected dataSubscriber?: DataSubscriber; + protected synchronizer?: Synchronizer; protected contextBuilder: (scene?: string) => (store: DbStore) => Promise; private requireSth(filePath: string): any { @@ -117,7 +120,7 @@ export class AppLoader (store: DbStore) => Promise, ns?: Namespace, nsServer?: Namespace) { + constructor(path: string, contextBuilder: (scene?: string) => (store: DbStore) => Promise, ns?: Namespace, nsServer?: Namespace, syncConfig?: SyncConfig) { super(path); const dbConfig: MySQLConfiguration = require(join(path, '/configuration/mysql.json')); const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`); @@ -126,34 +129,77 @@ export class AppLoader(storageSchema, (cxtStr) => this.makeContext(cxtStr), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict); if (ns) { - this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer); - this.contextBuilder = (scene) => async (store) => { - const context = await contextBuilder(scene)(store); + this.dataSubscriber = new DataSubscriber(ns, (scene) => this.contextBuilder(scene)(this.dbStore), nsServer); + } + if (syncConfig) { + const { + self, remotes + } = syncConfig; + + this.synchronizer = new Synchronizer({ + self: { + entity: self.entity, + getSelfEncryptInfo: async() => { + const context = await contextBuilder()(this.dbStore); + await context.begin(); + try { + const result = await self.getSelfEncryptInfo(context); + await context.commit(); + return result; + } + catch (err) { + await context.rollback(); + throw err; + } + } + }, + remotes: remotes.map( + (r) => ({ + entity: r.entity, + syncEntities: r.syncEntities, + getRemoteAccessInfo: async (id) => { + const context = await contextBuilder()(this.dbStore); + await context.begin(); + try { + const result = await r.getRemoteAccessInfo(id, context); + await context.commit(); + return result; + } + catch (err) { + await context.rollback(); + throw err; + } + } + }) + ) + }, this.dbStore.getSchema()); + } - // 注入在提交前向dataSubscribe - const originCommit = context.commit; - context.commit = async () => { - const { eventOperationMap, opRecords } = context; - await originCommit.call(context); + this.contextBuilder = (scene) => async (store) => { + const context = await contextBuilder(scene)(store); + const originCommit = context.commit; + context.commit = async () => { + const { eventOperationMap, opRecords } = context; + await originCommit.call(context); + + // 注入在提交后向dataSubscribe发送订阅的事件 + if (this.dataSubscriber) { Object.keys(eventOperationMap).forEach( (event) => { const ids = eventOperationMap[event]; - + const opRecordsToPublish = (opRecords as CreateOpResult[]).filter( (ele) => !!ele.id && ids.includes(ele.id) ); assert(opRecordsToPublish.length === ids.length, '要推送的事件的operation数量不足,请检查确保'); this.dataSubscriber!.publishEvent(event, opRecordsToPublish, context.getSubscriberId()); } - ) - }; + ); + } + }; - return context; - } - } - else { - this.contextBuilder = contextBuilder; + return context; } } @@ -183,6 +229,10 @@ export class AppLoader this.dbStore.registerChecker(checker) ); + + if (this.synchronizer) { + // 同步数据到远端结点通过commit trigger来完成 + } } async mount(initialize?: true) { @@ -316,6 +366,11 @@ export class AppLoader> { + private config: SyncConfigWrapper; + private schema: StorageSchema; + + private analyzeConfig() { + const { config, schema } = this; + const { remotes } = config; + + const analyzeSyncEntityDef = (defs: SyncEntityDef[]) => { + const pushEntityDefs = defs.filter(ele => ele.direction === 'push'); + const pushEntities = pushEntityDefs.map(ele => ele.entity); + + // push相关联的entity,在发生操作时,需要将操作推送到远端 + const createOperTrigger: VolatileTrigger = { + name: 'push oper to remote node', + entity: 'oper', + action: 'create', + when: 'commit', + strict: 'makeSure', + check: (operation: ED['oper']['Create']) => { + const { data } = operation; + return pushEntities.includes((data).targetEntity!); + }, + fn: async ({ ids }, context) => { + assert(ids.length === 1); + const [oper] = await context.select('oper', { + data: { + id: 1, + action: 1, + data: 1, + targetEntity: 1, + operEntity$oper: { + $entity: 'operEntity', + data: { + id: 1, + entity: 1, + entityId: 1, + }, + }, + }, + filter: { + id: ids[0], + } + }, { dontCollect: true }); + + const def = pushEntityDefs.find( + ele => ele.entity === oper.targetEntity! + )!; + const { entity, path, relationName, direction } = def; + + // 要找到对应的所有需要推送的node对象信息 + + + + return 1; + } + } + }; + + remotes.forEach( + (remote) => analyzeSyncEntityDef(remote.syncEntities) + ); + } + + constructor(config: SyncConfigWrapper, schema: StorageSchema) { + this.config = config; + this.schema = schema; + } + + /** + * 根据sync的定义,生成对应的 commit triggers + * @returns + */ + getSyncTriggers(): Array> { + + return []; + } + + getSelfEndpoint(): EndpointItem { + return { + name: this.config.self.endpoint || 'sync', + method: 'post', + fn: async (context, params, headers, req, body) => { + + } + }; + } +} \ No newline at end of file diff --git a/src/types/Sync.ts b/src/types/Sync.ts new file mode 100644 index 0000000..7d0db65 --- /dev/null +++ b/src/types/Sync.ts @@ -0,0 +1,59 @@ +import { EntityDict } from 'oak-domain/lib/types'; +import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; +import { BackendRuntimeContext } from 'oak-frontend-base'; + +type RemoteAccessInfo = { + url: string; + publicKey: string; + userId: string; + algorithm: 'rsa' | 'ec' | 'ed25519'; +}; + +type SelfEncryptInfo = { + privateKey: string; + algorithm: 'rsa' | 'ec' | 'ed25519'; +}; + +export interface SyncEntityDef { + entity: T; // 需要同步的entity + path: string; // 此entity到需要同步到的根对象的路径 + relationName?: string; // 要同步的user与根对象的relation名称(为空说明是userId) + direction: 'pull' | 'push'; // pull说明是从远端拉过来,push说明是从本地推过去 +}; + +interface SyncRemoteConfigBase { + entity: T; + endpoint?: string; // 对方结点同步数据的endpoint,默认为/sync + syncEntities: Array>; +}; + +interface SyncRemoteConfigWrapper extends SyncRemoteConfigBase { + getRemoteAccessInfo: (id: string) => Promise; +}; + +interface SyncRemoteConfig> extends SyncRemoteConfigBase { + getRemoteAccessInfo: (id: string, context: Cxt) => Promise; +}; + +interface SyncSelfConfigBase { + entity: T; + endpoint?: string; // 本结点同步数据的endpoint,默认为/sync +}; + +interface SyncSelfConfigWrapper extends SyncSelfConfigBase { + getSelfEncryptInfo: () => Promise; +}; + +interface SyncSelfConfig> extends SyncSelfConfigBase{ + getSelfEncryptInfo: (context: Cxt) => Promise; +}; + +export interface SyncConfig> { + self: SyncSelfConfig; + remotes: Array>; +}; + +export interface SyncConfigWrapper { + self: SyncSelfConfigWrapper; + remotes: Array>; +}; \ No newline at end of file