初始化流程有个先后顺序问题
This commit is contained in:
parent
1133e656a9
commit
030bb71a20
|
|
@ -158,7 +158,6 @@ class AppLoader extends types_1.AppLoader {
|
|||
async mount(initialize) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
this.initTriggers();
|
||||
const { dbConfig, syncConfig } = this.getConfiguration();
|
||||
if (syncConfig) {
|
||||
const { self, remotes } = syncConfig;
|
||||
|
|
@ -215,6 +214,7 @@ class AppLoader extends types_1.AppLoader {
|
|||
})
|
||||
}, this.dbStore.getSchema());
|
||||
}
|
||||
this.initTriggers();
|
||||
}
|
||||
const { importations, exportations } = require(`${path}/lib/ports/index`);
|
||||
(0, index_1.registerPorts)(importations || [], exportations || []);
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
* 其实这里还无法严格保证先产生的oper一定先到达被推送,因为volatile trigger是在事务提交后再发生的,但这种情况在目前应该跑不出来,在实际执行oper的时候assert掉先。by Xc 20240226
|
||||
*/
|
||||
private pushOper;
|
||||
private loadPublicKey;
|
||||
private getSelfEncryptInfo;
|
||||
private makeCreateOperTrigger;
|
||||
constructor(config: SyncConfigWrapper<ED, Cxt>, schema: StorageSchema<ED>);
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -25,19 +25,19 @@ class Synchronizer {
|
|||
// 失败重试的间隔,失败次数多了应当适当延长,最多延长到1024秒
|
||||
let nextPushTimestamp2 = typeof retry === 'number' ? Math.pow(2, Math.min(retry, 10)) : 1;
|
||||
channel.nextPushTimestamp = nextPushTimestamp2 * 1000 + Date.now();
|
||||
(0, assert_1.default)(this.selfEncryptInfo);
|
||||
const opers = queue.map(ele => ele.oper);
|
||||
let restOpers = [];
|
||||
let needRetry = false;
|
||||
let json;
|
||||
try {
|
||||
// todo 加密
|
||||
const selfEncryptInfo = await this.getSelfEncryptInfo();
|
||||
console.log('向远端结点sync数据', api, JSON.stringify(opers));
|
||||
const res = await fetch(api, {
|
||||
method: 'post',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
[OAK_SYNC_HEADER_ITEM]: this.selfEncryptInfo.id,
|
||||
[OAK_SYNC_HEADER_ITEM]: selfEncryptInfo.id,
|
||||
},
|
||||
body: JSON.stringify(opers),
|
||||
});
|
||||
|
|
@ -125,7 +125,6 @@ class Synchronizer {
|
|||
if (!channel.handler) {
|
||||
channel.nextPushTimestamp = nextPushTimestamp2;
|
||||
channel.handler = setTimeout(async () => {
|
||||
(0, assert_1.default)(this.selfEncryptInfo);
|
||||
await this.pushOnChannel(channel);
|
||||
}, nextPushTimestamp2 - now);
|
||||
}
|
||||
|
|
@ -140,8 +139,12 @@ class Synchronizer {
|
|||
console.warn('在sync数据时,遇到了重复推送的oper', JSON.stringify(oper), userId, url);
|
||||
}
|
||||
}
|
||||
async loadPublicKey() {
|
||||
async getSelfEncryptInfo() {
|
||||
if (this.selfEncryptInfo) {
|
||||
return this.selfEncryptInfo;
|
||||
}
|
||||
this.selfEncryptInfo = await this.config.self.getSelfEncryptInfo();
|
||||
return this.selfEncryptInfo;
|
||||
}
|
||||
makeCreateOperTrigger() {
|
||||
const { config } = this;
|
||||
|
|
@ -308,7 +311,6 @@ class Synchronizer {
|
|||
constructor(config, schema) {
|
||||
this.config = config;
|
||||
this.schema = schema;
|
||||
this.loadPublicKey();
|
||||
}
|
||||
/**
|
||||
* 根据sync的定义,生成对应的 commit triggers
|
||||
|
|
|
|||
|
|
@ -214,7 +214,6 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
async mount(initialize?: true) {
|
||||
const { path } = this;
|
||||
if (!initialize) {
|
||||
this.initTriggers();
|
||||
const { dbConfig, syncConfig } = this.getConfiguration();
|
||||
|
||||
if (syncConfig) {
|
||||
|
|
@ -278,6 +277,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
|
|||
)
|
||||
}, this.dbStore.getSchema());
|
||||
}
|
||||
|
||||
this.initTriggers();
|
||||
}
|
||||
const { importations, exportations } = require(`${path}/lib/ports/index`);
|
||||
registerPorts(importations || [], exportations || []);
|
||||
|
|
|
|||
|
|
@ -47,7 +47,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
let nextPushTimestamp2 = typeof retry === 'number' ? Math.pow(2, Math.min(retry, 10)) : 1;
|
||||
channel.nextPushTimestamp = nextPushTimestamp2 * 1000 + Date.now();
|
||||
|
||||
assert(this.selfEncryptInfo);
|
||||
const opers = queue.map(ele => ele.oper);
|
||||
|
||||
let restOpers = [] as typeof queue;
|
||||
|
|
@ -60,12 +59,13 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
};
|
||||
try {
|
||||
// todo 加密
|
||||
const selfEncryptInfo = await this.getSelfEncryptInfo();
|
||||
console.log('向远端结点sync数据', api, JSON.stringify(opers));
|
||||
const res = await fetch(api, {
|
||||
method: 'post',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
[OAK_SYNC_HEADER_ITEM]: this.selfEncryptInfo!.id,
|
||||
[OAK_SYNC_HEADER_ITEM]: selfEncryptInfo!.id,
|
||||
},
|
||||
body: JSON.stringify(opers),
|
||||
});
|
||||
|
|
@ -168,7 +168,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
if (!channel.handler) {
|
||||
channel.nextPushTimestamp = nextPushTimestamp2;
|
||||
channel.handler = setTimeout(async () => {
|
||||
assert(this.selfEncryptInfo);
|
||||
await this.pushOnChannel(channel);
|
||||
}, nextPushTimestamp2 - now);
|
||||
}
|
||||
|
|
@ -185,8 +184,12 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
}
|
||||
}
|
||||
|
||||
private async loadPublicKey() {
|
||||
private async getSelfEncryptInfo() {
|
||||
if (this.selfEncryptInfo) {
|
||||
return this.selfEncryptInfo;
|
||||
}
|
||||
this.selfEncryptInfo = await this.config.self.getSelfEncryptInfo();
|
||||
return this.selfEncryptInfo!;
|
||||
}
|
||||
|
||||
private makeCreateOperTrigger() {
|
||||
|
|
@ -389,7 +392,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
|||
constructor(config: SyncConfigWrapper<ED, Cxt>, schema: StorageSchema<ED>) {
|
||||
this.config = config;
|
||||
this.schema = schema;
|
||||
this.loadPublicKey();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Reference in New Issue