去掉了maxBornAt的设计
This commit is contained in:
parent
587dc1051f
commit
386f67c5b8
|
|
@ -6,7 +6,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
||||||
private config;
|
private config;
|
||||||
private schema;
|
private schema;
|
||||||
private remotePullInfoMap;
|
private remotePullInfoMap;
|
||||||
private pullMaxBornAtMap;
|
|
||||||
private channelDict;
|
private channelDict;
|
||||||
private contextBuilder;
|
private contextBuilder;
|
||||||
private pushAccessMap;
|
private pushAccessMap;
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,6 @@ class Synchronizer {
|
||||||
config;
|
config;
|
||||||
schema;
|
schema;
|
||||||
remotePullInfoMap = {};
|
remotePullInfoMap = {};
|
||||||
pullMaxBornAtMap = {};
|
|
||||||
channelDict = {};
|
channelDict = {};
|
||||||
contextBuilder;
|
contextBuilder;
|
||||||
pushAccessMap = {};
|
pushAccessMap = {};
|
||||||
|
|
@ -482,32 +481,26 @@ class Synchronizer {
|
||||||
await context.initialize(cxtInfo);
|
await context.initialize(cxtInfo);
|
||||||
}
|
}
|
||||||
// todo 解密
|
// todo 解密
|
||||||
if (!this.pullMaxBornAtMap.hasOwnProperty(entityId)) {
|
|
||||||
const [maxHisOper] = await context.select('oper', {
|
|
||||||
data: {
|
|
||||||
id: 1,
|
|
||||||
bornAt: 1,
|
|
||||||
},
|
|
||||||
filter: {
|
|
||||||
operatorId: userId,
|
|
||||||
},
|
|
||||||
sorter: [
|
|
||||||
{
|
|
||||||
$attr: {
|
|
||||||
bornAt: 1,
|
|
||||||
},
|
|
||||||
$direction: 'desc',
|
|
||||||
},
|
|
||||||
],
|
|
||||||
indexFrom: 0,
|
|
||||||
count: 1,
|
|
||||||
}, { dontCollect: true });
|
|
||||||
this.pullMaxBornAtMap[entityId] = maxHisOper?.bornAt || 0;
|
|
||||||
}
|
|
||||||
let maxBornAt = this.pullMaxBornAtMap[entityId];
|
|
||||||
const opers = body;
|
const opers = body;
|
||||||
const staleOpers = opers.filter(ele => ele.$$seq$$ <= maxBornAt);
|
const ids = opers.map(ele => ele.id);
|
||||||
const freshOpers = opers.filter(ele => ele.$$seq$$ > maxBornAt);
|
const existeIds = (await context.select('oper', {
|
||||||
|
data: {
|
||||||
|
id: 1,
|
||||||
|
},
|
||||||
|
filter: {
|
||||||
|
id: {
|
||||||
|
$in: ids,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}, {})).map(ele => ele.id);
|
||||||
|
const staleOpers = opers.filter(ele => existeIds.includes(ele.id));
|
||||||
|
const freshOpers = opers.filter(ele => !existeIds.includes(ele.id));
|
||||||
|
if (process.env.NODE_ENV !== 'production') {
|
||||||
|
const maxStaleSeq = Math.max(...staleOpers.map(ele => ele.$$seq$$));
|
||||||
|
for (const oper of freshOpers) {
|
||||||
|
(0, assert_1.default)(oper.$$seq$$ > maxStaleSeq, '发现了seq没有按序进行同步');
|
||||||
|
}
|
||||||
|
}
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
// 无法严格保证推送按bornAt,所以一旦还有outdatedOpers,检查其已经被apply
|
// 无法严格保证推送按bornAt,所以一旦还有outdatedOpers,检查其已经被apply
|
||||||
(async () => {
|
(async () => {
|
||||||
|
|
@ -557,7 +550,6 @@ class Synchronizer {
|
||||||
};
|
};
|
||||||
await context.operate(targetEntity, operation, {});
|
await context.operate(targetEntity, operation, {});
|
||||||
successIds.push(id);
|
successIds.push(id);
|
||||||
maxBornAt = $$seq$$;
|
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
|
|
@ -571,7 +563,6 @@ class Synchronizer {
|
||||||
}
|
}
|
||||||
})()
|
})()
|
||||||
]);
|
]);
|
||||||
this.pullMaxBornAtMap[entityId] = maxBornAt;
|
|
||||||
return {
|
return {
|
||||||
successIds,
|
successIds,
|
||||||
failed,
|
failed,
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
||||||
pullInfo: RemotePullInfo,
|
pullInfo: RemotePullInfo,
|
||||||
pullEntityDict: Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
|
pullEntityDict: Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
|
||||||
}>> = {};
|
}>> = {};
|
||||||
private pullMaxBornAtMap: Record<string, number> = {};
|
|
||||||
private channelDict: Record<string, Channel<ED, Cxt>> = {};
|
private channelDict: Record<string, Channel<ED, Cxt>> = {};
|
||||||
private contextBuilder: () => Promise<Cxt>;
|
private contextBuilder: () => Promise<Cxt>;
|
||||||
|
|
||||||
|
|
@ -640,39 +639,36 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
||||||
}
|
}
|
||||||
// todo 解密
|
// todo 解密
|
||||||
|
|
||||||
if (!this.pullMaxBornAtMap.hasOwnProperty(entityId)) {
|
|
||||||
const [maxHisOper] = await context.select('oper', {
|
|
||||||
data: {
|
|
||||||
id: 1,
|
|
||||||
bornAt: 1,
|
|
||||||
},
|
|
||||||
filter: {
|
|
||||||
operatorId: userId,
|
|
||||||
},
|
|
||||||
sorter: [
|
|
||||||
{
|
|
||||||
$attr: {
|
|
||||||
bornAt: 1,
|
|
||||||
},
|
|
||||||
$direction: 'desc',
|
|
||||||
},
|
|
||||||
],
|
|
||||||
indexFrom: 0,
|
|
||||||
count: 1,
|
|
||||||
}, { dontCollect: true });
|
|
||||||
this.pullMaxBornAtMap[entityId] = maxHisOper?.bornAt as number || 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let maxBornAt = this.pullMaxBornAtMap[entityId]!;
|
|
||||||
const opers = body as ED['oper']['Schema'][];
|
const opers = body as ED['oper']['Schema'][];
|
||||||
|
|
||||||
|
const ids = opers.map(ele => ele.id!);
|
||||||
|
|
||||||
|
const existeIds = (await context.select('oper', {
|
||||||
|
data: {
|
||||||
|
id: 1,
|
||||||
|
},
|
||||||
|
filter: {
|
||||||
|
id: {
|
||||||
|
$in: ids,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}, {})).map(ele => ele.id!);
|
||||||
|
|
||||||
const staleOpers = opers.filter(
|
const staleOpers = opers.filter(
|
||||||
ele => ele.$$seq$$ <= maxBornAt
|
ele => existeIds.includes(ele.id)
|
||||||
);
|
);
|
||||||
const freshOpers = opers.filter(
|
const freshOpers = opers.filter(
|
||||||
ele => ele.$$seq$$ as number > maxBornAt
|
ele => !existeIds.includes(ele.id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if (process.env.NODE_ENV !== 'production') {
|
||||||
|
const maxStaleSeq = Math.max(...staleOpers.map(ele => ele.$$seq$$!));
|
||||||
|
|
||||||
|
for (const oper of freshOpers) {
|
||||||
|
assert(oper.$$seq$$ > maxStaleSeq, '发现了seq没有按序进行同步');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
[
|
[
|
||||||
// 无法严格保证推送按bornAt,所以一旦还有outdatedOpers,检查其已经被apply
|
// 无法严格保证推送按bornAt,所以一旦还有outdatedOpers,检查其已经被apply
|
||||||
|
|
@ -726,7 +722,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
||||||
};
|
};
|
||||||
await context.operate(targetEntity, operation, {});
|
await context.operate(targetEntity, operation, {});
|
||||||
successIds.push(id);
|
successIds.push(id);
|
||||||
maxBornAt = $$seq$$;
|
|
||||||
}
|
}
|
||||||
catch (err: any) {
|
catch (err: any) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
|
|
@ -742,7 +737,6 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
this.pullMaxBornAtMap[entityId] = maxBornAt;
|
|
||||||
return {
|
return {
|
||||||
successIds,
|
successIds,
|
||||||
failed,
|
failed,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue