diff --git a/lib/AppLoader.js b/lib/AppLoader.js index c99e341..99655f7 100644 --- a/lib/AppLoader.js +++ b/lib/AppLoader.js @@ -154,13 +154,15 @@ class AppLoader extends types_1.AppLoader { if (this.watcherTimerId) { console.log('取消watcher...'); clearTimeout(this.watcherTimerId); + this.watcherTimerId = undefined; } for (const job in this.scheduledJobs) { console.log(`取消定时任务【${job}】...`); - this.scheduledJobs[job].cancel(); + await this.scheduledJobs[job]?.cancel(); + delete this.scheduledJobs[job]; } - (0, index_1.clearPorts)(); - this.dbStore.disconnect(); + await (0, index_1.clearPorts)(); + await this.dbStore.disconnect(); } async execAspect(name, headers, contextString, params) { // 从aspect过来的,不能有空cxtString,以防被误判为root @@ -239,7 +241,7 @@ class AppLoader extends types_1.AppLoader { } } } - this.dbStore.disconnect(); + await this.dbStore.disconnect(); } getStore() { return this.dbStore; @@ -424,6 +426,10 @@ class AppLoader extends types_1.AppLoader { } } }); + if (!job) { + // console.error(`定时器【${name}】创建失败,请检查cron表达式是否正确`); + throw new Error(`定时器【${name}】创建失败,请检查cron表达式是否正确`); + } if (this.scheduledJobs[name]) { // console.error(`定时器【${name}】已经存在,请检查定时器名称是否重复`); throw new Error(`定时器【${name}】已经存在,请检查定时器名称是否重复`); diff --git a/lib/Synchronizer.js b/lib/Synchronizer.js index d5c9391..b380dc3 100644 --- a/lib/Synchronizer.js +++ b/lib/Synchronizer.js @@ -66,7 +66,7 @@ class Synchronizer { contextBuilder; pushAccessMap = {}; async startChannel2(context, channel) { - const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout } = channel; + const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel; // todo 加密 const opers = queue.map(ele => ele.oper); if (process.env.NODE_ENV === 'development') { @@ -91,7 +91,7 @@ class Synchronizer { [OAK_SYNC_HEADER_SIGN]: signature, }, body, - }, timeout || 5000); + }, timeout); if (res.status !== 200) { throw new Error(`sync数据时,访问api「${finalApi}」的结果不是200。「${res.status}」`); } @@ -104,23 +104,23 @@ class Synchronizer { if (onFailed) { context.on('rollback', async () => { const context2 = this.contextBuilder(); - context2.begin(); + await context2.begin(); try { await onFailed({ remoteEntity: entity, remoteEntityId: entityId, data: queue.map((ele) => ({ entity: ele.oper.targetEntity, - rowIds: ele.oper.filter.id.$in, // 暂时应该没什么用 + rowIds: ele.oper.filter.id.$in, action: ele.oper.action, data: ele.oper.data, })), reason: err, }, context2); - context2.commit(); + await context2.commit(); } catch (err) { - context2.rollback(); + await context2.rollback(); } }); } @@ -526,7 +526,7 @@ class Synchronizer { this.remotePullInfoMap[entity] = {}; } if (!this.remotePullInfoMap[entity][entityId]) { - const { getPullInfo, pullEntities } = this.config.remotes.find(ele => ele.entity === entity); + const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity); const pullEntityDict = {}; if (pullEntities) { pullEntities.forEach((def) => pullEntityDict[def.entity] = def); @@ -538,10 +538,11 @@ class Synchronizer { remoteEntityId: entityId, }), pullEntityDict, + clockDriftDuration, }; closeFn(); } - const { pullInfo, pullEntityDict } = this.remotePullInfoMap[entity][entityId]; + const { pullInfo, pullEntityDict, clockDriftDuration } = this.remotePullInfoMap[entity][entityId]; const { userId, algorithm, publicKey, cxtInfo } = pullInfo; (0, assert_1.default)(userId); context.setCurrentUserId(userId); @@ -549,8 +550,10 @@ class Synchronizer { await context.initialize(cxtInfo); } const syncTimestamp = parseInt(syncTs, 10); - if (!(Date.now() - syncTimestamp < 10000)) { - throw new Error('同步时钟漂移过长'); + if (clockDriftDuration !== 0) { + if (!(Date.now() - syncTimestamp < (clockDriftDuration || 10000))) { + throw new types_1.OakClockDriftException('同步时钟漂移过长'); + } } if (!verify(publicKey, JSON.stringify(body), syncTs, syncNonce, syncSign)) { throw new Error('sync验签失败'); diff --git a/src/Synchronizer.ts b/src/Synchronizer.ts index 950a464..67f883c 100644 --- a/src/Synchronizer.ts +++ b/src/Synchronizer.ts @@ -2,7 +2,7 @@ import { createSign, createVerify } from 'crypto'; import { EntityDict, StorageSchema, EndpointItem, RemotePullInfo, SelfEncryptInfo, RemotePushInfo, PushEntityDef, PullEntityDef, SyncConfig, TriggerDataAttribute, TriggerUuidAttribute, - SyncRemoteConfig, OakPartialSuccess, OakRequestTimeoutException + SyncRemoteConfig, OakPartialSuccess, OakRequestTimeoutException, OakClockDriftException } from 'oak-domain/lib/types'; import { VolatileTrigger } from 'oak-domain/lib/types/Trigger'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; @@ -33,6 +33,7 @@ type Channel['onFailed']; timeout?: SyncRemoteConfig['timeout']; + clockDriftDuration?: SyncRemoteConfig['clockDriftDuration']; }; function generateSignStr(body: string, ts: string, nonce: string) { @@ -91,6 +92,7 @@ export default class Synchronizer>; + clockDriftDuration?: SyncRemoteConfig['clockDriftDuration']; }>> = {}; private channelDict: Record> = {}; private contextBuilder: () => Cxt; @@ -113,7 +115,7 @@ export default class Synchronizer> = {}; private async startChannel2(context: Cxt, channel: Channel) { - const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout } = channel; + const { queue, api, selfEncryptInfo, entity, entityId, onFailed, timeout = 5000 } = channel; // todo 加密 const opers = queue.map(ele => ele.oper); @@ -140,7 +142,7 @@ export default class Synchronizer { const context2 = this.contextBuilder(); - context2.begin(); + await context2.begin(); try { await onFailed({ remoteEntity: entity!, @@ -170,10 +172,10 @@ export default class Synchronizer ele.entity === entity)!; + const { getPullInfo, pullEntities, clockDriftDuration } = this.config.remotes.find(ele => ele.entity === entity)!; const pullEntityDict = {} as Record>; if (pullEntities) { pullEntities.forEach( @@ -689,11 +691,12 @@ export default class Synchronizer