Sync相关的数据结构重新设计和定义

This commit is contained in:
Xu Chang 2024-03-01 16:20:30 +08:00
parent e1fa4efd73
commit 02307ac4db
8 changed files with 38 additions and 25 deletions

View File

@ -50,7 +50,7 @@ export declare abstract class AsyncContext<ED extends EntityDict> implements Con
abstract getCurrentUserId(allowUnloggedIn?: boolean): string | undefined; abstract getCurrentUserId(allowUnloggedIn?: boolean): string | undefined;
abstract setCurrentUserId(userId: string | undefined): void; abstract setCurrentUserId(userId: string | undefined): void;
abstract toString(): Promise<string>; abstract toString(): Promise<string>;
abstract initialize(data: any): Promise<void>; abstract initialize(data: any, later?: boolean): Promise<void>;
abstract allowUserUpdate(): boolean; abstract allowUserUpdate(): boolean;
abstract openRootMode(): () => void; abstract openRootMode(): () => void;
} }

View File

@ -153,7 +153,7 @@ class TriggerExecutor {
(0, assert_1.default)(trigger.when === 'commit'); (0, assert_1.default)(trigger.when === 'commit');
if (trigger.strict === 'makeSure') { if (trigger.strict === 'makeSure') {
const uuid = await (0, uuid_1.generateNewIdAsync)(); const uuid = await (0, uuid_1.generateNewIdAsync)();
const cxtStr = context.toString(); const cxtStr = await context.toString();
const { data } = operation; const { data } = operation;
switch (operation.action) { switch (operation.action) {
case 'create': { case 'create': {
@ -193,7 +193,7 @@ class TriggerExecutor {
Object.assign(d, { Object.assign(d, {
[Entity_1.TriggerDataAttribute]: { [Entity_1.TriggerDataAttribute]: {
name: trigger.name, name: trigger.name,
cxtStr: context.toString(), cxtStr,
option, option,
}, },
[Entity_1.TriggerUuidAttribute]: uuid, [Entity_1.TriggerUuidAttribute]: uuid,
@ -445,14 +445,14 @@ class TriggerExecutor {
}, { }, {
includedDeleted: true, includedDeleted: true,
dontCollect: true, dontCollect: true,
forUpdate: true, forUpdate: 'skip locked', // 防止某个跨事务trigger的逻辑执行周期太长
}); });
const grouped = (0, lodash_1.groupBy)(rows, Entity_1.TriggerUuidAttribute); const grouped = (0, lodash_1.groupBy)(rows, Entity_1.TriggerUuidAttribute);
for (const uuid in grouped) { for (const uuid in grouped) {
const rs = grouped[uuid]; const rs = grouped[uuid];
const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0]; const { [Entity_1.TriggerDataAttribute]: triggerData } = rs[0];
const { name, cxtStr, option } = triggerData; const { name, cxtStr, option } = triggerData;
// await context.initialize(JSON.parse(cxtStr)); // 这里token有可能过期用户注销先用root态模拟吧 await context.initialize(JSON.parse(cxtStr), true);
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option); await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id), context, option);
} }
await context.commit(); await context.commit();

View File

@ -26,7 +26,7 @@ export type SelectOption = {
dontCollect?: boolean; dontCollect?: boolean;
blockTrigger?: true; blockTrigger?: true;
obscure?: boolean; obscure?: boolean;
forUpdate?: true; forUpdate?: true | 'skip locked' | 'no wait';
includedDeleted?: true; includedDeleted?: true;
ignoreAttrMiss?: true; ignoreAttrMiss?: true;
dummy?: 1; dummy?: 1;

14
lib/types/Sync.d.ts vendored
View File

@ -11,6 +11,7 @@ export type RemotePullInfo = {
publicKey: string; publicKey: string;
algorithm: Algorithm; algorithm: Algorithm;
userId: string; userId: string;
cxtInfo?: any;
}; };
export type SelfEncryptInfo = { export type SelfEncryptInfo = {
id: string; id: string;
@ -43,7 +44,6 @@ export interface PushEntityDef<ED extends EntityDict & BaseEntityDict, T extends
} }
export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> { export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> {
entity: keyof ED; entity: keyof ED;
entitySelf?: keyof ED;
endpoint?: string; endpoint?: string;
pathToUser?: string; pathToUser?: string;
relationName?: string; relationName?: string;
@ -51,12 +51,18 @@ export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cx
pullEntities?: Array<PullEntityDef<ED, keyof ED, Cxt>>; pullEntities?: Array<PullEntityDef<ED, keyof ED, Cxt>>;
} }
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> { interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> {
getPushInfo: (userId: string, context: Cxt) => Promise<RemotePushInfo>; getPushInfo: (context: Cxt, option: {
getPullInfo: (id: string, context: Cxt) => Promise<RemotePullInfo>; remoteEntityId: string;
userId: string;
}) => Promise<RemotePushInfo>;
getPullInfo: (context: Cxt, option: {
selfId: string;
remoteEntityId: string;
}) => Promise<RemotePullInfo>;
} }
export interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> { export interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string; endpoint?: string;
entitySelf: keyof ED; entity: keyof ED;
} }
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncSelfConfigBase<ED> { interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncSelfConfigBase<ED> {
getSelfEncryptInfo: (context: Cxt) => Promise<SelfEncryptInfo>; getSelfEncryptInfo: (context: Cxt) => Promise<SelfEncryptInfo>;

View File

@ -225,7 +225,8 @@ export abstract class AsyncContext<ED extends EntityDict> implements Context {
abstract toString(): Promise<string>; abstract toString(): Promise<string>;
// 此接口将字符串parse成对象再进行初始化 // 此接口将字符串parse成对象再进行初始化
abstract initialize(data: any): Promise<void>; // later表示允许延时状态上下文要处理在时间维度上可能的异常比如用户token已经注销等
abstract initialize(data: any, later?: boolean): Promise<void>;
abstract allowUserUpdate(): boolean; abstract allowUserUpdate(): boolean;

View File

@ -206,7 +206,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
assert(trigger.when === 'commit'); assert(trigger.when === 'commit');
if (trigger.strict === 'makeSure') { if (trigger.strict === 'makeSure') {
const uuid = await generateNewIdAsync(); const uuid = await generateNewIdAsync();
const cxtStr = context.toString(); const cxtStr = await context.toString();
const { data } = operation; const { data } = operation;
switch (operation.action) { switch (operation.action) {
case 'create': { case 'create': {
@ -251,7 +251,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
Object.assign(d, { Object.assign(d, {
[TriggerDataAttribute]: { [TriggerDataAttribute]: {
name: trigger.name, name: trigger.name,
cxtStr: context.toString(), cxtStr,
option, option,
}, },
[TriggerUuidAttribute]: uuid, [TriggerUuidAttribute]: uuid,
@ -554,7 +554,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
} as any, { } as any, {
includedDeleted: true, includedDeleted: true,
dontCollect: true, dontCollect: true,
forUpdate: true, forUpdate: 'skip locked', // 防止某个跨事务trigger的逻辑执行周期太长
}); });
const grouped = groupBy(rows, TriggerUuidAttribute); const grouped = groupBy(rows, TriggerUuidAttribute);
@ -563,7 +563,7 @@ export class TriggerExecutor<ED extends EntityDict & BaseEntityDict, Cxt extends
const rs = grouped[uuid]; const rs = grouped[uuid];
const { [TriggerDataAttribute]: triggerData } = rs[0]; const { [TriggerDataAttribute]: triggerData } = rs[0];
const { name, cxtStr, option } = triggerData!; const { name, cxtStr, option } = triggerData!;
// await context.initialize(JSON.parse(cxtStr)); // 这里token有可能过期用户注销先用root态模拟吧 await context.initialize(JSON.parse(cxtStr), true);
await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option); await this.execVolatileTrigger(entity, name, rs.map(ele => ele.id!), context, option);
} }
await context.commit(); await context.commit();

View File

@ -30,11 +30,11 @@ type FilterPart<A extends string, F extends Object | undefined> = {
export type SelectOption = { export type SelectOption = {
dontCollect?: boolean; dontCollect?: boolean;
blockTrigger?: true; blockTrigger?: true;
obscure?: boolean; // 如果为置为true则在filter过程中因数据不完整而不能判断为真的时候都假设为真前端缓存专用 obscure?: boolean; // 如果为置为true则在filter过程中因数据不完整而不能判断为真的时候都假设为真前端缓存专用
forUpdate?: true; forUpdate?: true | 'skip locked' | 'no wait'; // mysql 8.0以上支持的加锁方式
includedDeleted?: true; // 是否包含删除行的信息 includedDeleted?: true; // 是否包含删除行的信息
ignoreAttrMiss?: true; // 作为cache时是否允许属性缺失 ignoreAttrMiss?: true; // 作为cache时是否允许属性缺失
dummy?: 1; // 无用为了继承Option通过编译 dummy?: 1; // 无用为了继承Option通过编译
}; };
export type OperateOption = { export type OperateOption = {

View File

@ -14,6 +14,7 @@ export type RemotePullInfo = {
publicKey: string; publicKey: string;
algorithm: Algorithm; algorithm: Algorithm;
userId: string; userId: string;
cxtInfo?: any;
}; };
export type SelfEncryptInfo = { export type SelfEncryptInfo = {
@ -53,8 +54,7 @@ export interface PushEntityDef<ED extends EntityDict & BaseEntityDict, T extends
export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> { export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> {
entity: keyof ED; // 对方结点所关联的entity名称 entity: keyof ED; // 对方结点所关联的entity名称两边一致
entitySelf?: keyof ED; // 自己在对方结点上所定义的entity名称如果不配置则使用selfConfigBase上的entitySelf
endpoint?: string; // 对方结点同步数据的endpoint默认为/sync/:entity endpoint?: string; // 对方结点同步数据的endpoint默认为/sync/:entity
pathToUser?: string; // entity到对应remote user的路径如果remote user和enitity之间是relation关系则为空 pathToUser?: string; // entity到对应remote user的路径如果remote user和enitity之间是relation关系则为空
relationName?: string; // 如果remote user和entity之间是relation关系此处表达的是relation名称 relationName?: string; // 如果remote user和entity之间是relation关系此处表达的是relation名称
@ -63,13 +63,19 @@ export interface SyncRemoteConfigBase<ED extends EntityDict & BaseEntityDict, Cx
}; };
interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> { interface SyncRemoteConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncRemoteConfigBase<ED, Cxt> {
getPushInfo: (userId: string, context: Cxt) => Promise<RemotePushInfo>; getPushInfo: (context: Cxt, option: {
getPullInfo: (id: string, context: Cxt) => Promise<RemotePullInfo>; remoteEntityId: string;
userId: string;
}) => Promise<RemotePushInfo>;
getPullInfo: (context: Cxt, option: {
selfId: string,
remoteEntityId: string,
}) => Promise<RemotePullInfo>;
}; };
export interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> { export interface SyncSelfConfigBase<ED extends EntityDict & BaseEntityDict> {
endpoint?: string; // 本结点同步数据的endpoint默认为/sync endpoint?: string; // 本结点同步数据的endpoint默认为/sync
entitySelf: keyof ED; // 自己在对方结点上的默认entity名称 entity: keyof ED; // 本方结点所关联的entity名称两边一致
}; };
interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncSelfConfigBase<ED>{ interface SyncSelfConfig<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends SyncSelfConfigBase<ED>{