import { sliceFile } from '../files/slice';
import assert from 'assert';
import { OakUploadException } from '../../types/Exception';

/**
 * Tells whether `error` is a `DOMException` whose name is `'AbortError'`.
 *
 * The `typeof` guard keeps this predicate from throwing a ReferenceError in
 * runtimes that do not define the `DOMException` global; there it simply
 * returns `false`, so callers inside `catch` blocks keep the original error.
 *
 * @param {unknown} error - Any caught value.
 * @returns {boolean} `true` only for an `AbortError` DOMException.
 */
export function isAbortError(error) {
    return typeof DOMException !== 'undefined'
        && error instanceof DOMException
        && error.name === 'AbortError';
}

/**
 * Generic multipart (chunked) upload routine for S3-compatible storage
 * services such as AWS, MinIO and Alibaba Cloud OSS.
 *
 * Only parts whose entry in `extraFile.chunkInfo.parts` is falsy are uploaded.
 * Presigned upload info is fetched lazily in batches and cached, parts are
 * uploaded with bounded concurrency, and every part is retried on failure.
 *
 * @param {object} options
 * @param {object} options.extraFile - Must expose `id` and
 *   `chunkInfo: { parts, chunkSize, partCount }`.
 * @param {Function} options.uploadFn - Performs one PUT; its result must expose `status`.
 * @param {*} options.file - Source file handed to `sliceFile`.
 * @param {Function} [options.getPercent] - Called every 500 ms with the summed
 *   per-part progress divided by `partCount`.
 * @param {Function} options.presignMultiPartUpload - `(from, to)` => iterable of
 *   `{ partNumber, uploadUrl, formData }` for the inclusive part range.
 * @param {number} [options.parallelism=5] - Maximum number of in-flight part uploads.
 * @param {number} [options.retryTimes=5] - Retries after the first attempt; `0` disables retries.
 * @param {number} [options.retryDelay=1000] - Milliseconds to wait between attempts; `0` is allowed.
 * @returns {Promise<void>} Resolves once every pending part has been uploaded.
 * @throws The first error recorded by any part (an `AbortError` is rethrown
 *   without retrying), or an `OakUploadException` for a non-200/204 response.
 */
export async function chunkUpload(options) {
    // NOTE: `options.onChunkSuccess` is not invoked by this function.
    const { extraFile, uploadFn, file, getPercent, presignMultiPartUpload } = options;
    const chunkInfo = extraFile.chunkInfo;
    const parallelism = options.parallelism || 5;
    // `??` rather than `||`: an explicit 0 is a legitimate value for both settings.
    const retryTimes = options.retryTimes ?? 5;
    const retryDelay = options.retryDelay ?? 1000;

    // Keep only the parts that have not been uploaded yet (no etag recorded).
    const pendingPartNumbers = chunkInfo.parts
        .map((etag, index) => ({ partNumber: index + 1, etag }))
        .filter((item) => !item.etag)
        .map((item) => item.partNumber);
    if (pendingPartNumbers.length === 0) {
        return; // every part is already uploaded
    }

    // Slice the file into parts; part numbers are 1-based, `chunks` is 0-based.
    const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount);
    const uploadTasks = pendingPartNumbers.map((partNumber) => ({
        partNumber,
        chunk: chunks[partNumber - 1],
    }));

    // ---- Presign pool -------------------------------------------------------
    const presignPool = new Map(); // partNumber -> { uploadUrl, formData }
    const fetchingRanges = new Map(); // "from-to" -> { from, to, promise } for in-flight requests
    const BATCH_SIZE = 50;

    /** Whether the inclusive ranges [a1, a2] and [b1, b2] overlap. */
    const isRangeOverlap = (a1, a2, b1, b2) => a1 <= b2 && b1 <= a2;

    /** Promises of the in-flight presign requests whose range overlaps [from, to]. */
    const findOverlappingRanges = (from, to) => {
        const overlapping = [];
        for (const range of fetchingRanges.values()) {
            if (isRangeOverlap(from, to, range.from, range.to)) {
                overlapping.push(range.promise);
            }
        }
        return overlapping;
    };

    /** Waits for the in-flight request of `rangeKey`, then reads the part from the pool. */
    const awaitPendingRange = async (rangeKey, partNumber) => {
        await fetchingRanges.get(rangeKey).promise;
        assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
        return presignPool.get(partNumber);
    };

    /**
     * Returns the presign info of one part, fetching its whole batch on a miss.
     * Requests for non-overlapping ranges are allowed to run concurrently.
     */
    const getPresign = async (partNumber) => {
        if (presignPool.has(partNumber)) {
            return presignPool.get(partNumber);
        }
        // Align the requested range to a multiple of BATCH_SIZE.
        const batchIndex = Math.floor((partNumber - 1) / BATCH_SIZE);
        const from = batchIndex * BATCH_SIZE + 1;
        const to = Math.min(from + BATCH_SIZE - 1, chunkInfo.partCount);
        const rangeKey = `${from}-${to}`;

        // The same range is already being requested: just wait for it.
        if (fetchingRanges.has(rangeKey)) {
            return awaitPendingRange(rangeKey, partNumber);
        }

        // Wait for any overlapping in-flight requests before issuing a new one.
        let overlappingRequests = findOverlappingRanges(from, to);
        while (overlappingRequests.length > 0) {
            await Promise.all(overlappingRequests);
            if (presignPool.has(partNumber)) {
                return presignPool.get(partNumber);
            }
            // While waiting, another task may have started a request for this range.
            if (fetchingRanges.has(rangeKey)) {
                return awaitPendingRange(rangeKey, partNumber);
            }
            overlappingRequests = findOverlappingRanges(from, to);
        }

        // Issue the request; the range is unregistered whether it succeeds or fails.
        const fetchPromise = (async () => {
            try {
                const presignedParts = await presignMultiPartUpload(from, to);
                for (const item of presignedParts) {
                    presignPool.set(item.partNumber, {
                        uploadUrl: item.uploadUrl,
                        formData: item.formData,
                    });
                }
            } finally {
                fetchingRanges.delete(rangeKey);
            }
        })();
        fetchingRanges.set(rangeKey, { from, to, promise: fetchPromise });
        await fetchPromise;
        assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
        return presignPool.get(partNumber);
    };

    // ---- Progress reporting -------------------------------------------------
    // NOTE(review): parts uploaded in a previous run never appear in
    // `everyPercent`, so a resumed upload reports less than the full total.
    // Not changed here because the scale reported by `uploadFn` is not visible.
    const everyPercent = {}; // partNumber -> latest percent reported by uploadFn
    const updateChunkPercent = (partNumber, percent) => {
        everyPercent[partNumber] = percent;
    };

    /** Uploads one part: one initial attempt plus up to `retryTimes` retries. */
    const uploadPart = async (partNumber, chunk) => {
        let lastError;
        for (let attempt = 0; attempt <= retryTimes; attempt++) {
            try {
                const presignInfo = await getPresign(partNumber);
                // A 'getter' chunk loads its data lazily; anything else is the data itself.
                const data = chunk.type === 'getter' ? await chunk.getFile() : chunk;
                const response = await uploadFn({
                    file: data,
                    name: 'file',
                    uploadUrl: presignInfo.uploadUrl,
                    formData: presignInfo.formData,
                    autoInform: true,
                    getPercent: (percent) => {
                        updateChunkPercent(partNumber, percent);
                    },
                    uploadId: `${extraFile.id}:${partNumber}`,
                    method: "PUT"
                });
                const isSuccess = response.status === 200 || response.status === 204;
                if (isSuccess) {
                    return;
                }
                throw new OakUploadException(`分片 ${partNumber} 上传失败`);
            } catch (err) {
                console.error(`分片 ${partNumber} 上传第 ${attempt + 1} 次失败:`, err);
                lastError = err;
                // An abort is never retried.
                if (isAbortError(err)) {
                    throw err;
                }
                if (attempt < retryTimes) {
                    await new Promise((resolve) => setTimeout(resolve, retryDelay));
                }
            }
        }
        throw lastError || new OakUploadException(`分片 ${partNumber} 上传失败`);
    };

    // ---- Concurrency-limited execution --------------------------------------
    const executing = new Set();
    const errors = [];
    let shouldAbort = false; // set once any part reports an abort

    const updatePercentInterval = setInterval(() => {
        if (getPercent) {
            const totalPercent = Object.values(everyPercent)
                .reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
            getPercent(totalPercent);
        }
    }, 500);

    try {
        for (const task of uploadTasks) {
            // After an abort, do not start the tasks that have not begun yet.
            if (shouldAbort) {
                break;
            }
            // `promise` is only read after the first await inside the task,
            // i.e. after this declaration has been initialised.
            const promise = (async () => {
                try {
                    await uploadPart(task.partNumber, task.chunk);
                } catch (err) {
                    if (isAbortError(err)) {
                        console.log(`分片 ${task.partNumber} 上传被用户中止`);
                        shouldAbort = true;
                    }
                    errors.push(err);
                    throw err;
                } finally {
                    executing.delete(promise);
                }
            })();
            executing.add(promise);
            // At the concurrency limit, wait for any task to settle. Rejections
            // are ignored here on purpose: they are already recorded in `errors`.
            if (executing.size >= parallelism) {
                await Promise.race(executing).catch(() => { });
            }
        }
        // Wait for every task that is still running.
        await Promise.allSettled([...executing]);
    } finally {
        // Always stop the progress timer, even on an unexpected throw.
        clearInterval(updatePercentInterval);
    }

    // Surface the first recorded failure, if any.
    if (errors.length > 0) {
        throw errors[0];
    }
}