197 lines
7.0 KiB
JavaScript
197 lines
7.0 KiB
JavaScript
import { assert } from 'oak-domain/lib/utils/assert';
|
||
import { getCosBackend } from '../utils/cos/index.backend';
|
||
import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid';
|
||
import { groupBy } from 'oak-domain/lib/utils/lodash';
|
||
import { applicationProjection, extraFileProjection } from '../types/Projection';
|
||
async function checkWhetherSuccess(context, applicationId, rows) {
|
||
const successIds = [];
|
||
const failedIds = [];
|
||
await context.setApplication(applicationId);
|
||
for (const d of rows) {
|
||
const { origin } = d;
|
||
const cos = getCosBackend(origin);
|
||
assert(cos);
|
||
const success = await cos.checkWhetherSuccess(context.getApplication(), d, context);
|
||
if (success) {
|
||
successIds.push(d.id);
|
||
}
|
||
else {
|
||
failedIds.push(d.id);
|
||
}
|
||
}
|
||
if (successIds.length > 0) {
|
||
await context.operate('extraFile', {
|
||
id: await generateNewIdAsync(),
|
||
action: 'update',
|
||
data: {
|
||
uploadState: 'success',
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: successIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
if (failedIds.length > 0) {
|
||
await context.operate('extraFile', {
|
||
id: await generateNewIdAsync(),
|
||
action: 'update',
|
||
data: {
|
||
uploadState: 'failed',
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: successIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
}
|
||
const watchers = [
|
||
{
|
||
name: '确定uploading的文件状态',
|
||
entity: 'extraFile',
|
||
filter: async () => {
|
||
const now = Date.now();
|
||
const deadline = process.env.NODE_ENV === 'production' ? now - 3600 * 1000 : now - 60 * 1000;
|
||
return {
|
||
$$updateAt$$: {
|
||
$lt: deadline,
|
||
},
|
||
uploadState: 'uploading',
|
||
enableChunkedUpload: {
|
||
$exists: false,
|
||
}
|
||
};
|
||
},
|
||
projection: {
|
||
id: 1,
|
||
applicationId: 1,
|
||
origin: 1,
|
||
bucket: 1,
|
||
uploadState: 1,
|
||
objectId: 1,
|
||
extension: 1,
|
||
},
|
||
fn: async (context, data) => {
|
||
const eg = groupBy(data, 'applicationId');
|
||
for (const appId in eg) {
|
||
await checkWhetherSuccess(context, appId, eg[appId]);
|
||
}
|
||
return {
|
||
extraFile: {
|
||
update: data.length,
|
||
}
|
||
};
|
||
}
|
||
},
|
||
{
|
||
name: '处理长时间未完成分片上传的文件',
|
||
entity: 'extraFile',
|
||
filter: async () => {
|
||
const now = Date.now();
|
||
const deadline = process.env.NODE_ENV === 'production' ? now - 3 * 24 * 60 * 60 * 1000 : now - 60 * 1000;
|
||
return {
|
||
$$updateAt$$: {
|
||
$lt: deadline,
|
||
},
|
||
uploadState: 'uploading',
|
||
enableChunkedUpload: true
|
||
};
|
||
},
|
||
projection: {
|
||
...extraFileProjection,
|
||
application: {
|
||
...applicationProjection,
|
||
},
|
||
enableChunkedUpload: 1,
|
||
chunkInfo: 1,
|
||
},
|
||
fn: async (context, data) => {
|
||
const eg = groupBy(data, 'applicationId');
|
||
for (const appId in eg) {
|
||
// 这里检查需要分片上传的文件信息,但是考虑到可能还在上传之类的,所以先不管,后面再处理
|
||
// 1. 要去查询分片信息,看看是不是都有etag了,如果都有etag了,就说明上传完成了,否则就标记为失败
|
||
const rows = eg[appId];
|
||
const successIds = [];
|
||
const failedIds = [];
|
||
await context.setApplication(appId);
|
||
for (const d of rows) {
|
||
const { origin } = d;
|
||
const cos = getCosBackend(origin);
|
||
assert(cos);
|
||
const success = await cos.checkWhetherSuccess(context.getApplication(), d, context);
|
||
if (success) {
|
||
successIds.push(d.id);
|
||
}
|
||
else {
|
||
// 这个文件不存在,此时去尝试合并,如果合并失败,则标记为失败
|
||
try {
|
||
if (d.chunkInfo?.merged) {
|
||
// 已经合并过了,说明是合并后删除的
|
||
failedIds.push(d.id);
|
||
continue;
|
||
}
|
||
// 去合并分片
|
||
await cos.mergeChunkedUpload(context.getApplication(), d, context);
|
||
await context.operate('extraFile', {
|
||
id: await generateNewIdAsync(),
|
||
action: 'update',
|
||
data: {
|
||
chunkInfo: {
|
||
...d.chunkInfo,
|
||
merged: true,
|
||
},
|
||
},
|
||
filter: {
|
||
id: d.id,
|
||
},
|
||
}, {});
|
||
successIds.push(d.id);
|
||
}
|
||
catch (err) {
|
||
console.log(`分片合并失败,文件: ${d.id} 标记为上传失败: `, err);
|
||
failedIds.push(d.id);
|
||
}
|
||
}
|
||
}
|
||
if (successIds.length > 0) {
|
||
await context.operate('extraFile', {
|
||
id: await generateNewIdAsync(),
|
||
action: 'update',
|
||
data: {
|
||
uploadState: 'success',
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: successIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
if (failedIds.length > 0) {
|
||
await context.operate('extraFile', {
|
||
id: await generateNewIdAsync(),
|
||
action: 'update',
|
||
data: {
|
||
uploadState: 'failed',
|
||
},
|
||
filter: {
|
||
id: {
|
||
$in: successIds,
|
||
}
|
||
}
|
||
}, {});
|
||
}
|
||
}
|
||
return {
|
||
extraFile: {
|
||
update: data.length,
|
||
}
|
||
};
|
||
}
|
||
},
|
||
];
|
||
export default watchers;
|