feat: 新增S3实现分片上传等功能

This commit is contained in:
Pan Qiancheng 2025-12-26 13:30:07 +08:00
parent d5a09546dd
commit 10077f150e
22 changed files with 717 additions and 524 deletions

View File

@ -1,2 +1,2 @@
declare const _default: (import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatLogin", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "address", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "application", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "article", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "articleMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "extraFile", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "user", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "userEntityGrant", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatQrCode", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "message", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "notification", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "parasite", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "sessionMessage", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatPublicTag", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMpJump", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "system", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "passport", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthApplication", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthProvider", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUser", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUserAuthorization", import("../context/BackendRuntimeContext").BackendRuntimeContext<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "mobile", import("..").BRC<import("../oak-app-domain").EntityDict>>)[];
declare const _default: (import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "extraFile", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUser", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "application", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "address", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "user", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "userEntityGrant", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatQrCode", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "message", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "notification", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatLogin", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "articleMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "article", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "parasite", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "sessionMessage", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatPublicTag", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMpJump", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "system", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "passport", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthApplication", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthProvider", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUserAuthorization", import("../context/BackendRuntimeContext").BackendRuntimeContext<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "mobile", import("..").BRC<import("../oak-app-domain").EntityDict>>)[];
export default _default;

View File

@ -10,7 +10,6 @@ export default class ALiYun implements Cos<EntityDict> {
account: import("../../types/Config").AliCloudConfig;
};
protected formKey(extraFile: Partial<OpSchema>): string;
private chunkUpload;
upload(options: {
extraFile: OpSchema;
uploadFn: UploadFn;

View File

@ -1,7 +1,7 @@
import { assert } from 'oak-domain/lib/utils/assert';
import { OakUploadException } from '../../types/Exception';
import { isOakException, OakNetworkException, OakUserException } from 'oak-domain/lib/types/Exception';
import { sliceFile, cleanTempFiles } from '../files/slice';
import { OakNetworkException } from 'oak-domain/lib/types/Exception';
import { chunkUpload } from './common';
export default class ALiYun {
name = 'aliyun';
autoInform() {
@ -25,134 +25,14 @@ export default class ALiYun {
assert(objectId);
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async chunkUpload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent = {}; // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber, percent) => {
everyPercent[partNumber] = percent;
};
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
}, `${extraFile.id}:${part.partNumber}`, "PUT");
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
}
else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag");
assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是DomError并且name是AbortError说明是用户主动中止上传不进行重试
if (isOakException(err, OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
}
catch (err) {
if (isOakException(err, OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err);
throw err;
}
finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await cleanTempFiles(chunks);
}
return;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return this.chunkUpload({
return chunkUpload({
extraFile,
uploadFn,
file,
uploadToAspect,
getPercent,
parallelism: options.parallelism,
retryTimes: options.retryTimes,

18
es/utils/cos/common.d.ts vendored Normal file
View File

@ -0,0 +1,18 @@
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
/**
* S3存储服务AWSMinIOOSS等
* @param options
* @return
*/
export declare function chunkUpload(options: {
extraFile: OpSchema;
uploadFn: UploadFn;
file: string | File;
getPercent?: Function;
parallelism?: number;
retryTimes?: number;
retryDelay?: number;
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void>;
}): Promise<void>;

128
es/utils/cos/common.js Normal file
View File

@ -0,0 +1,128 @@
import { isOakException, OakUserException } from 'oak-domain/lib/types/Exception';
import { sliceFile, cleanTempFiles } from '../files/slice';
import assert from 'assert';
import { OakUploadException } from '../../types/Exception';
/**
* 分片上传通用方法适用于所有类S3存储服务如AWSMinIO阿里云OSS等
* @param options 参数
* @return
*/
export async function chunkUpload(options) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent = {}; // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber, percent) => {
everyPercent[partNumber] = percent;
};
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
}, `${extraFile.id}:${part.partNumber}`, "PUT");
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
}
else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag");
assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if (isOakException(err, OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
}
catch (err) {
if (isOakException(err, OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err);
throw err;
}
finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await cleanTempFiles(chunks);
}
return;
}

View File

@ -18,12 +18,15 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
uploadId: string;
chunkSize: number;
partCount: number;
partSize: number;
parts: never[];
parts: {
partNumber: number;
uploadUrl: string;
formData: {};
}[];
}>;
/**
*
*/
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
}

View File

@ -1,7 +1,7 @@
import { assert } from 'oak-domain/lib/utils/assert';
import S3 from './s3';
import { S3SDK } from 'oak-external-sdk';
import { OakExternalException, OakPreConditionUnsetException } from 'oak-domain/lib/types/Exception';
import { OakExternalException } from 'oak-domain/lib/types/Exception';
export default class S3Backend extends S3 {
getConfigAndInstance(application, bucket) {
const { config, account, endpoint, defaultBucket } = this.getConfig(application);
@ -96,21 +96,37 @@ export default class S3Backend extends S3 {
}
}
async composeChunkUploadInfo(application, extraFile, context) {
throw new OakPreConditionUnsetException('S3暂不支持分片上传');
const key = this.formKey(extraFile);
const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket);
const preInit = await instance.prepareMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo?.partCount, {
endpoint: s3Config.endpoint,
expiresIn: 30000, // 上传链接过期时间,单位秒
});
return {
uploadId: '',
chunkSize: 0,
partCount: 0,
partSize: 0,
parts: [],
uploadId: preInit.uploadId,
chunkSize: extraFile.chunkInfo?.chunkSize,
partCount: preInit.parts.length,
parts: preInit.parts.map((part) => ({
partNumber: part.partNumber,
uploadUrl: part.uploadUrl,
formData: {}, // S3不需要额外的formData
})),
};
}
/**
* 完成分片上传后的合并操作
*/
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
// Implementation here
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
}
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint);
}
}

View File

@ -18,6 +18,10 @@ export default class S3 implements Cos<EntityDict> {
file: string | File;
uploadToAspect?: UploadToAspect;
getPercent?: Function;
parallelism?: number;
retryTimes?: number;
retryDelay?: number;
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void>;
}): Promise<void>;
composeFileUrl(options: {
application: Partial<EntityDict['application']['Schema']>;

View File

@ -1,6 +1,7 @@
import { assert } from 'oak-domain/lib/utils/assert';
import { OakUploadException } from '../../types/Exception';
import { OakNetworkException } from 'oak-domain/lib/types/Exception';
import { chunkUpload } from './common';
export default class S3 {
name = 's3';
autoInform() {
@ -27,30 +28,43 @@ export default class S3 {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options;
const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
assert(extraFile.enableChunkedUpload !== true, '暂不支持分片上传');
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT");
}
catch (err) {
throw new OakNetworkException('网络异常,请求失败');
}
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
if (extraFile.enableChunkedUpload) {
return chunkUpload({
extraFile,
uploadFn,
file,
getPercent,
parallelism: parallelism,
retryTimes: retryTimes,
retryDelay: retryDelay,
onChunkSuccess: onChunkSuccess,
});
}
else {
isSuccess = response.status === 200 || response.status === 204;
}
if (isSuccess) {
return;
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT");
}
catch (err) {
throw new OakNetworkException('网络异常,请求失败');
}
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
}
else {
isSuccess = response.status === 200 || response.status === 204;
}
if (isSuccess) {
return;
}
}
throw new OakUploadException('文件上传S3失败');
}

View File

@ -1,2 +1,2 @@
declare const _default: (import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "address", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "application", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "article", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "articleMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "extraFile", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "user", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "userEntityGrant", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatQrCode", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "message", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "notification", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatLogin", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "parasite", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "sessionMessage", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatPublicTag", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMpJump", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "system", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "passport", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthApplication", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthProvider", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUser", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUserAuthorization", import("../context/BackendRuntimeContext").BackendRuntimeContext<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "mobile", import("..").BRC<import("../oak-app-domain").EntityDict>>)[];
declare const _default: (import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "extraFile", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUser", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "application", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "address", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "user", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "userEntityGrant", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatQrCode", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "message", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "notification", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatLogin", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "articleMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "article", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "parasite", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "sessionMessage", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMenu", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatPublicTag", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "wechatMpJump", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "system", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "passport", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthApplication", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthProvider", import("..").BRC<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "oauthUserAuthorization", import("../context/BackendRuntimeContext").BackendRuntimeContext<import("../oak-app-domain").EntityDict>> | import("oak-domain/lib/types").Trigger<import("../oak-app-domain").EntityDict, "mobile", import("..").BRC<import("../oak-app-domain").EntityDict>>)[];
export default _default;

View File

@ -10,7 +10,6 @@ export default class ALiYun implements Cos<EntityDict> {
account: import("../../types/Config").AliCloudConfig;
};
protected formKey(extraFile: Partial<OpSchema>): string;
private chunkUpload;
upload(options: {
extraFile: OpSchema;
uploadFn: UploadFn;

View File

@ -3,7 +3,7 @@ Object.defineProperty(exports, "__esModule", { value: true });
const assert_1 = require("oak-domain/lib/utils/assert");
const Exception_1 = require("../../types/Exception");
const Exception_2 = require("oak-domain/lib/types/Exception");
const slice_1 = require("../files/slice");
const common_1 = require("./common");
class ALiYun {
name = 'aliyun';
autoInform() {
@ -27,134 +27,14 @@ class ALiYun {
(0, assert_1.assert)(objectId);
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async chunkUpload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await (0, slice_1.sliceFile)(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent = {}; // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber, percent) => {
everyPercent[partNumber] = percent;
};
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
}, `${extraFile.id}:${part.partNumber}`, "PUT");
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
}
else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag");
(0, assert_1.assert)(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是DomError并且name是AbortError说明是用户主动中止上传不进行重试
if ((0, Exception_2.isOakException)(err, Exception_2.OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
}
catch (err) {
if ((0, Exception_2.isOakException)(err, Exception_2.OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err);
throw err;
}
finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await (0, slice_1.cleanTempFiles)(chunks);
}
return;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return this.chunkUpload({
return (0, common_1.chunkUpload)({
extraFile,
uploadFn,
file,
uploadToAspect,
getPercent,
parallelism: options.parallelism,
retryTimes: options.retryTimes,

18
lib/utils/cos/common.d.ts vendored Normal file
View File

@ -0,0 +1,18 @@
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
/**
* S3存储服务AWSMinIOOSS等
* @param options
* @return
*/
export declare function chunkUpload(options: {
extraFile: OpSchema;
uploadFn: UploadFn;
file: string | File;
getPercent?: Function;
parallelism?: number;
retryTimes?: number;
retryDelay?: number;
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void>;
}): Promise<void>;

132
lib/utils/cos/common.js Normal file
View File

@ -0,0 +1,132 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.chunkUpload = chunkUpload;
const tslib_1 = require("tslib");
const Exception_1 = require("oak-domain/lib/types/Exception");
const slice_1 = require("../files/slice");
const assert_1 = tslib_1.__importDefault(require("assert"));
const Exception_2 = require("../../types/Exception");
/**
* 分片上传通用方法适用于所有类S3存储服务如AWSMinIO阿里云OSS等
* @param options 参数
* @return
*/
async function chunkUpload(options) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await (0, slice_1.sliceFile)(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent = {}; // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber, percent) => {
everyPercent[partNumber] = percent;
};
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(chunk, 'file', part.uploadUrl, part.formData || {}, true, (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
}, `${extraFile.id}:${part.partNumber}`, "PUT");
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
}
else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = response.headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag");
(0, assert_1.default)(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new Exception_2.OakUploadException(`分片 ${part.partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if ((0, Exception_1.isOakException)(err, Exception_1.OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new Exception_2.OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
}
catch (err) {
if ((0, Exception_1.isOakException)(err, Exception_1.OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err);
throw err;
}
finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await (0, slice_1.cleanTempFiles)(chunks);
}
return;
}

View File

@ -18,12 +18,15 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
uploadId: string;
chunkSize: number;
partCount: number;
partSize: number;
parts: never[];
parts: {
partNumber: number;
uploadUrl: string;
formData: {};
}[];
}>;
/**
*
*/
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
}

View File

@ -99,22 +99,38 @@ class S3Backend extends s3_1.default {
}
}
async composeChunkUploadInfo(application, extraFile, context) {
throw new Exception_1.OakPreConditionUnsetException('S3暂不支持分片上传');
const key = this.formKey(extraFile);
const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket);
const preInit = await instance.prepareMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo?.partCount, {
endpoint: s3Config.endpoint,
expiresIn: 30000, // 上传链接过期时间,单位秒
});
return {
uploadId: '',
chunkSize: 0,
partCount: 0,
partSize: 0,
parts: [],
uploadId: preInit.uploadId,
chunkSize: extraFile.chunkInfo?.chunkSize,
partCount: preInit.parts.length,
parts: preInit.parts.map((part) => ({
partNumber: part.partNumber,
uploadUrl: part.uploadUrl,
formData: {}, // S3不需要额外的formData
})),
};
}
/**
* 完成分片上传后的合并操作
*/
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
// Implementation here
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
}
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint);
}
}
exports.default = S3Backend;

View File

@ -18,6 +18,10 @@ export default class S3 implements Cos<EntityDict> {
file: string | File;
uploadToAspect?: UploadToAspect;
getPercent?: Function;
parallelism?: number;
retryTimes?: number;
retryDelay?: number;
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void>;
}): Promise<void>;
composeFileUrl(options: {
application: Partial<EntityDict['application']['Schema']>;

View File

@ -3,6 +3,7 @@ Object.defineProperty(exports, "__esModule", { value: true });
const assert_1 = require("oak-domain/lib/utils/assert");
const Exception_1 = require("../../types/Exception");
const Exception_2 = require("oak-domain/lib/types/Exception");
const common_1 = require("./common");
class S3 {
name = 's3';
autoInform() {
@ -29,30 +30,43 @@ class S3 {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options;
const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
(0, assert_1.assert)(extraFile.enableChunkedUpload !== true, '暂不支持分片上传');
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT");
}
catch (err) {
throw new Exception_2.OakNetworkException('网络异常,请求失败');
}
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
if (extraFile.enableChunkedUpload) {
return (0, common_1.chunkUpload)({
extraFile,
uploadFn,
file,
getPercent,
parallelism: parallelism,
retryTimes: retryTimes,
retryDelay: retryDelay,
onChunkSuccess: onChunkSuccess,
});
}
else {
isSuccess = response.status === 200 || response.status === 204;
}
if (isSuccess) {
return;
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(file, 'file', uploadMeta.uploadUrl, {}, true, getPercent, extraFile.id, "PUT");
}
catch (err) {
throw new Exception_2.OakNetworkException('网络异常,请求失败');
}
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
}
else {
isSuccess = response.status === 200 || response.status === 204;
}
if (isSuccess) {
return;
}
}
throw new Exception_1.OakUploadException('文件上传S3失败');
}

View File

@ -6,9 +6,8 @@ import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { AliYunUploadInfo } from '../../types/Upload';
import { ALiYunCosConfig, Protocol } from '../../types/Config';
import { OakUploadException } from '../../types/Exception';
import { isOakException,
OakNetworkException,OakUserException } from 'oak-domain/lib/types/Exception';
import { sliceFile, cleanTempFiles } from '../files/slice';
import { OakNetworkException } from 'oak-domain/lib/types/Exception';
import { chunkUpload } from './common';
export default class ALiYun implements Cos<EntityDict> {
name = 'aliyun';
@ -40,168 +39,6 @@ export default class ALiYun implements Cos<EntityDict> {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
private async chunkUpload(
options: {
extraFile: OpSchema,
uploadFn: UploadFn,
file: string | File,
uploadToAspect?: UploadToAspect,
getPercent?: Function
// 分片上传时使用
parallelism?: number // 并行线程数
retryTimes?: number // 重试次数
retryDelay?: number // 重试间隔,单位毫秒
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void> // 每个分片上传成功的回调
}
) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo!;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent: {
[partNumber: number]: number
} = {} // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber: number, percent: number) => {
everyPercent[partNumber] = percent;
}
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part: typeof chunkInfo.parts[0], chunk: File | Blob | string) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(
chunk,
'file',
part.uploadUrl,
part.formData || {},
true,
(percent: number) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
},
`${extraFile.id}:${part.partNumber}`,
"PUT"
);
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
} else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag")
assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
} catch (err: any) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是DomError并且name是AbortError说明是用户主动中止上传不进行重试
if (isOakException(err, OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing: Set<Promise<void>> = new Set();
const errors: Error[] = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
} catch (err) {
if (isOakException(err, OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err as Error);
throw err;
} finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await cleanTempFiles(chunks as string[]);
}
return;
}
async upload(
options: {
extraFile: OpSchema,
@ -219,11 +56,10 @@ export default class ALiYun implements Cos<EntityDict> {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta! as AliYunUploadInfo;
if (extraFile.enableChunkedUpload) {
return this.chunkUpload({
return chunkUpload({
extraFile,
uploadFn,
file,
uploadToAspect,
getPercent,
parallelism: options.parallelism,
retryTimes: options.retryTimes,

173
src/utils/cos/common.ts Normal file
View File

@ -0,0 +1,173 @@
import { isOakException, OakUserException } from 'oak-domain/lib/types/Exception';
import { sliceFile, cleanTempFiles } from '../files/slice';
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
import assert from 'assert';
import { OakUploadException } from '../../types/Exception';
/**
* S3存储服务AWSMinIOOSS等
* @param options
* @return
*/
export async function chunkUpload(
options: {
extraFile: OpSchema,
uploadFn: UploadFn,
file: string | File,
getPercent?: Function
// 分片上传时使用
parallelism?: number // 并行线程数
retryTimes?: number // 重试次数
retryDelay?: number // 重试间隔,单位毫秒
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void> // 每个分片上传成功的回调
}
) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const chunkInfo = extraFile.chunkInfo!;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
const chunks = await sliceFile(file, chunkInfo.chunkSize, chunkInfo.partCount);
const everyPercent: {
[partNumber: number]: number
} = {} // 用于记录每个分片的进度百分比
const updateChunkPercent = (partNumber: number, percent: number) => {
everyPercent[partNumber] = percent;
}
const updatePercentInterval = setInterval(() => {
if (getPercent) {
const totalPercent = Object.values(everyPercent).reduce((acc, val) => acc + val, 0) / chunkInfo.partCount;
getPercent(totalPercent);
}
}, 500);
// 上传单个分片的函数,带重试
const uploadPart = async (part: typeof chunkInfo.parts[0], chunk: File | Blob | string) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
const response = await uploadFn(
chunk,
'file',
part.uploadUrl,
part.formData || {},
true,
(percent: number) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
},
`${extraFile.id}:${part.partNumber}`,
"PUT"
);
// 验证上传是否成功
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
if (response.errMsg === 'uploadFile:ok') {
const data = JSON.parse(response.data);
isSuccess = !!(data.status === 204 || data.status === 200);
}
} else {
isSuccess = !!(response.status === 200 || response.status === 204);
}
if (isSuccess) {
// 标记该分片已完成
part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag")
assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
} catch (err: any) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if (isOakException(err, OakUserException)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing: Set<Promise<void>> = new Set();
const errors: Error[] = [];
for (const task of uploadTasks) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
} catch (err) {
if (isOakException(err, OakUserException)) {
// 用户主动中止上传,抛到上层再处理
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
}
errors.push(err as Error);
throw err;
} finally {
if (promise) {
executing.delete(promise);
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
// 等待所有任务完成
await Promise.allSettled([...executing]);
clearInterval(updatePercentInterval);
// 检查是否有错误
if (errors.length > 0) {
throw errors[0];
}
// 等待所有任务完成
await Promise.all(executing);
// 调用分片成功回调(所有分片完成后)
if (onChunkSuccess) {
await onChunkSuccess(chunkInfo);
}
// 清理小程序环境下的临时文件
if (process.env.OAK_PLATFORM === 'wechatMp' && typeof file === 'string') {
await cleanTempFiles(chunks as string[]);
}
return;
}

View File

@ -180,26 +180,54 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
extraFile: OpSchema,
context: BRC<EntityDict>,
) {
throw new OakPreConditionUnsetException('S3暂不支持分片上传');
const key = this.formKey(extraFile);
const { instance, config: s3Config } =
this.getConfigAndInstance(application, extraFile.bucket!);
const preInit = await instance.prepareMultipartUpload(
extraFile.bucket!,
key,
extraFile.chunkInfo?.partCount!,
{
endpoint: s3Config.endpoint,
expiresIn: 30000, // 上传链接过期时间,单位秒
}
)
return {
uploadId: '',
chunkSize: 0,
partCount: 0,
partSize: 0,
parts: [],
}
uploadId: preInit.uploadId,
chunkSize: extraFile.chunkInfo?.chunkSize!,
partCount: preInit.parts.length,
parts: preInit.parts.map((part) => ({
partNumber: part.partNumber,
uploadUrl: part.uploadUrl,
formData: {}, // S3不需要额外的formData
})),
};
}
/**
*
*/
*
*/
async mergeChunkedUpload(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
context: BRC<EntityDict>,
): Promise<void> {
// Implementation here
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } =
this.getConfigAndInstance(application, extraFile.bucket!);
await instance.completeMultipartUpload(
extraFile.bucket!,
key,
extraFile.chunkInfo!.uploadId!,
extraFile.chunkInfo!.parts!.map((part) => ({
partNumber: part.partNumber,
eTag: part.etag!,
})),
s3CosConfig.endpoint!
);
}
async abortMultipartUpload(
@ -207,6 +235,15 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
extraFile: OpSchema,
context: BRC<EntityDict>,
): Promise<void> {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } =
this.getConfigAndInstance(application, extraFile.bucket!);
await instance.abortMultipartUpload(
extraFile.bucket!,
key,
extraFile.chunkInfo!.uploadId!,
s3CosConfig.endpoint!
);
}
}

View File

@ -7,6 +7,7 @@ import { S3UploadInfo } from '../../types/Upload';
import { S3CosConfig, Protocol } from '../../types/Config';
import { OakUploadException } from '../../types/Exception';
import { OakNetworkException } from 'oak-domain/lib/types/Exception';
import { chunkUpload } from './common';
export default class S3 implements Cos<EntityDict> {
name = 's3';
@ -47,42 +48,60 @@ export default class S3 implements Cos<EntityDict> {
file: string | File,
uploadToAspect?: UploadToAspect,
getPercent?: Function
// 分片上传时使用
parallelism?: number // 并行线程数
retryTimes?: number // 重试次数
retryDelay?: number // 重试间隔,单位毫秒
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void> // 每个分片上传成功的回调
}
) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent } = options;
const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta! as S3UploadInfo;
assert(extraFile.enableChunkedUpload !== true, '暂不支持分片上传');
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(
if (extraFile.enableChunkedUpload) {
return chunkUpload({
extraFile,
uploadFn,
file,
'file',
uploadMeta.uploadUrl,
{},
true,
getPercent,
extraFile.id,
"PUT"
);
} catch (err) {
throw new OakNetworkException('网络异常,请求失败');
}
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
parallelism: parallelism,
retryTimes: retryTimes,
retryDelay: retryDelay,
onChunkSuccess: onChunkSuccess,
});
} else {
isSuccess = response.status === 200 || response.status === 204;
}
let response;
try {
// S3 使用预签名 URL 直接上传,不需要额外的 formData
response = await uploadFn(
file,
'file',
uploadMeta.uploadUrl,
{},
true,
getPercent,
extraFile.id,
"PUT"
);
} catch (err) {
throw new OakNetworkException('网络异常,请求失败');
}
if (isSuccess) {
return;
let isSuccess = false;
if (process.env.OAK_PLATFORM === 'wechatMp') {
// 小程序端上传
if (response.errMsg === 'uploadFile:ok') {
const statusCode = response.statusCode;
isSuccess = statusCode === 200 || statusCode === 204;
}
} else {
isSuccess = response.status === 200 || response.status === 204;
}
if (isSuccess) {
return;
}
}
throw new OakUploadException('文件上传S3失败');
}
@ -127,4 +146,4 @@ export default class S3 implements Cos<EntityDict> {
}
return '';
}
}
}