From 10077f150ef67fc745c57569a3c0e955a291d195 Mon Sep 17 00:00:00 2001 From: qcqcqc <1220204124@zust.edu.cn> Date: Fri, 26 Dec 2025 13:30:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9ES3=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=88=86=E7=89=87=E4=B8=8A=E4=BC=A0=E7=AD=89=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- es/triggers/index.d.ts | 2 +- es/utils/cos/aliyun.d.ts | 1 - es/utils/cos/aliyun.js | 126 +------------------------ es/utils/cos/common.d.ts | 18 ++++ es/utils/cos/common.js | 128 +++++++++++++++++++++++++ es/utils/cos/s3.backend.d.ts | 11 ++- es/utils/cos/s3.backend.js | 36 +++++-- es/utils/cos/s3.d.ts | 4 + es/utils/cos/s3.js | 56 ++++++----- lib/triggers/index.d.ts | 2 +- lib/utils/cos/aliyun.d.ts | 1 - lib/utils/cos/aliyun.js | 124 +----------------------- lib/utils/cos/common.d.ts | 18 ++++ lib/utils/cos/common.js | 132 ++++++++++++++++++++++++++ lib/utils/cos/s3.backend.d.ts | 11 ++- lib/utils/cos/s3.backend.js | 34 +++++-- lib/utils/cos/s3.d.ts | 4 + lib/utils/cos/s3.js | 56 ++++++----- src/utils/cos/aliyun.ts | 170 +-------------------------------- src/utils/cos/common.ts | 173 ++++++++++++++++++++++++++++++++++ src/utils/cos/s3.backend.ts | 57 +++++++++-- src/utils/cos/s3.ts | 77 +++++++++------ 22 files changed, 717 insertions(+), 524 deletions(-) create mode 100644 es/utils/cos/common.d.ts create mode 100644 es/utils/cos/common.js create mode 100644 lib/utils/cos/common.d.ts create mode 100644 lib/utils/cos/common.js create mode 100644 src/utils/cos/common.ts diff --git a/es/triggers/index.d.ts b/es/triggers/index.d.ts index 96c34b4e7..8737638aa 100644 --- a/es/triggers/index.d.ts +++ b/es/triggers/index.d.ts @@ -1,2 +1,2 @@ -declare const _default: (import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger>)[]; +declare const _default: (import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger>)[]; export default _default; diff --git a/es/utils/cos/aliyun.d.ts b/es/utils/cos/aliyun.d.ts index e298d23c4..fa2298fb8 100644 --- a/es/utils/cos/aliyun.d.ts +++ b/es/utils/cos/aliyun.d.ts @@ -10,7 +10,6 @@ export default class ALiYun implements Cos { account: import("../../types/Config").AliCloudConfig; }; protected formKey(extraFile: Partial): string; - private chunkUpload; upload(options: { extraFile: OpSchema; uploadFn: UploadFn; diff --git a/es/utils/cos/aliyun.js b/es/utils/cos/aliyun.js index 2cb2374a8..f76028017 100644 --- a/es/utils/cos/aliyun.js +++ b/es/utils/cos/aliyun.js @@ -1,7 +1,7 @@ import { assert } from 'oak-domain/lib/utils/assert'; import { OakUploadException } from '../../types/Exception'; -import { isOakException, OakNetworkException, OakUserException } from 'oak-domain/lib/types/Exception'; -import { sliceFile, cleanTempFiles } from '../files/slice'; +import { OakNetworkException } from 'oak-domain/lib/types/Exception'; +import { chunkUpload } from './common'; export default class ALiYun { name = 'aliyun'; autoInform() { @@ -25,134 +25,14 @@ export default class ALiYun { assert(objectId); return `extraFile/${objectId}${extension ? '.' + extension : ''}`; } - async chunkUpload(options) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; - const chunkInfo = extraFile.chunkInfo; - const parallelism = options.parallelism || 5; - const retryTimes = options.retryTimes || 5; - const retryDelay = options.retryDelay || 1000; - // 过滤出未完成的分片 - const pendingParts = chunkInfo.parts.filter(part => !part.etag); - if (pendingParts.length === 0) { - return; // 所有分片已上传完成 - } - // 将文件分片 - const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount); - const everyPercent = {}; // 用于记录每个分片的进度百分比 - const updateChunkPercent = (partNumber, percent) => { - everyPercent[partNumber] = percent; - }; - const updatePercentInterval = setInterval(() => { - if (getPercent) { - const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; - getPercent(totalPercent); - } - }, 500); - // 上传单个分片的函数,带重试 - const uploadPart = async (part, chunk) => { - let lastError; - for (let attempt = 0; attempt <= retryTimes; attempt++) { - try { - const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => { - // 更新每个分片的进度 - updateChunkPercent(part.partNumber, percent); - }, `${extraFile.id}:${part.partNumber}`, "PUT"); - // 验证上传是否成功 - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - if (response.errMsg === 'uploadFile:ok') { - const data = JSON.parse(response.data); - isSuccess = !!(data.status === 204 || data.status === 200); - } - } - else { - isSuccess = !!(response.status === 200 || response.status === 204); - } - if (isSuccess) { - // 标记该分片已完成 - part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag"); - assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); - return; - } - throw new OakUploadException(`分片 ${part.partNumber} 上传失败`); - } - catch (err) { - console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); - lastError = err; - // 如果是DomError,并且name是AbortError,说明是用户主动中止上传,不进行重试 - if (isOakException(err, OakUserException)) { - throw err; - } - if (attempt < retryTimes) { - // 等待后重试 - await new Promise(resolve => setTimeout(resolve, retryDelay)); - } - } - } - throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`); - }; - // 并行上传控制 - const uploadTasks = pendingParts.map((part) => ({ - part, - chunk: chunks[part.partNumber - 1] - })); - // 使用并发控制执行上传 - const executing = new Set(); - const errors = []; - for (const task of uploadTasks) { - let promise; - promise = (async () => { - try { - await uploadPart(task.part, task.chunk); - } - catch (err) { - if (isOakException(err, OakUserException)) { - // 用户主动中止上传,抛到上层再处理 - console.log(`分片 ${task.part.partNumber} 上传被用户中止`); - } - errors.push(err); - throw err; - } - finally { - if (promise) { - executing.delete(promise); - } - } - })(); - executing.add(promise); - // 当达到并发限制时,等待任意一个完成 - if (executing.size >= parallelism) { - await Promise.race(executing).catch(() => { }); - } - } - // 等待所有任务完成 - await Promise.allSettled([...executing]); - clearInterval(updatePercentInterval); - // 检查是否有错误 - if (errors.length > 0) { - throw errors[0]; - } - // 等待所有任务完成 - await Promise.all(executing); - // 调用分片成功回调(所有分片完成后) - if (onChunkSuccess) { - await onChunkSuccess(chunkInfo); - } - // 清理小程序环境下的临时文件 - if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { - await cleanTempFiles(chunks); - } - return; - } async upload(options) { const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta; if (extraFile.enableChunkedUpload) { - return this.chunkUpload({ + return chunkUpload({ extraFile, uploadFn, file, - uploadToAspect, getPercent, parallelism: options.parallelism, retryTimes: options.retryTimes, diff --git a/es/utils/cos/common.d.ts b/es/utils/cos/common.d.ts new file mode 100644 index 000000000..6551d3a4a --- /dev/null +++ b/es/utils/cos/common.d.ts @@ -0,0 +1,18 @@ +import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema'; +import { UploadFn } from "../../types/Cos"; +import { EntityDict } from '../../oak-app-domain'; +/** + * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 + * @param options 参数 + * @return + */ +export declare function chunkUpload(options: { + extraFile: OpSchema; + uploadFn: UploadFn; + file: string | File; + getPercent?: Function; + parallelism?: number; + retryTimes?: number; + retryDelay?: number; + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise; +}): Promise; diff --git a/es/utils/cos/common.js b/es/utils/cos/common.js new file mode 100644 index 000000000..ea3f2775a --- /dev/null +++ b/es/utils/cos/common.js @@ -0,0 +1,128 @@ +import { isOakException, OakUserException } from 'oak-domain/lib/types/Exception'; +import { sliceFile, cleanTempFiles } from '../files/slice'; +import assert from 'assert'; +import { OakUploadException } from '../../types/Exception'; +/** + * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 + * @param options 参数 + * @return + */ +export async function chunkUpload(options) { + const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options; + const chunkInfo = extraFile.chunkInfo; + const parallelism = options.parallelism || 5; + const retryTimes = options.retryTimes || 5; + const retryDelay = options.retryDelay || 1000; + // 过滤出未完成的分片 + const pendingParts = chunkInfo.parts.filter(part => !part.etag); + if (pendingParts.length === 0) { + return; // 所有分片已上传完成 + } + // 将文件分片 + const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount); + const everyPercent = {}; // 用于记录每个分片的进度百分比 + const updateChunkPercent = (partNumber, percent) => { + everyPercent[partNumber] = percent; + }; + const updatePercentInterval = setInterval(() => { + if (getPercent) { + const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; + getPercent(totalPercent); + } + }, 500); + // 上传单个分片的函数,带重试 + const uploadPart = async (part, chunk) => { + let lastError; + for (let attempt = 0; attempt <= retryTimes; attempt++) { + try { + const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => { + // 更新每个分片的进度 + updateChunkPercent(part.partNumber, percent); + }, `${extraFile.id}:${part.partNumber}`, "PUT"); + // 验证上传是否成功 + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + if (response.errMsg === 'uploadFile:ok') { + const data = JSON.parse(response.data); + isSuccess = !!(data.status === 204 || data.status === 200); + } + } + else { + isSuccess = !!(response.status === 200 || response.status === 204); + } + if (isSuccess) { + // 标记该分片已完成 + part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag"); + assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); + return; + } + throw new OakUploadException(`分片 ${part.partNumber} 上传失败`); + } + catch (err) { + console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); + lastError = err; + // 如果是OakUserException说明是用户主动中止上传,不进行重试 + if (isOakException(err, OakUserException)) { + throw err; + } + if (attempt < retryTimes) { + // 等待后重试 + await new Promise(resolve => setTimeout(resolve, retryDelay)); + } + } + } + throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`); + }; + // 并行上传控制 + const uploadTasks = pendingParts.map((part) => ({ + part, + chunk: chunks[part.partNumber - 1] + })); + // 使用并发控制执行上传 + const executing = new Set(); + const errors = []; + for (const task of uploadTasks) { + let promise; + promise = (async () => { + try { + await uploadPart(task.part, task.chunk); + } + catch (err) { + if (isOakException(err, OakUserException)) { + // 用户主动中止上传,抛到上层再处理 + console.log(`分片 ${task.part.partNumber} 上传被用户中止`); + } + errors.push(err); + throw err; + } + finally { + if (promise) { + executing.delete(promise); + } + } + })(); + executing.add(promise); + // 当达到并发限制时,等待任意一个完成 + if (executing.size >= parallelism) { + await Promise.race(executing).catch(() => { }); + } + } + // 等待所有任务完成 + await Promise.allSettled([...executing]); + clearInterval(updatePercentInterval); + // 检查是否有错误 + if (errors.length > 0) { + throw errors[0]; + } + // 等待所有任务完成 + await Promise.all(executing); + // 调用分片成功回调(所有分片完成后) + if (onChunkSuccess) { + await onChunkSuccess(chunkInfo); + } + // 清理小程序环境下的临时文件 + if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { + await cleanTempFiles(chunks); + } + return; +} diff --git a/es/utils/cos/s3.backend.d.ts b/es/utils/cos/s3.backend.d.ts index e230f87f2..37323d4aa 100644 --- a/es/utils/cos/s3.backend.d.ts +++ b/es/utils/cos/s3.backend.d.ts @@ -18,12 +18,15 @@ export default class S3Backend extends S3 implements CosBackend { uploadId: string; chunkSize: number; partCount: number; - partSize: number; - parts: never[]; + parts: { + partNumber: number; + uploadUrl: string; + formData: {}; + }[]; }>; /** - * 完成分片上传后的合并操作 - */ +* 完成分片上传后的合并操作 +*/ mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC): Promise; abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC): Promise; } diff --git a/es/utils/cos/s3.backend.js b/es/utils/cos/s3.backend.js index e98cf6c4a..09b40471e 100644 --- a/es/utils/cos/s3.backend.js +++ b/es/utils/cos/s3.backend.js @@ -1,7 +1,7 @@ import { assert } from 'oak-domain/lib/utils/assert'; import S3 from './s3'; import { S3SDK } from 'oak-external-sdk'; -import { OakExternalException, OakPreConditionUnsetException } from 'oak-domain/lib/types/Exception'; +import { OakExternalException } from 'oak-domain/lib/types/Exception'; export default class S3Backend extends S3 { getConfigAndInstance(application, bucket) { const { config, account, endpoint, defaultBucket } = this.getConfig(application); @@ -96,21 +96,37 @@ export default class S3Backend extends S3 { } } async composeChunkUploadInfo(application, extraFile, context) { - throw new OakPreConditionUnsetException('S3暂不支持分片上传'); + const key = this.formKey(extraFile); + const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket); + const preInit = await instance.prepareMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo?.partCount, { + endpoint: s3Config.endpoint, + expiresIn: 30000, // 上传链接过期时间,单位秒 + }); return { - uploadId: '', - chunkSize: 0, - partCount: 0, - partSize: 0, - parts: [], + uploadId: preInit.uploadId, + chunkSize: extraFile.chunkInfo?.chunkSize, + partCount: preInit.parts.length, + parts: preInit.parts.map((part) => ({ + partNumber: part.partNumber, + uploadUrl: part.uploadUrl, + formData: {}, // S3不需要额外的formData + })), }; } /** - * 完成分片上传后的合并操作 - */ +* 完成分片上传后的合并操作 +*/ async mergeChunkedUpload(application, extraFile, context) { - // Implementation here + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket); + await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({ + partNumber: part.partNumber, + eTag: part.etag, + })), s3CosConfig.endpoint); } async abortMultipartUpload(application, extraFile, context) { + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket); + await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint); } } diff --git a/es/utils/cos/s3.d.ts b/es/utils/cos/s3.d.ts index 31730aa42..b4c924e41 100644 --- a/es/utils/cos/s3.d.ts +++ b/es/utils/cos/s3.d.ts @@ -18,6 +18,10 @@ export default class S3 implements Cos { file: string | File; uploadToAspect?: UploadToAspect; getPercent?: Function; + parallelism?: number; + retryTimes?: number; + retryDelay?: number; + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise; }): Promise; composeFileUrl(options: { application: Partial; diff --git a/es/utils/cos/s3.js b/es/utils/cos/s3.js index 240d068f7..359a7c1ec 100644 --- a/es/utils/cos/s3.js +++ b/es/utils/cos/s3.js @@ -1,6 +1,7 @@ import { assert } from 'oak-domain/lib/utils/assert'; import { OakUploadException } from '../../types/Exception'; import { OakNetworkException } from 'oak-domain/lib/types/Exception'; +import { chunkUpload } from './common'; export default class S3 { name = 's3'; autoInform() { @@ -27,30 +28,43 @@ export default class S3 { return `extraFile/${objectId}${extension ? '.' + extension : ''}`; } async upload(options) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options; + const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta; - assert(extraFile.enableChunkedUpload !== true, '暂不支持分片上传'); - let response; - try { - // S3 使用预签名 URL 直接上传,不需要额外的 formData - response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT"); - } - catch (err) { - throw new OakNetworkException('网络异常,请求失败'); - } - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - // 小程序端上传 - if (response.errMsg === 'uploadFile:ok') { - const statusCode = response.statusCode; - isSuccess = statusCode === 200 || statusCode === 204; - } + if (extraFile.enableChunkedUpload) { + return chunkUpload({ + extraFile, + uploadFn, + file, + getPercent, + parallelism: parallelism, + retryTimes: retryTimes, + retryDelay: retryDelay, + onChunkSuccess: onChunkSuccess, + }); } else { - isSuccess = response.status === 200 || response.status === 204; - } - if (isSuccess) { - return; + let response; + try { + // S3 使用预签名 URL 直接上传,不需要额外的 formData + response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT"); + } + catch (err) { + throw new OakNetworkException('网络异常,请求失败'); + } + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + // 小程序端上传 + if (response.errMsg === 'uploadFile:ok') { + const statusCode = response.statusCode; + isSuccess = statusCode === 200 || statusCode === 204; + } + } + else { + isSuccess = response.status === 200 || response.status === 204; + } + if (isSuccess) { + return; + } } throw new OakUploadException('文件上传S3失败'); } diff --git a/lib/triggers/index.d.ts b/lib/triggers/index.d.ts index 5e0cf60ac..8737638aa 100644 --- a/lib/triggers/index.d.ts +++ b/lib/triggers/index.d.ts @@ -1,2 +1,2 @@ -declare const _default: (import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger>)[]; +declare const _default: (import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger> | import("oak-domain/lib/types").Trigger>)[]; export default _default; diff --git a/lib/utils/cos/aliyun.d.ts b/lib/utils/cos/aliyun.d.ts index e298d23c4..fa2298fb8 100644 --- a/lib/utils/cos/aliyun.d.ts +++ b/lib/utils/cos/aliyun.d.ts @@ -10,7 +10,6 @@ export default class ALiYun implements Cos { account: import("../../types/Config").AliCloudConfig; }; protected formKey(extraFile: Partial): string; - private chunkUpload; upload(options: { extraFile: OpSchema; uploadFn: UploadFn; diff --git a/lib/utils/cos/aliyun.js b/lib/utils/cos/aliyun.js index 44c196b4d..49004db24 100644 --- a/lib/utils/cos/aliyun.js +++ b/lib/utils/cos/aliyun.js @@ -3,7 +3,7 @@ Object.defineProperty(exports, "__esModule", { value: true }); const assert_1 = require("oak-domain/lib/utils/assert"); const Exception_1 = require("../../types/Exception"); const Exception_2 = require("oak-domain/lib/types/Exception"); -const slice_1 = require("../files/slice"); +const common_1 = require("./common"); class ALiYun { name = 'aliyun'; autoInform() { @@ -27,134 +27,14 @@ class ALiYun { (0, assert_1.assert)(objectId); return `extraFile/${objectId}${extension ? '.' + extension : ''}`; } - async chunkUpload(options) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; - const chunkInfo = extraFile.chunkInfo; - const parallelism = options.parallelism || 5; - const retryTimes = options.retryTimes || 5; - const retryDelay = options.retryDelay || 1000; - // 过滤出未完成的分片 - const pendingParts = chunkInfo.parts.filter(part => !part.etag); - if (pendingParts.length === 0) { - return; // 所有分片已上传完成 - } - // 将文件分片 - const chunks = await (0, slice_1.sliceFile)(file, chunkInfo.chunkSize, chunkInfo.partCount); - const everyPercent = {}; // 用于记录每个分片的进度百分比 - const updateChunkPercent = (partNumber, percent) => { - everyPercent[partNumber] = percent; - }; - const updatePercentInterval = setInterval(() => { - if (getPercent) { - const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; - getPercent(totalPercent); - } - }, 500); - // 上传单个分片的函数,带重试 - const uploadPart = async (part, chunk) => { - let lastError; - for (let attempt = 0; attempt <= retryTimes; attempt++) { - try { - const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => { - // 更新每个分片的进度 - updateChunkPercent(part.partNumber, percent); - }, `${extraFile.id}:${part.partNumber}`, "PUT"); - // 验证上传是否成功 - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - if (response.errMsg === 'uploadFile:ok') { - const data = JSON.parse(response.data); - isSuccess = !!(data.status === 204 || data.status === 200); - } - } - else { - isSuccess = !!(response.status === 200 || response.status === 204); - } - if (isSuccess) { - // 标记该分片已完成 - part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag"); - (0, assert_1.assert)(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); - return; - } - throw new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`); - } - catch (err) { - console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); - lastError = err; - // 如果是DomError,并且name是AbortError,说明是用户主动中止上传,不进行重试 - if ((0, Exception_2.isOakException)(err, Exception_2.OakUserException)) { - throw err; - } - if (attempt < retryTimes) { - // 等待后重试 - await new Promise(resolve => setTimeout(resolve, retryDelay)); - } - } - } - throw lastError || new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`); - }; - // 并行上传控制 - const uploadTasks = pendingParts.map((part) => ({ - part, - chunk: chunks[part.partNumber - 1] - })); - // 使用并发控制执行上传 - const executing = new Set(); - const errors = []; - for (const task of uploadTasks) { - let promise; - promise = (async () => { - try { - await uploadPart(task.part, task.chunk); - } - catch (err) { - if ((0, Exception_2.isOakException)(err, Exception_2.OakUserException)) { - // 用户主动中止上传,抛到上层再处理 - console.log(`分片 ${task.part.partNumber} 上传被用户中止`); - } - errors.push(err); - throw err; - } - finally { - if (promise) { - executing.delete(promise); - } - } - })(); - executing.add(promise); - // 当达到并发限制时,等待任意一个完成 - if (executing.size >= parallelism) { - await Promise.race(executing).catch(() => { }); - } - } - // 等待所有任务完成 - await Promise.allSettled([...executing]); - clearInterval(updatePercentInterval); - // 检查是否有错误 - if (errors.length > 0) { - throw errors[0]; - } - // 等待所有任务完成 - await Promise.all(executing); - // 调用分片成功回调(所有分片完成后) - if (onChunkSuccess) { - await onChunkSuccess(chunkInfo); - } - // 清理小程序环境下的临时文件 - if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { - await (0, slice_1.cleanTempFiles)(chunks); - } - return; - } async upload(options) { const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta; if (extraFile.enableChunkedUpload) { - return this.chunkUpload({ + return (0, common_1.chunkUpload)({ extraFile, uploadFn, file, - uploadToAspect, getPercent, parallelism: options.parallelism, retryTimes: options.retryTimes, diff --git a/lib/utils/cos/common.d.ts b/lib/utils/cos/common.d.ts new file mode 100644 index 000000000..6551d3a4a --- /dev/null +++ b/lib/utils/cos/common.d.ts @@ -0,0 +1,18 @@ +import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema'; +import { UploadFn } from "../../types/Cos"; +import { EntityDict } from '../../oak-app-domain'; +/** + * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 + * @param options 参数 + * @return + */ +export declare function chunkUpload(options: { + extraFile: OpSchema; + uploadFn: UploadFn; + file: string | File; + getPercent?: Function; + parallelism?: number; + retryTimes?: number; + retryDelay?: number; + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise; +}): Promise; diff --git a/lib/utils/cos/common.js b/lib/utils/cos/common.js new file mode 100644 index 000000000..7a225b1bb --- /dev/null +++ b/lib/utils/cos/common.js @@ -0,0 +1,132 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.chunkUpload = chunkUpload; +const tslib_1 = require("tslib"); +const Exception_1 = require("oak-domain/lib/types/Exception"); +const slice_1 = require("../files/slice"); +const assert_1 = tslib_1.__importDefault(require("assert")); +const Exception_2 = require("../../types/Exception"); +/** + * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 + * @param options 参数 + * @return + */ +async function chunkUpload(options) { + const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options; + const chunkInfo = extraFile.chunkInfo; + const parallelism = options.parallelism || 5; + const retryTimes = options.retryTimes || 5; + const retryDelay = options.retryDelay || 1000; + // 过滤出未完成的分片 + const pendingParts = chunkInfo.parts.filter(part => !part.etag); + if (pendingParts.length === 0) { + return; // 所有分片已上传完成 + } + // 将文件分片 + const chunks = await (0, slice_1.sliceFile)(file, chunkInfo.chunkSize, chunkInfo.partCount); + const everyPercent = {}; // 用于记录每个分片的进度百分比 + const updateChunkPercent = (partNumber, percent) => { + everyPercent[partNumber] = percent; + }; + const updatePercentInterval = setInterval(() => { + if (getPercent) { + const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; + getPercent(totalPercent); + } + }, 500); + // 上传单个分片的函数,带重试 + const uploadPart = async (part, chunk) => { + let lastError; + for (let attempt = 0; attempt <= retryTimes; attempt++) { + try { + const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => { + // 更新每个分片的进度 + updateChunkPercent(part.partNumber, percent); + }, `${extraFile.id}:${part.partNumber}`, "PUT"); + // 验证上传是否成功 + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + if (response.errMsg === 'uploadFile:ok') { + const data = JSON.parse(response.data); + isSuccess = !!(data.status === 204 || data.status === 200); + } + } + else { + isSuccess = !!(response.status === 200 || response.status === 204); + } + if (isSuccess) { + // 标记该分片已完成 + part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag"); + (0, assert_1.default)(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); + return; + } + throw new Exception_2.OakUploadException(`分片 ${part.partNumber} 上传失败`); + } + catch (err) { + console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); + lastError = err; + // 如果是OakUserException说明是用户主动中止上传,不进行重试 + if ((0, Exception_1.isOakException)(err, Exception_1.OakUserException)) { + throw err; + } + if (attempt < retryTimes) { + // 等待后重试 + await new Promise(resolve => setTimeout(resolve, retryDelay)); + } + } + } + throw lastError || new Exception_2.OakUploadException(`分片 ${part.partNumber} 上传失败`); + }; + // 并行上传控制 + const uploadTasks = pendingParts.map((part) => ({ + part, + chunk: chunks[part.partNumber - 1] + })); + // 使用并发控制执行上传 + const executing = new Set(); + const errors = []; + for (const task of uploadTasks) { + let promise; + promise = (async () => { + try { + await uploadPart(task.part, task.chunk); + } + catch (err) { + if ((0, Exception_1.isOakException)(err, Exception_1.OakUserException)) { + // 用户主动中止上传,抛到上层再处理 + console.log(`分片 ${task.part.partNumber} 上传被用户中止`); + } + errors.push(err); + throw err; + } + finally { + if (promise) { + executing.delete(promise); + } + } + })(); + executing.add(promise); + // 当达到并发限制时,等待任意一个完成 + if (executing.size >= parallelism) { + await Promise.race(executing).catch(() => { }); + } + } + // 等待所有任务完成 + await Promise.allSettled([...executing]); + clearInterval(updatePercentInterval); + // 检查是否有错误 + if (errors.length > 0) { + throw errors[0]; + } + // 等待所有任务完成 + await Promise.all(executing); + // 调用分片成功回调(所有分片完成后) + if (onChunkSuccess) { + await onChunkSuccess(chunkInfo); + } + // 清理小程序环境下的临时文件 + if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { + await (0, slice_1.cleanTempFiles)(chunks); + } + return; +} diff --git a/lib/utils/cos/s3.backend.d.ts b/lib/utils/cos/s3.backend.d.ts index e230f87f2..37323d4aa 100644 --- a/lib/utils/cos/s3.backend.d.ts +++ b/lib/utils/cos/s3.backend.d.ts @@ -18,12 +18,15 @@ export default class S3Backend extends S3 implements CosBackend { uploadId: string; chunkSize: number; partCount: number; - partSize: number; - parts: never[]; + parts: { + partNumber: number; + uploadUrl: string; + formData: {}; + }[]; }>; /** - * 完成分片上传后的合并操作 - */ +* 完成分片上传后的合并操作 +*/ mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC): Promise; abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC): Promise; } diff --git a/lib/utils/cos/s3.backend.js b/lib/utils/cos/s3.backend.js index 3d4f6c70a..aeb82c1a7 100644 --- a/lib/utils/cos/s3.backend.js +++ b/lib/utils/cos/s3.backend.js @@ -99,22 +99,38 @@ class S3Backend extends s3_1.default { } } async composeChunkUploadInfo(application, extraFile, context) { - throw new Exception_1.OakPreConditionUnsetException('S3暂不支持分片上传'); + const key = this.formKey(extraFile); + const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket); + const preInit = await instance.prepareMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo?.partCount, { + endpoint: s3Config.endpoint, + expiresIn: 30000, // 上传链接过期时间,单位秒 + }); return { - uploadId: '', - chunkSize: 0, - partCount: 0, - partSize: 0, - parts: [], + uploadId: preInit.uploadId, + chunkSize: extraFile.chunkInfo?.chunkSize, + partCount: preInit.parts.length, + parts: preInit.parts.map((part) => ({ + partNumber: part.partNumber, + uploadUrl: part.uploadUrl, + formData: {}, // S3不需要额外的formData + })), }; } /** - * 完成分片上传后的合并操作 - */ +* 完成分片上传后的合并操作 +*/ async mergeChunkedUpload(application, extraFile, context) { - // Implementation here + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket); + await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({ + partNumber: part.partNumber, + eTag: part.etag, + })), s3CosConfig.endpoint); } async abortMultipartUpload(application, extraFile, context) { + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket); + await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint); } } exports.default = S3Backend; diff --git a/lib/utils/cos/s3.d.ts b/lib/utils/cos/s3.d.ts index 31730aa42..b4c924e41 100644 --- a/lib/utils/cos/s3.d.ts +++ b/lib/utils/cos/s3.d.ts @@ -18,6 +18,10 @@ export default class S3 implements Cos { file: string | File; uploadToAspect?: UploadToAspect; getPercent?: Function; + parallelism?: number; + retryTimes?: number; + retryDelay?: number; + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise; }): Promise; composeFileUrl(options: { application: Partial; diff --git a/lib/utils/cos/s3.js b/lib/utils/cos/s3.js index 6fd982a55..0daeba736 100644 --- a/lib/utils/cos/s3.js +++ b/lib/utils/cos/s3.js @@ -3,6 +3,7 @@ Object.defineProperty(exports, "__esModule", { value: true }); const assert_1 = require("oak-domain/lib/utils/assert"); const Exception_1 = require("../../types/Exception"); const Exception_2 = require("oak-domain/lib/types/Exception"); +const common_1 = require("./common"); class S3 { name = 's3'; autoInform() { @@ -29,30 +30,43 @@ class S3 { return `extraFile/${objectId}${extension ? '.' + extension : ''}`; } async upload(options) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options; + const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta; - (0, assert_1.assert)(extraFile.enableChunkedUpload !== true, '暂不支持分片上传'); - let response; - try { - // S3 使用预签名 URL 直接上传,不需要额外的 formData - response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT"); - } - catch (err) { - throw new Exception_2.OakNetworkException('网络异常,请求失败'); - } - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - // 小程序端上传 - if (response.errMsg === 'uploadFile:ok') { - const statusCode = response.statusCode; - isSuccess = statusCode === 200 || statusCode === 204; - } + if (extraFile.enableChunkedUpload) { + return (0, common_1.chunkUpload)({ + extraFile, + uploadFn, + file, + getPercent, + parallelism: parallelism, + retryTimes: retryTimes, + retryDelay: retryDelay, + onChunkSuccess: onChunkSuccess, + }); } else { - isSuccess = response.status === 200 || response.status === 204; - } - if (isSuccess) { - return; + let response; + try { + // S3 使用预签名 URL 直接上传,不需要额外的 formData + response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT"); + } + catch (err) { + throw new Exception_2.OakNetworkException('网络异常,请求失败'); + } + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + // 小程序端上传 + if (response.errMsg === 'uploadFile:ok') { + const statusCode = response.statusCode; + isSuccess = statusCode === 200 || statusCode === 204; + } + } + else { + isSuccess = response.status === 200 || response.status === 204; + } + if (isSuccess) { + return; + } } throw new Exception_1.OakUploadException('文件上传S3失败'); } diff --git a/src/utils/cos/aliyun.ts b/src/utils/cos/aliyun.ts index 1449378c8..837ab9310 100644 --- a/src/utils/cos/aliyun.ts +++ b/src/utils/cos/aliyun.ts @@ -6,9 +6,8 @@ import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema'; import { AliYunUploadInfo } from '../../types/Upload'; import { ALiYunCosConfig, Protocol } from '../../types/Config'; import { OakUploadException } from '../../types/Exception'; -import { isOakException, -OakNetworkException,OakUserException } from 'oak-domain/lib/types/Exception'; -import { sliceFile, cleanTempFiles } from '../files/slice'; +import { OakNetworkException } from 'oak-domain/lib/types/Exception'; +import { chunkUpload } from './common'; export default class ALiYun implements Cos { name = 'aliyun'; @@ -40,168 +39,6 @@ export default class ALiYun implements Cos { return `extraFile/${objectId}${extension ? '.' + extension : ''}`; } - private async chunkUpload( - options: { - extraFile: OpSchema, - uploadFn: UploadFn, - file: string | File, - uploadToAspect?: UploadToAspect, - getPercent?: Function - // 分片上传时使用 - parallelism?: number // 并行线程数 - retryTimes?: number // 重试次数 - retryDelay?: number // 重试间隔,单位毫秒 - onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise // 每个分片上传成功的回调 - } - ) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; - const chunkInfo = extraFile.chunkInfo!; - const parallelism = options.parallelism || 5; - const retryTimes = options.retryTimes || 5; - const retryDelay = options.retryDelay || 1000; - - // 过滤出未完成的分片 - const pendingParts = chunkInfo.parts.filter(part => !part.etag); - - if (pendingParts.length === 0) { - return; // 所有分片已上传完成 - } - - // 将文件分片 - const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount); - - const everyPercent: { - [partNumber: number]: number - } = {} // 用于记录每个分片的进度百分比 - - const updateChunkPercent = (partNumber: number, percent: number) => { - everyPercent[partNumber] = percent; - } - - const updatePercentInterval = setInterval(() => { - if (getPercent) { - const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; - getPercent(totalPercent); - } - }, 500); - - // 上传单个分片的函数,带重试 - const uploadPart = async (part: typeof chunkInfo.parts[0], chunk: File | Blob | string) => { - let lastError; - for (let attempt = 0; attempt <= retryTimes; attempt++) { - try { - const response = await uploadFn( - chunk, - 'file', - part.uploadUrl, - part.formData || {}, - true, - (percent: number) => { - // 更新每个分片的进度 - updateChunkPercent(part.partNumber, percent); - }, - `${extraFile.id}:${part.partNumber}`, - "PUT" - ); - - // 验证上传是否成功 - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - if (response.errMsg === 'uploadFile:ok') { - const data = JSON.parse(response.data); - isSuccess = !!(data.status === 204 || data.status === 200); - } - } else { - isSuccess = !!(response.status === 200 || response.status === 204); - } - - if (isSuccess) { - // 标记该分片已完成 - part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag") - assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); - return; - } - - throw new OakUploadException(`分片 ${part.partNumber} 上传失败`); - } catch (err: any) { - console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); - lastError = err; - // 如果是DomError,并且name是AbortError,说明是用户主动中止上传,不进行重试 - if (isOakException(err, OakUserException)) { - throw err; - } - if (attempt < retryTimes) { - // 等待后重试 - await new Promise(resolve => setTimeout(resolve, retryDelay)); - } - } - } - throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`); - }; - - // 并行上传控制 - const uploadTasks = pendingParts.map((part) => ({ - part, - chunk: chunks[part.partNumber - 1] - })); - - // 使用并发控制执行上传 - const executing: Set> = new Set(); - const errors: Error[] = []; - - for (const task of uploadTasks) { - let promise; - promise = (async () => { - try { - await uploadPart(task.part, task.chunk); - } catch (err) { - if (isOakException(err, OakUserException)) { - // 用户主动中止上传,抛到上层再处理 - console.log(`分片 ${task.part.partNumber} 上传被用户中止`); - } - errors.push(err as Error); - throw err; - } finally { - if (promise) { - executing.delete(promise); - } - } - })(); - - executing.add(promise); - - // 当达到并发限制时,等待任意一个完成 - if (executing.size >= parallelism) { - await Promise.race(executing).catch(() => { }); - } - } - - // 等待所有任务完成 - await Promise.allSettled([...executing]); - - clearInterval(updatePercentInterval); - - // 检查是否有错误 - if (errors.length > 0) { - throw errors[0]; - } - - // 等待所有任务完成 - await Promise.all(executing); - - // 调用分片成功回调(所有分片完成后) - if (onChunkSuccess) { - await onChunkSuccess(chunkInfo); - } - - // 清理小程序环境下的临时文件 - if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { - await cleanTempFiles(chunks as string[]); - } - - return; - } - async upload( options: { extraFile: OpSchema, @@ -219,11 +56,10 @@ export default class ALiYun implements Cos { const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta! as AliYunUploadInfo; if (extraFile.enableChunkedUpload) { - return this.chunkUpload({ + return chunkUpload({ extraFile, uploadFn, file, - uploadToAspect, getPercent, parallelism: options.parallelism, retryTimes: options.retryTimes, diff --git a/src/utils/cos/common.ts b/src/utils/cos/common.ts new file mode 100644 index 000000000..f236abba0 --- /dev/null +++ b/src/utils/cos/common.ts @@ -0,0 +1,173 @@ +import { isOakException, OakUserException } from 'oak-domain/lib/types/Exception'; +import { sliceFile, cleanTempFiles } from '../files/slice'; +import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema'; +import { UploadFn } from "../../types/Cos"; +import { EntityDict } from '../../oak-app-domain'; +import assert from 'assert'; +import { OakUploadException } from '../../types/Exception'; + +/** + * 分片上传通用方法,适用于所有类S3存储服务,如AWS,MinIO、阿里云OSS等 + * @param options 参数 + * @return + */ +export async function chunkUpload( + options: { + extraFile: OpSchema, + uploadFn: UploadFn, + file: string | File, + getPercent?: Function + // 分片上传时使用 + parallelism?: number // 并行线程数 + retryTimes?: number // 重试次数 + retryDelay?: number // 重试间隔,单位毫秒 + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise // 每个分片上传成功的回调 + } +) { + const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options; + const chunkInfo = extraFile.chunkInfo!; + const parallelism = options.parallelism || 5; + const retryTimes = options.retryTimes || 5; + const retryDelay = options.retryDelay || 1000; + + // 过滤出未完成的分片 + const pendingParts = chunkInfo.parts.filter(part => !part.etag); + + if (pendingParts.length === 0) { + return; // 所有分片已上传完成 + } + + // 将文件分片 + const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount); + + const everyPercent: { + [partNumber: number]: number + } = {} // 用于记录每个分片的进度百分比 + + const updateChunkPercent = (partNumber: number, percent: number) => { + everyPercent[partNumber] = percent; + } + + const updatePercentInterval = setInterval(() => { + if (getPercent) { + const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount; + getPercent(totalPercent); + } + }, 500); + + // 上传单个分片的函数,带重试 + const uploadPart = async (part: typeof chunkInfo.parts[0], chunk: File | Blob | string) => { + let lastError; + for (let attempt = 0; attempt <= retryTimes; attempt++) { + try { + const response = await uploadFn( + chunk, + 'file', + part.uploadUrl, + part.formData || {}, + true, + (percent: number) => { + // 更新每个分片的进度 + updateChunkPercent(part.partNumber, percent); + }, + `${extraFile.id}:${part.partNumber}`, + "PUT" + ); + + // 验证上传是否成功 + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + if (response.errMsg === 'uploadFile:ok') { + const data = JSON.parse(response.data); + isSuccess = !!(data.status === 204 || data.status === 200); + } + } else { + isSuccess = !!(response.status === 200 || response.status === 204); + } + + if (isSuccess) { + // 标记该分片已完成 + part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag") + assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`); + return; + } + + throw new OakUploadException(`分片 ${part.partNumber} 上传失败`); + } catch (err: any) { + console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err); + lastError = err; + // 如果是OakUserException说明是用户主动中止上传,不进行重试 + if (isOakException(err, OakUserException)) { + throw err; + } + if (attempt < retryTimes) { + // 等待后重试 + await new Promise(resolve => setTimeout(resolve, retryDelay)); + } + } + } + throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`); + }; + + // 并行上传控制 + const uploadTasks = pendingParts.map((part) => ({ + part, + chunk: chunks[part.partNumber - 1] + })); + + // 使用并发控制执行上传 + const executing: Set> = new Set(); + const errors: Error[] = []; + + for (const task of uploadTasks) { + let promise; + promise = (async () => { + try { + await uploadPart(task.part, task.chunk); + } catch (err) { + if (isOakException(err, OakUserException)) { + // 用户主动中止上传,抛到上层再处理 + console.log(`分片 ${task.part.partNumber} 上传被用户中止`); + } + errors.push(err as Error); + throw err; + } finally { + if (promise) { + executing.delete(promise); + } + } + })(); + + executing.add(promise); + + // 当达到并发限制时,等待任意一个完成 + if (executing.size >= parallelism) { + await Promise.race(executing).catch(() => { }); + } + } + + // 等待所有任务完成 + await Promise.allSettled([...executing]); + + clearInterval(updatePercentInterval); + + // 检查是否有错误 + if (errors.length > 0) { + throw errors[0]; + } + + // 等待所有任务完成 + await Promise.all(executing); + + // 调用分片成功回调(所有分片完成后) + if (onChunkSuccess) { + await onChunkSuccess(chunkInfo); + } + + // 清理小程序环境下的临时文件 + if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') { + await cleanTempFiles(chunks as string[]); + } + + return; +} diff --git a/src/utils/cos/s3.backend.ts b/src/utils/cos/s3.backend.ts index 8a02ae71c..623baccb2 100644 --- a/src/utils/cos/s3.backend.ts +++ b/src/utils/cos/s3.backend.ts @@ -180,26 +180,54 @@ export default class S3Backend extends S3 implements CosBackend { extraFile: OpSchema, context: BRC, ) { - throw new OakPreConditionUnsetException('S3暂不支持分片上传'); + const key = this.formKey(extraFile); + const { instance, config: s3Config } = + this.getConfigAndInstance(application, extraFile.bucket!); + + const preInit = await instance.prepareMultipartUpload( + extraFile.bucket!, + key, + extraFile.chunkInfo?.partCount!, + { + endpoint: s3Config.endpoint, + expiresIn: 30000, // 上传链接过期时间,单位秒 + } + ) return { - uploadId: '', - chunkSize: 0, - partCount: 0, - partSize: 0, - parts: [], - } + uploadId: preInit.uploadId, + chunkSize: extraFile.chunkInfo?.chunkSize!, + partCount: preInit.parts.length, + parts: preInit.parts.map((part) => ({ + partNumber: part.partNumber, + uploadUrl: part.uploadUrl, + formData: {}, // S3不需要额外的formData + })), + }; } /** - * 完成分片上传后的合并操作 - */ +* 完成分片上传后的合并操作 +*/ async mergeChunkedUpload( application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC, ): Promise { - // Implementation here + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = + this.getConfigAndInstance(application, extraFile.bucket!); + + await instance.completeMultipartUpload( + extraFile.bucket!, + key, + extraFile.chunkInfo!.uploadId!, + extraFile.chunkInfo!.parts!.map((part) => ({ + partNumber: part.partNumber, + eTag: part.etag!, + })), + s3CosConfig.endpoint! + ); } async abortMultipartUpload( @@ -207,6 +235,15 @@ export default class S3Backend extends S3 implements CosBackend { extraFile: OpSchema, context: BRC, ): Promise { + const key = this.formKey(extraFile); + const { instance, config: s3CosConfig } = + this.getConfigAndInstance(application, extraFile.bucket!); + await instance.abortMultipartUpload( + extraFile.bucket!, + key, + extraFile.chunkInfo!.uploadId!, + s3CosConfig.endpoint! + ); } } \ No newline at end of file diff --git a/src/utils/cos/s3.ts b/src/utils/cos/s3.ts index dc5e6fb8f..8b25b6509 100644 --- a/src/utils/cos/s3.ts +++ b/src/utils/cos/s3.ts @@ -7,6 +7,7 @@ import { S3UploadInfo } from '../../types/Upload'; import { S3CosConfig, Protocol } from '../../types/Config'; import { OakUploadException } from '../../types/Exception'; import { OakNetworkException } from 'oak-domain/lib/types/Exception'; +import { chunkUpload } from './common'; export default class S3 implements Cos { name = 's3'; @@ -47,42 +48,60 @@ export default class S3 implements Cos { file: string | File, uploadToAspect?: UploadToAspect, getPercent?: Function + // 分片上传时使用 + parallelism?: number // 并行线程数 + retryTimes?: number // 重试次数 + retryDelay?: number // 重试间隔,单位毫秒 + onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise // 每个分片上传成功的回调 } ) { - const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options; + const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options; const uploadMeta = extraFile.uploadMeta! as S3UploadInfo; - assert(extraFile.enableChunkedUpload !== true, '暂不支持分片上传'); - let response; - try { - // S3 使用预签名 URL 直接上传,不需要额外的 formData - response = await uploadFn( + + if (extraFile.enableChunkedUpload) { + return chunkUpload({ + extraFile, + uploadFn, file, - 'file', - uploadMeta.uploadUrl, - {}, - true, getPercent, - extraFile.id, - "PUT" - ); - } catch (err) { - throw new OakNetworkException('网络异常,请求失败'); - } - - let isSuccess = false; - if (process.env.OAK_PLATFORM === 'wechatMp') { - // 小程序端上传 - if (response.errMsg === 'uploadFile:ok') { - const statusCode = response.statusCode; - isSuccess = statusCode === 200 || statusCode === 204; - } + parallelism: parallelism, + retryTimes: retryTimes, + retryDelay: retryDelay, + onChunkSuccess: onChunkSuccess, + }); } else { - isSuccess = response.status === 200 || response.status === 204; - } + let response; + try { + // S3 使用预签名 URL 直接上传,不需要额外的 formData + response = await uploadFn( + file, + 'file', + uploadMeta.uploadUrl, + {}, + true, + getPercent, + extraFile.id, + "PUT" + ); + } catch (err) { + throw new OakNetworkException('网络异常,请求失败'); + } - if (isSuccess) { - return; + let isSuccess = false; + if (process.env.OAK_PLATFORM === 'wechatMp') { + // 小程序端上传 + if (response.errMsg === 'uploadFile:ok') { + const statusCode = response.statusCode; + isSuccess = statusCode === 200 || statusCode === 204; + } + } else { + isSuccess = response.status === 200 || response.status === 204; + } + + if (isSuccess) { + return; + } } throw new OakUploadException('文件上传S3失败'); } @@ -127,4 +146,4 @@ export default class S3 implements Cos { } return ''; } -} \ No newline at end of file +} \ No newline at end of file