"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.isAbortError = isAbortError; exports.chunkUpload = chunkUpload; const slice_1 = require("../files/slice"); const Exception_1 = require("../../types/Exception"); function isAbortError(error) { return error instanceof DOMException && error.name === 'AbortError'; } /** * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 * @param options 参数 * @return */ async function chunkUpload(options) { const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options; const chunkInfo = extraFile.chunkInfo; const parallelism = options.parallelism || 5; const retryTimes = options.retryTimes || 5; const retryDelay = options.retryDelay || 1000; // 过滤出未完成的分片 const pendingParts = chunkInfo.parts.filter(part => !part.etag); if (pendingParts.length === 0) { return; // 所有分片已上传完成 } // 将文件分片 const chunks = await (0, slice_1.sliceFile)(file, chunkInfo.chunkSize, chunkInfo.partCount); const everyPercent = {}; // 用于记录每个分片的进度百分比 const updateChunkPercent = (partNumber, percent) => { everyPercent[partNumber] = percent; }; const updatePercentInterval = setInterval(() => { if (getPercent) { const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; getPercent(totalPercent); } }, 500); // 上传单个分片的函数,带重试 const uploadPart = async (part, chunk) => { let lastError; for (let attempt = 0; attempt <= retryTimes; attempt++) { try { let data; if (chunk.type === 'getter') { data = await chunk.getFile(); } else { data = chunk; } const response = await uploadFn({ file: data, name: 'file', uploadUrl: part.uploadUrl, formData: part.formData || {}, autoInform: true, getPercent: (percent) => { // 更新每个分片的进度 updateChunkPercent(part.partNumber, percent); }, uploadId: `${extraFile.id}:${part.partNumber}`, method: "PUT" }); // 验证上传是否成功 let isSuccess = false; isSuccess = !!(response.status === 200 || response.status === 204); if (isSuccess) { // // 标记该分片已完成 // part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag") // assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); return; } throw new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`); } catch (err) { console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); lastError = err; // 如果是OakUserException说明是用户主动中止上传,不进行重试 if (isAbortError(err)) { throw err; } if (attempt < retryTimes) { // 等待后重试 await new Promise(resolve => setTimeout(resolve, retryDelay)); } } } throw lastError || new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`); }; // 并行上传控制 const uploadTasks = pendingParts.map((part) => ({ part, chunk: chunks[part.partNumber - 1] })); // 使用并发控制执行上传 const executing = new Set(); const errors = []; let shouldAbort = false; // 中止标志 for (const task of uploadTasks) { // 如果已经需要中止,跳过未开始的任务 if (shouldAbort) { break; } let promise; promise = (async () => { try { await uploadPart(task.part, task.chunk); } catch (err) { if (isAbortError(err)) { // 用户主动中止上传,设置中止标志,阻止后续任务开始 console.log(`分片 ${task.part.partNumber} 上传被用户中止`); shouldAbort = true; } errors.push(err); throw err; } finally { if (promise) { executing.delete(promise); } } })(); executing.add(promise); // 当达到并发限制时,等待任意一个完成 if (executing.size >= parallelism) { await Promise.race(executing).catch(() => { }); } } // 等待所有任务完成 await Promise.allSettled([...executing]); clearInterval(updatePercentInterval); // 检查是否有错误 if (errors.length > 0) { throw errors[0]; } // 等待所有任务完成 await Promise.all(executing); // // 调用分片成功回调(所有分片完成后) // if (onChunkSuccess) { // await onChunkSuccess(chunkInfo); // } return; }