fix: 修复清理ExecutingMarks的bug

This commit is contained in:
Pan Qiancheng 2026-01-23 14:55:37 +08:00
parent 4f253bab2c
commit 691419645a
2 changed files with 14 additions and 11 deletions

View File

@ -398,10 +398,10 @@ class AppLoader extends types_1.AppLoader {
filterAndMarkExecutingRows(watcher, rows) { filterAndMarkExecutingRows(watcher, rows) {
if (watcher.exclusive !== true) { if (watcher.exclusive !== true) {
// 不需要排他执行,直接返回所有行 // 不需要排他执行,直接返回所有行
return [rows, false]; return [rows, []];
} }
const rowsWithoutExecuting = []; const rowsWithoutExecuting = [];
let hasSkipped = false; const skipedRows = [];
const watcherName = watcher.name; const watcherName = watcher.name;
for (const row of rows) { for (const row of rows) {
if (!row.id) { if (!row.id) {
@ -413,11 +413,11 @@ class AppLoader extends types_1.AppLoader {
rowsWithoutExecuting.push(row); rowsWithoutExecuting.push(row);
} }
else { else {
hasSkipped = true; skipedRows.push(row);
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`); console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`);
} }
} }
return [rowsWithoutExecuting, hasSkipped]; return [rowsWithoutExecuting, skipedRows];
} }
/** /**
* 清理执行标记 * 清理执行标记
@ -494,6 +494,7 @@ class AppLoader extends types_1.AppLoader {
if (!isFreeType) { if (!isFreeType) {
await selectContext.commit(); await selectContext.commit();
} }
this.cleanupExecutingMarks(watcher.name, hasSkipped);
return result; return result;
} }
// 3. 执行业务逻辑 // 3. 执行业务逻辑
@ -511,7 +512,7 @@ class AppLoader extends types_1.AppLoader {
} }
catch (err) { catch (err) {
// 清理执行标记 // 清理执行标记
this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting); this.cleanupExecutingMarks(watcher.name, rows);
if (!isFreeType) { if (!isFreeType) {
if (err instanceof types_1.OakPartialSuccess) { if (err instanceof types_1.OakPartialSuccess) {
await selectContext.commit(); await selectContext.commit();

View File

@ -479,14 +479,14 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
private filterAndMarkExecutingRows<T extends keyof ED>( private filterAndMarkExecutingRows<T extends keyof ED>(
watcher: Watcher<ED, T, Cxt>, watcher: Watcher<ED, T, Cxt>,
rows: Partial<ED[T]['Schema']>[] rows: Partial<ED[T]['Schema']>[]
): [Partial<ED[T]['Schema']>[], boolean] { ): [Partial<ED[T]['Schema']>[], Partial<ED[T]['Schema']>[]] {
if (watcher.exclusive !== true) { if (watcher.exclusive !== true) {
// 不需要排他执行,直接返回所有行 // 不需要排他执行,直接返回所有行
return [rows, false]; return [rows, []];
} }
const rowsWithoutExecuting: Partial<ED[T]['Schema']>[] = []; const rowsWithoutExecuting: Partial<ED[T]['Schema']>[] = [];
let hasSkipped = false; const skipedRows: Partial<ED[T]['Schema']>[] = [];
const watcherName = watcher.name; const watcherName = watcher.name;
for (const row of rows) { for (const row of rows) {
@ -499,12 +499,12 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
if (this.checkDataExecuting(watcherName, row.id)) { if (this.checkDataExecuting(watcherName, row.id)) {
rowsWithoutExecuting.push(row); rowsWithoutExecuting.push(row);
} else { } else {
hasSkipped = true; skipedRows.push(row);
console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`); console.warn(`实例【${process.env.OAK_INSTANCE_ID || '单机'}】执行器【${watcherName}】将跳过正在被执行的数据ID${row.id}】,请检查是否执行超时`);
} }
} }
return [rowsWithoutExecuting, hasSkipped]; return [rowsWithoutExecuting, skipedRows];
} }
/** /**
@ -607,6 +607,8 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
if (!isFreeType) { if (!isFreeType) {
await selectContext.commit(); await selectContext.commit();
} }
this.cleanupExecutingMarks(watcher.name, hasSkipped);
return result; return result;
} }
@ -624,7 +626,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
return result; return result;
} catch (err) { } catch (err) {
// 清理执行标记 // 清理执行标记
this.cleanupExecutingMarks(watcher.name, rowsWithoutExecuting); this.cleanupExecutingMarks(watcher.name, rows);
if (!isFreeType) { if (!isFreeType) {
if (err instanceof OakPartialSuccess) { if (err instanceof OakPartialSuccess) {