修正同步报错回调时加上awiat

This commit is contained in:
wkj 2024-12-10 10:48:59 +08:00
parent f50a3553cc
commit b3d858041b
3 changed files with 38 additions and 24 deletions

View File

@ -154,13 +154,15 @@ class AppLoader extends types_1.AppLoader {
if (this.watcherTimerId) {
console.log('取消watcher...');
clearTimeout(this.watcherTimerId);
this.watcherTimerId = undefined;
}
for (const job in this.scheduledJobs) {
console.log(`取消定时任务【${job}】...`);
this.scheduledJobs[job].cancel();
await this.scheduledJobs[job]?.cancel();
delete this.scheduledJobs[job];
}
(0, index_1.clearPorts)();
this.dbStore.disconnect();
await (0, index_1.clearPorts)();
await this.dbStore.disconnect();
}
async execAspect(name, headers, contextString, params) {
// 从aspect过来的不能有空cxtString以防被误判为root
@ -239,7 +241,7 @@ class AppLoader extends types_1.AppLoader {
}
}
}
this.dbStore.disconnect();
await this.dbStore.disconnect();
}
getStore() {
return this.dbStore;
@ -424,6 +426,10 @@ class AppLoader extends types_1.AppLoader {
}
}
});
if (!job) {
// console.error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
throw new Error(`定时器【${name}】创建失败请检查cron表达式是否正确`);
}
if (this.scheduledJobs[name]) {
// console.error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);
throw new Error(`定时器【${name}】已经存在,请检查定时器名称是否重复`);

View File

@ -66,7 +66,7 @@ class Synchronizer {
contextBuilder;
pushAccessMap = {};
async startChannel2(context, channel) {
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout } = channel;
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel;
// todo 加密
const opers = queue.map(ele => ele.oper);
if (process.env.NODE_ENV === 'development') {
@ -91,7 +91,7 @@ class Synchronizer {
[OAK_SYNC_HEADER_SIGN]: signature,
},
body,
}, timeout || 5000);
}, timeout);
if (res.status !== 200) {
throw new Error(`sync数据时访问api「${finalApi}」的结果不是200。「${res.status}`);
}
@ -104,23 +104,23 @@ class Synchronizer {
if (onFailed) {
context.on('rollback', async () => {
const context2 = this.contextBuilder();
context2.begin();
await context2.begin();
try {
await onFailed({
remoteEntity: entity,
remoteEntityId: entityId,
data: queue.map((ele) => ({
entity: ele.oper.targetEntity,
rowIds: ele.oper.filter.id.$in, // 暂时应该没什么用
rowIds: ele.oper.filter.id.$in,
action: ele.oper.action,
data: ele.oper.data,
})),
reason: err,
}, context2);
context2.commit();
await context2.commit();
}
catch (err) {
context2.rollback();
await context2.rollback();
}
});
}
@ -526,7 +526,7 @@ class Synchronizer {
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity][entityId]) {
const { getPullInfo, pullEntities } = this.config.remotes.find(ele => ele.entity === entity);
const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity);
const pullEntityDict = {};
if (pullEntities) {
pullEntities.forEach((def) => pullEntityDict[def.entity] = def);
@ -538,10 +538,11 @@ class Synchronizer {
remoteEntityId: entityId,
}),
pullEntityDict,
clockDriftDuration,
};
closeFn();
}
const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId];
const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId];
const { userId, algorithm, publicKey, cxtInfo } = pullInfo;
(0, assert_1.default)(userId);
context.setCurrentUserId(userId);
@ -549,8 +550,10 @@ class Synchronizer {
await context.initialize(cxtInfo);
}
const syncTimestamp = parseInt(syncTs, 10);
if (!(Date.now() - syncTimestamp < 10000)) {
throw new Error('同步时钟漂移过长');
if (clockDriftDuration !== 0) {
if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) {
throw new types_1.OakClockDriftException('同步时钟漂移过长');
}
}
if (!verify(publicKey, JSON.stringify(body), syncTs, syncNonce, syncSign)) {
throw new Error('sync验签失败');

View File

@ -2,7 +2,7 @@ import { createSign, createVerify } from 'crypto';
import {
EntityDict, StorageSchema, EndpointItem, RemotePullInfo, SelfEncryptInfo,
RemotePushInfo, PushEntityDef, PullEntityDef, SyncConfig, TriggerDataAttribute, TriggerUuidAttribute,
SyncRemoteConfig, OakPartialSuccess, OakRequestTimeoutException
SyncRemoteConfig, OakPartialSuccess, OakRequestTimeoutException, OakClockDriftException
} from 'oak-domain/lib/types';
import { VolatileTrigger } from 'oak-domain/lib/types/Trigger';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
@ -33,6 +33,7 @@ type Channel<ED extends EntityDict & BaseEntityDict, Cxt extends BackendRuntimeC
selfEncryptInfo: SelfEncryptInfo;
onFailed?: SyncRemoteConfig<ED, Cxt>['onFailed'];
timeout?: SyncRemoteConfig<ED, Cxt>['timeout'];
clockDriftDuration?: SyncRemoteConfig<ED, Cxt>['clockDriftDuration'];
};
function generateSignStr(body: string, ts: string, nonce: string) {
@ -91,6 +92,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
private remotePullInfoMap: Record<string, Record<string, {
pullInfo: RemotePullInfo,
pullEntityDict: Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
clockDriftDuration?: SyncRemoteConfig<ED, Cxt>['clockDriftDuration'];
}>> = {};
private channelDict: Record<string, Channel<ED, Cxt>> = {};
private contextBuilder: () => Cxt;
@ -113,7 +115,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
}>> = {};
private async startChannel2(context: Cxt, channel: Channel<ED, Cxt>) {
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout } = channel;
const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel;
// todo 加密
const opers = queue.map(ele => ele.oper);
@ -140,7 +142,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
[OAK_SYNC_HEADER_SIGN]: signature,
},
body,
}, timeout || 5000);
}, timeout);
if (res.status !== 200) {
throw new Error(`sync数据时访问api「${finalApi}」的结果不是200。「${res.status}`);
@ -155,7 +157,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
if (onFailed) {
context.on('rollback', async () => {
const context2 = this.contextBuilder();
context2.begin();
await context2.begin();
try {
await onFailed({
remoteEntity: entity!,
@ -170,10 +172,10 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
),
reason: err,
}, context2);
context2.commit();
await context2.commit();
}
catch (err) {
context2.rollback();
await context2.rollback();
}
});
}
@ -675,7 +677,7 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
this.remotePullInfoMap[entity] = {};
}
if (!this.remotePullInfoMap[entity]![entityId]) {
const { getPullInfo, pullEntities } = this.config.remotes.find(ele => ele.entity === entity)!;
const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity)!;
const pullEntityDict = {} as Record<string, PullEntityDef<ED, keyof ED, Cxt>>;
if (pullEntities) {
pullEntities.forEach(
@ -689,11 +691,12 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
remoteEntityId: entityId,
}),
pullEntityDict,
clockDriftDuration,
};
closeFn();
}
const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId]!;
const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId]!;
const { userId, algorithm, publicKey, cxtInfo } = pullInfo;
assert(userId);
context.setCurrentUserId(userId);
@ -702,8 +705,10 @@ export default class Synchronizer<ED extends EntityDict & BaseEntityDict, Cxt ex
}
const syncTimestamp = parseInt(syncTs as string, 10);
if (!(Date.now() - syncTimestamp < 10000)) {
throw new Error('同步时钟漂移过长');
if (clockDriftDuration !== 0) {
if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) {
throw new OakClockDriftException('同步时钟漂移过长');
}
}
if (!verify(publicKey, JSON.stringify(body), syncTs as string, syncNonce as string, syncSign as string)) {