Compare commits

...

8 Commits
5.11.0 ... dev

108 changed files with 1703 additions and 405 deletions

View File

@ -761,6 +761,19 @@ export type AspectDict<ED extends EntityDict> = {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param params ,
*/
presignMultiPartUpload: (params: {
extraFileId: string;
from: number;
to: number;
}, context: BackendRuntimeContext<ED>) => Promise<{
partNumber: number;
uploadUrl: string;
formData: Record<string, any>;
}[]>;
/**
*
* @param loginName

View File

@ -30,3 +30,12 @@ export declare function presignFile<ED extends EntityDict>(params: {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
export declare function presignMultiPartUpload<ED extends EntityDict>(params: {
extraFileId: string;
from: number;
to: number;
}, context: BRC<ED>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;

View File

@ -78,14 +78,10 @@ export async function mergeChunkedUpload(params, context) {
const { parts } = await cos.listMultipartUploads(extrafile.application, extrafile, context);
const allPartsDone = parts.every(part => part.etag && part.size > 0);
assert(allPartsDone, `extraFile ${extraFileId} 存在未上传完成的分片,无法合并`);
// 赋值顺带删除一些无用信息减小体积出现过mysql排序超出限制的问题
extrafile.chunkInfo.parts = parts.map((part, index) => ({
...extrafile.chunkInfo.parts[index],
await cos.mergeChunkedUpload(extrafile.application, extrafile, parts.map(part => ({
partNumber: part.partNumber,
etag: part.etag,
uploadUrl: '', // 不需要保存上传链接
}));
await cos.mergeChunkedUpload(extrafile.application, extrafile, context);
})), context);
// 更新chunkInfo状态
const closeRootMode = context.openRootMode();
try {
@ -96,6 +92,7 @@ export async function mergeChunkedUpload(params, context) {
chunkInfo: {
...extrafile.chunkInfo,
merged: true,
parts: parts.map(part => part.etag),
},
},
filter: {
@ -127,3 +124,25 @@ export async function presignFile(params, context) {
const cos = getCosBackend(extrafile.origin);
return await cos.presignFile(method, extrafile.application, extrafile, context);
}
export async function presignMultiPartUpload(params, context) {
const { extraFileId, from, to } = params;
assert(extraFileId, 'extraFileId不能为空');
assert(from >= 1, 'from必须大于等于1');
assert(to >= from, 'to必须大于等于from');
const [extrafile] = await context.select('extraFile', {
data: {
...extraFileProjection,
application: {
...applicationProjection,
},
chunkInfo: 1,
enableChunkedUpload: 1,
},
filter: {
id: extraFileId,
}
}, { dontCollect: true });
assert(extrafile, `找不到id为${extraFileId}的extraFile记录`);
const cos = getCosBackend(extrafile.origin);
return cos.presignMultiPartUpload(extrafile.application, extrafile, from, to, context);
}

View File

@ -1,5 +1,5 @@
import { bindByEmail, bindByMobile, loginByAccount, loginByEmail, loginByMobile, loginWechat, loginWechatMp, loginWechatNative, syncUserInfoWechatMp, sendCaptchaByMobile, sendCaptchaByEmail, switchTo, refreshWechatPublicUserInfo, getWechatMpUserPhoneNumber, logout, loginByWechat, wakeupParasite, refreshToken, verifyPassword, loginWebByMpToken, setUserAvatarFromWechat } from './token';
import { getInfoByUrl, mergeChunkedUpload, presignFile } from './extraFile';
import { getInfoByUrl, mergeChunkedUpload, presignFile, presignMultiPartUpload } from './extraFile';
import { getApplication, signatureJsSDK, uploadWechatMedia, batchGetArticle, getArticle, batchGetMaterialList, getMaterial, deleteMaterial } from './application';
import { updateConfig, updateApplicationConfig, updateStyle } from './config';
import { syncWechatTemplate, getMessageType } from './template';
@ -88,6 +88,7 @@ declare const aspectDict: {
setUserAvatarFromWechat: typeof setUserAvatarFromWechat;
mergeChunkedUpload: typeof mergeChunkedUpload;
presignFile: typeof presignFile;
presignMultiPartUpload: typeof presignMultiPartUpload;
registerUserByLoginName: typeof registerUserByLoginName;
};
export default aspectDict;

View File

@ -1,5 +1,5 @@
import { bindByEmail, bindByMobile, loginByAccount, loginByEmail, loginByMobile, loginWechat, loginWechatMp, loginWechatNative, syncUserInfoWechatMp, sendCaptchaByMobile, sendCaptchaByEmail, switchTo, refreshWechatPublicUserInfo, getWechatMpUserPhoneNumber, logout, loginByWechat, wakeupParasite, refreshToken, verifyPassword, loginWebByMpToken, setUserAvatarFromWechat, } from './token';
import { getInfoByUrl, mergeChunkedUpload, presignFile } from './extraFile';
import { getInfoByUrl, mergeChunkedUpload, presignFile, presignMultiPartUpload } from './extraFile';
import { getApplication, signatureJsSDK, uploadWechatMedia, batchGetArticle, getArticle, batchGetMaterialList, getMaterial, deleteMaterial, } from './application';
import { updateConfig, updateApplicationConfig, updateStyle } from './config';
import { syncWechatTemplate, getMessageType } from './template';
@ -90,6 +90,7 @@ const aspectDict = {
// extraFile新增
mergeChunkedUpload,
presignFile,
presignMultiPartUpload,
registerUserByLoginName,
};
export default aspectDict;

View File

@ -32,7 +32,7 @@ export async function loginByOauth(params, context) {
filter: {
state: stateCode,
},
}, { dontCollect: true });
}, { dontCollect: true, forUpdate: true }); // 这里直接加锁,防止其他人抢了
const systemId = context.getSystemId();
const [applicationPassport] = await context.select('applicationPassport', {
data: {
@ -96,7 +96,7 @@ export async function loginByOauth(params, context) {
providerUserId: oauthUserInfo.providerUserId,
providerConfigId: state.providerId,
}
}, { dontCollect: true });
}, { dontCollect: true, forUpdate: true }); // 加锁,防止并发绑定
// 已登录的情况
if (islogginedIn) {
// 检查当前用户是否已绑定此提供商

View File

@ -33,36 +33,6 @@ export default OakComponent({
const redirectUri = searchParams.get('redirect_uri') || '';
const scope = searchParams.get('scope') || '';
const state = searchParams.get('state') || '';
//判断是否允许oauth登录
const application = this.features.application.getApplication();
const { result: applicationPassports } = await this.features.cache.exec('getApplicationPassports', { applicationId: application.id });
const oauthPassport = applicationPassports?.find((ele) => ele.passport?.type === 'oauth');
const oauthIds = oauthPassport?.config?.oauthIds;
let allowOauth = false;
if (clientId) {
const { data: [oauthProvider] } = await this.features.cache.refresh('oauthProvider', {
data: {
id: 1,
clientId: 1,
systemId: 1,
},
filter: {
clientId,
systemId: application.systemId,
}
});
if (oauthProvider?.id && oauthIds?.length > 0 && oauthIds.includes(oauthProvider?.id)) {
allowOauth = true;
}
}
if (!allowOauth) {
this.setState({
hasError: true,
errorMsg: 'oauth.login',
});
this.setState({ loading: false });
return;
}
this.setState({
client_id: clientId,
response_type: responseType,

View File

@ -1,3 +1,4 @@
import React from 'react';
import { WebComponentProps } from "oak-frontend-base";
import { EntityDict } from "../../../../oak-app-domain";
export default function Render(props: WebComponentProps<EntityDict, 'user', false, {
@ -6,4 +7,4 @@ export default function Render(props: WebComponentProps<EntityDict, 'user', fals
setInput: (v: string) => void;
confirm: () => Promise<void>;
showTips: () => void;
}>): import("react").JSX.Element;
}>): React.JSX.Element;

View File

@ -1,3 +1,4 @@
import React from 'react';
import { Card, Input, Button } from 'antd-mobile';
import Styles from './mobile.module.less';
export default function Render(props) {

View File

@ -8,12 +8,7 @@ type ChunkInfo = {
partCount: number;
uploadId: string;
merged: boolean;
parts: Array<{
partNumber: number;
uploadUrl: string;
etag: string;
formData?: Record<string, any>;
}>;
parts: Array<string>;
};
export interface Schema extends EntityShape {
origin: CosOrigin;

View File

@ -44,5 +44,18 @@ export const entityDesc = {
loaded: '#008000',
}
}
}
},
indexes: [
{
// 业务上可能涉及的间接授权查询,建立索引以避免全表扫描
name: 'idx_oauthUser_composite',
attributes: [{
name: 'user',
}, {
name: 'providerUserId',
}, {
name: 'providerConfig',
}]
}
]
};

View File

@ -260,6 +260,14 @@ export class ExtraFile extends Feature {
await cos.upload({
extraFile: extraFile,
uploadFn: this.fileUpLoad.uploadFile,
presignMultiPartUpload: async (from, to) => {
const res = await this.cache.exec('presignMultiPartUpload', {
extraFileId,
from,
to,
});
return res.result;
},
file: file,
uploadToAspect: this.uploadToAspect.bind(this),
getPercent: getPercent,
@ -310,7 +318,7 @@ export class ExtraFile extends Feature {
const uploadIds = [];
if (extraFile.enableChunkedUpload) {
for (let partNumber = 1; partNumber <= chunkInfo.partCount; partNumber++) {
if (!chunkInfo.parts.find(part => part.partNumber === partNumber)?.etag) {
if (!chunkInfo.parts[partNumber - 1]) {
uploadIds.push(`${extraFile.id}:${partNumber}`);
}
}

View File

@ -1,7 +1,7 @@
import { Feature } from 'oak-frontend-base/es/types/Feature';
import { isOakException, OakUnloggedInException, OakNetworkException, OakServerProxyException, OakPreConditionUnsetException, OakRequestTimeoutException, OakClockDriftException } from 'oak-domain/lib/types/Exception';
import { tokenProjection } from '../types/Projection';
import { OakPasswordUnset, OakUserInfoLoadingException } from '../types/Exception';
import { OakApplicationLoadingException, OakPasswordUnset, OakUserInfoLoadingException } from '../types/Exception';
import { LOCAL_STORAGE_KEYS } from '../config/constants';
import { cloneDeep } from 'oak-domain/lib/utils/lodash';
export class Token extends Feature {
@ -10,7 +10,7 @@ export class Token extends Feature {
cache;
storage;
application;
ignoreExceptionList = [OakNetworkException, OakServerProxyException, OakRequestTimeoutException, OakClockDriftException];
ignoreExceptionList = [OakNetworkException, OakServerProxyException, OakRequestTimeoutException, OakClockDriftException, OakApplicationLoadingException];
async loadSavedToken() {
this.tokenValue = await this.storage.load(LOCAL_STORAGE_KEYS.token);
await this.refreshTokenData(this.tokenValue);

View File

@ -9,12 +9,7 @@ type ChunkInfo = {
partCount: number;
uploadId: string;
merged: boolean;
parts: Array<{
partNumber: number;
uploadUrl: string;
etag: string;
formData?: Record<string, any>;
}>;
parts: Array<string>;
};
export type OpSchema = EntityShape & {
origin: CosOrigin;

View File

@ -58,5 +58,18 @@ export const desc = {
}
},
actionType: "crud",
actions
actions,
indexes: [
{
// 业务上可能涉及的间接授权查询,建立索引以避免全表扫描
name: 'idx_oauthUser_composite',
attributes: [{
name: "userId",
}, {
name: 'providerUserId',
}, {
name: "providerConfigId",
}]
}
]
};

View File

@ -63,10 +63,10 @@ const triggers = [
});
return;
}
const cos = getCosBackend(configOrigin);
if (!cos) {
if (!configOrigin) {
throw new OakException(`origin为${configOrigin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const cos = getCosBackend(configOrigin);
await cos.formUploadMeta(context.getApplication(), data, context);
Object.assign(data, {
uploadState: 'uploading',
@ -101,18 +101,21 @@ const triggers = [
assert(data.chunkInfo?.chunkSize <= 1 * 1024 * 1024 * 1024, `chunkInfo.chunkSize必须小于1GB`);
assert(data.chunkInfo?.partCount && data.chunkInfo.partCount > 0, `chunkInfo.partCount必须大于0`);
assert(!data.chunkInfo?.merged, `chunkInfo.merged必须为false`);
assert(data.chunkInfo?.partCount <= 100, `分片数量不能超过100`);
assert(data.chunkInfo?.partCount <= 1000, `分片数量不能超过1000`);
// 计算partCount 是否正确
const expectedPartCount = Math.ceil(data.size / data.chunkInfo.chunkSize);
assert(data.chunkInfo.partCount === expectedPartCount, `chunkInfo.partCount计算错误预期值为${expectedPartCount},但实际值为${data.chunkInfo.partCount}`);
const cos = getCosBackend(data.origin);
if (!cos) {
if (!data.origin) {
throw new OakException(`origin为${data.origin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const infos = await cos.composeChunkUploadInfo(context.getApplication(), data, context);
const cos = getCosBackend(data.origin);
const infos = await cos.prepareChunkedUpload(context.getApplication(), data, context);
Object.assign(data, {
chunkInfo: {
...infos,
uploadId: infos.uploadId,
chunkSize: data.chunkInfo?.chunkSize,
partCount: data.chunkInfo?.partCount,
parts: new Array(data.chunkInfo.partCount).fill(null).map(() => ''),
merged: false,
},
});
@ -168,12 +171,15 @@ const triggers = [
if (extraFile.enableChunkedUpload) {
// 是否所有的分片都已经有etag上传成功
const chunkInfo = extraFile.chunkInfo;
const allPartsDone = chunkInfo?.parts?.every(part => part.etag);
const allPartsDone = chunkInfo?.parts?.every(part => !!part);
if (allPartsDone) {
if (!chunkInfo?.merged) {
try {
// 先完成分片合并
await uploader.mergeChunkedUpload(extraFile.application, extraFile, context);
await uploader.mergeChunkedUpload(extraFile.application, extraFile, extraFile.chunkInfo.parts.map((etag, index) => ({
partNumber: index + 1,
etag: etag,
})), context);
}
catch (err) {
console.error(`合并extraFile ${extraFile.id} 的分片上传时出错,但仍继续删除操作`, err);

View File

@ -14,7 +14,7 @@ export declare function createToDo<ED extends EntityDict & BaseEntityDict, T ext
redirectTo: EntityDict['toDo']['OpSchema']['redirectTo'];
entity: any;
entityId: string;
}, userIds?: string[]): Promise<0 | 1>;
}, userIds?: string[]): Promise<1 | 0>;
/**
* todo例程entity对象上进行action操作时filtertodo完成
* entity的action的后trigger中调用

27
es/types/Cos.d.ts vendored
View File

@ -7,6 +7,11 @@ export type UploadToAspect = (file: File | string, name: string, // 文件的par
aspectName: string, // 上传的aspect名
formData: Record<string, any>, // 上传的其它part参数
autoInform?: boolean) => Promise<any>;
export type PresignMultiPartUploadFn = (from: number, to: number) => Promise<{
partNumber: number;
uploadUrl: string;
formData: Record<string, any>;
}[]>;
/**
* Complicated Object Storage
* extraFile对象上对文件进行操作的目标类
@ -26,6 +31,7 @@ export interface Cos<ED extends EntityDict> {
*/
upload: (options: {
extraFile: ED['extraFile']['OpSchema'];
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;
@ -77,9 +83,12 @@ export interface CosBackend<ED extends EntityDict> {
* @returns
*/
removeFile: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
prepareChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<{
uploadId: string;
}>;
/**
*
* @param extraFile
* @param extraFileId extraFile的id
* @returns
*/
abortMultipartUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
@ -99,7 +108,10 @@ export interface CosBackend<ED extends EntityDict> {
/**
*
*/
mergeChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
mergeChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<ED>) => Promise<void>;
/**
*
*/
@ -124,4 +136,15 @@ export interface CosBackend<ED extends EntityDict> {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param extraFileId extraFile的id
* @param from partNumber
* @param to partNumber
*/
presignMultiPartUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], from: number, to: number, context: BRC<ED>) => Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
}

View File

@ -27,7 +27,10 @@ export default class ALiYunBackend extends ALiYun implements CosBackend<EntityDi
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: Array<{
@ -42,4 +45,17 @@ export default class ALiYunBackend extends ALiYun implements CosBackend<EntityDi
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param extraFileId extraFile的id
* @param from partNumber
* @param to partNumber
*/
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -3,6 +3,7 @@ import ALiYun from './aliyun';
import { ALiYunSDK } from 'oak-external-sdk';
import { OakPreConditionUnsetException } from 'oak-domain/lib/types/Exception';
import { stsAssumeRole } from 'oak-external-sdk/lib/service/ali/sts';
import { OakException } from 'oak-domain/lib/types';
export default class ALiYunBackend extends ALiYun {
getConfigAndInstance(application) {
const { config, account } = this.getConfig(application);
@ -128,21 +129,29 @@ export default class ALiYunBackend extends ALiYun {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
assert(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
await instance.completeMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
number: part.partNumber,
etag: part.etag,
})));
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法完成分片上传的合并操作');
assert(parts.length > 0, 'parts不能为空无法完成分片上传的合并操作');
try {
await instance.completeMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, parts.map(part => ({
number: part.partNumber,
etag: part.etag,
})));
}
catch (err) {
throw new OakException('合并分片上传失败' + 'extraFile' + err);
}
}
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
assert(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法中止分片上传操作');
await instance.abortMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId);
}
async listMultipartUploads(application, extraFile, context) {
@ -150,6 +159,7 @@ export default class ALiYunBackend extends ALiYun {
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
assert(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法列出分片上传信息');
return await instance.listParts(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId);
}
async presignFile(methods, application, extraFile, context) {
@ -161,4 +171,60 @@ export default class ALiYunBackend extends ALiYun {
expires: 24 * 60 * 60, // 1 day
});
}
/**
* 对一段文件的分片上传进行预签名
* @param extraFileId extraFile的id
* @param from 起始partNumber
* @param to 结束partNumber
*/
async presignMultiPartUpload(application, extraFile, from, to, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
assert(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
const res = await instance.presignMulti(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, from, to, {
expires: 24 * 60 * 60, // 1 day
});
return res;
}
async prepareChunkedUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig, account } = this.getConfigAndInstance(application);
let useSts = true;
let stsInfo = {};
if (!account.stsEndpoint || !account.roleArn || !account.roleSessionName) {
useSts = false;
console.warn("阿里云Cos配置中缺少sts相关配置无法使用sts方式上传分片将使用账号授权进行上传可能存在安全风险请检查确保不会暴露accessKey");
}
else {
try {
const res = await stsAssumeRole({
accessKeyId: account.accessKeyId,
accessKeySecret: account.accessKeySecret,
endpoint: account.stsEndpoint,
roleArn: account.roleArn,
roleSessionName: account.roleSessionName,
});
stsInfo = {
stsToken: res.Credentials.SecurityToken,
accessKeyId: res.Credentials.AccessKeyId,
accessKeySecret: res.Credentials.AccessKeySecret,
};
}
catch (err) {
console.error("Failed to assume role for STS:", err);
throw new OakPreConditionUnsetException("获取阿里云STS临时凭证失败请检查配置是否正确", 'extraFile');
}
}
// 大部分校验都在formUploadMeta中完成这里可以不多做判断了
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
const preInit = await instance.initiateMultipartUpload(extraFile.bucket, b.zone, key, {
timeout: 30 * 1000, // 30 seconds
...(useSts ? stsInfo
: {}),
});
return {
uploadId: preInit.uploadId,
};
}
}

View File

@ -1,5 +1,5 @@
import { EntityDict } from '../../oak-app-domain';
import { Cos, UploadFn, UploadToAspect } from "../../types/Cos";
import { Cos, PresignMultiPartUploadFn, UploadFn, UploadToAspect } from "../../types/Cos";
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { ALiYunCosConfig } from '../../types/Config';
export default class ALiYun implements Cos<EntityDict> {
@ -12,6 +12,7 @@ export default class ALiYun implements Cos<EntityDict> {
protected formKey(extraFile: Partial<OpSchema>): string;
upload(options: {
extraFile: OpSchema;
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;

View File

@ -26,11 +26,12 @@ export default class ALiYun {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, presignMultiPartUpload, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return chunkUpload({
extraFile,
presignMultiPartUpload: presignMultiPartUpload,
uploadFn,
file,
getPercent,

View File

@ -1,5 +1,5 @@
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { PresignMultiPartUploadFn, UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
export declare function isAbortError(error: any): boolean;
/**
@ -9,6 +9,7 @@ export declare function isAbortError(error: any): boolean;
*/
export declare function chunkUpload(options: {
extraFile: OpSchema;
presignMultiPartUpload: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
getPercent?: Function;

View File

@ -1,4 +1,5 @@
import { sliceFile } from '../files/slice';
import assert from 'assert';
import { OakUploadException } from '../../types/Exception';
export function isAbortError(error) {
return error instanceof DOMException && error.name === 'AbortError';
@ -9,14 +10,17 @@ export function isAbortError(error) {
* @return
*/
export async function chunkUpload(options) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, getPercent, onChunkSuccess, presignMultiPartUpload } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
const pendingPartNumbers = chunkInfo.parts
.map((etag, index) => ({ partNumber: index + 1, etag }))
.filter(item => !item.etag)
.map(item => item.partNumber);
if (pendingPartNumbers.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
@ -31,11 +35,93 @@ export async function chunkUpload(options) {
getPercent(totalPercent);
}
}, 500);
const uploadTasks = pendingPartNumbers.map((partNumber) => ({
partNumber,
chunk: chunks[partNumber - 1]
}));
// 预签名池管理
const presignPool = new Map();
const fetchingRanges = new Map(); // 记录正在请求的范围
const BATCH_SIZE = 50;
/**
* 检查两个范围是否重叠
*/
const isRangeOverlap = (a1, a2, b1, b2) => {
return a1 <= b2 && b1 <= a2;
};
/**
* 查找与指定范围重叠的正在请求的范围
*/
const findOverlappingRanges = (from, to) => {
const overlapping = [];
for (const [key, range] of fetchingRanges.entries()) {
if (isRangeOverlap(from, to, range.from, range.to)) {
overlapping.push(range.promise);
}
}
return overlapping;
};
/**
* 获取指定 partNumber 的预签名信息
* 优化允许不重叠范围的请求并发执行
*/
const getPresign = async (partNumber) => {
if (presignPool.has(partNumber)) {
return presignPool.get(partNumber);
}
// 标准化范围计算,对齐到 BATCH_SIZE 的倍数
const batchIndex = Math.floor((partNumber - 1) / BATCH_SIZE);
const from = batchIndex * BATCH_SIZE + 1;
const to = Math.min(from + BATCH_SIZE - 1, chunkInfo.partCount);
const rangeKey = `${from}-${to}`;
// 如果已经有相同范围的请求,等待它
if (fetchingRanges.has(rangeKey)) {
await fetchingRanges.get(rangeKey).promise;
assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
}
// 查找重叠的范围
let overlappingRequests = findOverlappingRanges(from, to);
while (overlappingRequests.length > 0) {
await Promise.all(overlappingRequests);
if (presignPool.has(partNumber)) {
return presignPool.get(partNumber);
}
// 在等待期间,可能其他任务已经发起了相同范围的请求
if (fetchingRanges.has(rangeKey)) {
await fetchingRanges.get(rangeKey).promise;
assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
}
overlappingRequests = findOverlappingRanges(from, to);
}
// 创建请求
const fetchPromise = (async () => {
try {
const presignedParts = await presignMultiPartUpload(from, to);
for (const item of presignedParts) {
presignPool.set(item.partNumber, {
uploadUrl: item.uploadUrl,
formData: item.formData
});
}
}
finally {
fetchingRanges.delete(rangeKey);
}
})();
fetchingRanges.set(rangeKey, { from, to, promise: fetchPromise });
await fetchPromise;
assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
};
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
const uploadPart = async (partNumber, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
// 从预签名池获取信息
const presignInfo = await getPresign(partNumber);
let data;
if (chunk.type === 'getter') {
data = await chunk.getFile();
@ -46,47 +132,34 @@ export async function chunkUpload(options) {
const response = await uploadFn({
file: data,
name: 'file',
uploadUrl: part.uploadUrl,
formData: part.formData || {},
uploadUrl: presignInfo.uploadUrl, // 从池中获取
formData: presignInfo.formData, // 从池中获取
autoInform: true,
getPercent: (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
updateChunkPercent(partNumber, percent); // 使用 partNumber
},
uploadId: `${extraFile.id}:${part.partNumber}`,
uploadId: `${extraFile.id}:${partNumber}`, // 使用 partNumber
method: "PUT"
});
// 验证上传是否成功
let isSuccess = false;
isSuccess = !!(response.status === 200 || response.status === 204);
let isSuccess = !!(response.status === 200 || response.status === 204);
if (isSuccess) {
// // 标记该分片已完成
// part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag")
// assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw new OakUploadException(`分片 ${partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
console.error(`分片 ${partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if (isAbortError(err)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw lastError || new OakUploadException(`分片 ${partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
@ -99,12 +172,11 @@ export async function chunkUpload(options) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
await uploadPart(task.partNumber, task.chunk); // 修改参数
}
catch (err) {
if (isAbortError(err)) {
// 用户主动中止上传,设置中止标志,阻止后续任务开始
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
console.log(`分片 ${task.partNumber} 上传被用户中止`); // 修改日志
shouldAbort = true;
}
errors.push(err);

View File

@ -24,7 +24,10 @@ export default class CTYunBackend extends CTYun implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class CTYunBackend extends CTYun implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -88,7 +88,7 @@ export default class CTYunBackend extends CTYun {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -109,4 +109,10 @@ export default class CTYunBackend extends CTYun {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('天翼云暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("天翼云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}

View File

@ -24,7 +24,7 @@ export function registerCosBackend(clazz) {
CosBackendDict[instance.name] = instance;
}
export function getCosBackend(origin) {
assert(CosBackendDict.hasOwnProperty(origin));
assert(CosBackendDict.hasOwnProperty(origin), `不存在类型为"${origin}"的CosBackend类`);
return CosBackendDict[origin];
}
export async function composeFileUrlBackend(application, extraFile, context, style) {

View File

@ -24,7 +24,10 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -66,7 +66,7 @@ export default class LocalBackend extends Local {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -88,5 +88,11 @@ export default class LocalBackend extends Local {
url: '',
};
}
// Presigning individual multipart-upload parts is not supported by the local
// storage backend; this always throws (message is runtime output, kept verbatim).
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('本地存储暂不支持分片上传预签名');
}
// Chunked-upload initialisation is likewise unsupported here; the thrown
// message directs callers to composeChunkUploadInfo instead.
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("本地存储分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
;

View File

@ -24,7 +24,10 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -107,7 +107,7 @@ export default class QiniuBackend extends Qiniu {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -128,5 +128,11 @@ export default class QiniuBackend extends Qiniu {
expires: 24 * 60 * 60, // 1 day
});
}
// Presigning individual multipart-upload parts is not supported by the Qiniu
// backend; this always throws (message is runtime output, kept verbatim).
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('七牛云暂不支持分片上传预签名');
}
// Chunked-upload initialisation is likewise unsupported here; the thrown
// message directs callers to composeChunkUploadInfo instead.
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("七牛云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
;

View File

@ -27,7 +27,10 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: Array<{
@ -42,4 +45,12 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -1,7 +1,7 @@
import { assert } from 'oak-domain/lib/utils/assert';
import S3 from './s3';
import { S3SDK } from 'oak-external-sdk';
import { OakExternalException } from 'oak-domain/lib/types/Exception';
import { OakException, OakExternalException } from 'oak-domain/lib/types/Exception';
export default class S3Backend extends S3 {
getConfigAndInstance(application, bucket) {
const { config, account, endpoint, defaultBucket } = this.getConfig(application);
@ -116,24 +116,33 @@ export default class S3Backend extends S3 {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法完成分片上传的合并操作');
assert(parts.length > 0, 'parts不能为空无法完成分片上传的合并操作');
try {
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, parts.map(part => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
}
catch (err) {
throw new OakException('合并分片上传失败' + 'extraFile' + err);
}
}
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法中止分片上传操作');
await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint);
}
async listMultipartUploads(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
const result = await instance.listParts(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint, 101);
assert(result.isTruncated === false, `分片数量超过101无法列出所有分片信息不应当出现这个情况触发器中已经限制了最大分片数量为100`);
const result = await instance.listParts(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint, 1001);
assert(result.isTruncated === false, `分片数量超过1001无法列出所有分片信息不应当出现这个情况触发器中已经限制了最大分片数量为100`);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法列出分片上传信息');
return {
parts: result.parts.map((part) => ({
partNumber: part.partNumber,
@ -152,4 +161,21 @@ export default class S3Backend extends S3 {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
const b = s3CosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
assert(b, `extraFile中的bucket名称在S3配置中找不到「${extraFile.bucket}`);
return instance.presignMulti(extraFile.bucket, key, extraFile.chunkInfo.uploadId, from, to, {
expiresIn: 3 * 24 * 60 * 60, // 3 days
});
}
async prepareChunkedUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket);
const preInit = await instance.createMultipartUpload(extraFile.bucket, key, s3Config.endpoint);
return {
uploadId: preInit.uploadId,
};
}
}

View File

@ -1,5 +1,5 @@
import { EntityDict } from '../../oak-app-domain';
import { Cos, UploadFn, UploadToAspect } from "../../types/Cos";
import { Cos, PresignMultiPartUploadFn, UploadFn, UploadToAspect } from "../../types/Cos";
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { S3CosConfig } from '../../types/Config';
export default class S3 implements Cos<EntityDict> {
@ -14,6 +14,7 @@ export default class S3 implements Cos<EntityDict> {
protected formKey(extraFile: Partial<OpSchema>): string;
upload(options: {
extraFile: OpSchema;
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;

View File

@ -28,11 +28,12 @@ export default class S3 {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const { extraFile, uploadFn, file, presignMultiPartUpload, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return chunkUpload({
extraFile,
presignMultiPartUpload: presignMultiPartUpload,
uploadFn,
file,
getPercent,

View File

@ -24,7 +24,10 @@ export default class TencentYunBackend extends TencentYun implements CosBackend<
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class TencentYunBackend extends TencentYun implements CosBackend<
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -88,7 +88,7 @@ export default class TencentYunBackend extends TencentYun {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -107,4 +107,10 @@ export default class TencentYunBackend extends TencentYun {
expires: 24 * 60 * 60, // 1 day
});
}
// Presigning individual multipart-upload parts is not supported by the Tencent
// Cloud backend; this always throws (message is runtime output, kept verbatim).
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('腾讯云暂不支持分片上传预签名');
}
// Chunked-upload initialisation is likewise unsupported here; the thrown
// message directs callers to composeChunkUploadInfo instead.
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("腾讯云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}

View File

@ -24,7 +24,10 @@ export default class UnknownBackend extends Unknown implements CosBackend<Entity
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class UnknownBackend extends Unknown implements CosBackend<Entity
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -28,7 +28,7 @@ export default class UnknownBackend extends Unknown {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -41,5 +41,11 @@ export default class UnknownBackend extends Unknown {
async presignFile(methods, application, extraFile, context) {
throw new OakPreConditionUnsetException('未知存储暂不支持预签名操作');
}
// Presigning individual multipart-upload parts is not supported by the
// "unknown" fallback backend; this always throws (runtime message kept verbatim).
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('未知存储暂不支持分片上传预签名');
}
// Chunked-upload initialisation is likewise unsupported here; the thrown
// message directs callers to composeChunkUploadInfo instead.
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("未知存储分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
;

View File

@ -24,7 +24,10 @@ export default class WechatBackend extends Wechat implements CosBackend<EntityDi
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class WechatBackend extends Wechat implements CosBackend<EntityDi
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -49,7 +49,7 @@ export default class WechatBackend extends Wechat {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -62,5 +62,11 @@ export default class WechatBackend extends Wechat {
async presignFile(methods, application, extraFile, context) {
return { url: '' };
}
// Presigning individual multipart-upload parts is not supported by the WeChat
// backend; this always throws (message is runtime output, kept verbatim).
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new OakPreConditionUnsetException('微信暂不支持分片上传预签名');
}
// Chunked-upload initialisation is likewise unsupported here; the thrown
// message directs callers to composeChunkUploadInfo instead.
prepareChunkedUpload(application, extraFile, context) {
throw new OakPreConditionUnsetException("微信分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
;

View File

@ -134,7 +134,10 @@ const watchers = [
continue;
}
// 去合并分片
await cos.mergeChunkedUpload(context.getApplication(), d, context);
await cos.mergeChunkedUpload(context.getApplication(), d, d.chunkInfo.parts.map((etag, index) => ({
partNumber: index + 1,
etag: etag,
})), context);
await context.operate('extraFile', {
id: await generateNewIdAsync(),
action: 'update',

View File

@ -761,6 +761,19 @@ export type AspectDict<ED extends EntityDict> = {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param params ,
*/
presignMultiPartUpload: (params: {
extraFileId: string;
from: number;
to: number;
}, context: BackendRuntimeContext<ED>) => Promise<{
partNumber: number;
uploadUrl: string;
formData: Record<string, any>;
}[]>;
/**
*
* @param loginName

View File

@ -30,3 +30,12 @@ export declare function presignFile<ED extends EntityDict>(params: {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
export declare function presignMultiPartUpload<ED extends EntityDict>(params: {
extraFileId: string;
from: number;
to: number;
}, context: BRC<ED>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;

View File

@ -4,6 +4,7 @@ exports.getInfoByUrl = getInfoByUrl;
exports.uploadExtraFile = uploadExtraFile;
exports.mergeChunkedUpload = mergeChunkedUpload;
exports.presignFile = presignFile;
exports.presignMultiPartUpload = presignMultiPartUpload;
const tslib_1 = require("tslib");
const WechatSDK_1 = tslib_1.__importDefault(require("oak-external-sdk/lib/WechatSDK"));
const uuid_1 = require("oak-domain/lib/utils/uuid");
@ -85,14 +86,10 @@ async function mergeChunkedUpload(params, context) {
const { parts } = await cos.listMultipartUploads(extrafile.application, extrafile, context);
const allPartsDone = parts.every(part => part.etag && part.size > 0);
(0, assert_1.assert)(allPartsDone, `extraFile ${extraFileId} 存在未上传完成的分片,无法合并`);
// 赋值顺带删除一些无用信息减小体积出现过mysql排序超出限制的问题
extrafile.chunkInfo.parts = parts.map((part, index) => ({
...extrafile.chunkInfo.parts[index],
await cos.mergeChunkedUpload(extrafile.application, extrafile, parts.map(part => ({
partNumber: part.partNumber,
etag: part.etag,
uploadUrl: '', // 不需要保存上传链接
}));
await cos.mergeChunkedUpload(extrafile.application, extrafile, context);
})), context);
// 更新chunkInfo状态
const closeRootMode = context.openRootMode();
try {
@ -103,6 +100,7 @@ async function mergeChunkedUpload(params, context) {
chunkInfo: {
...extrafile.chunkInfo,
merged: true,
parts: parts.map(part => part.etag),
},
},
filter: {
@ -134,3 +132,25 @@ async function presignFile(params, context) {
const cos = (0, index_backend_1.getCosBackend)(extrafile.origin);
return await cos.presignFile(method, extrafile.application, extrafile, context);
}
/**
 * Aspect: presign upload URLs for multipart parts [from, to] of an extraFile.
 * Loads the extraFile (with application and chunk metadata), validates the
 * request range against the recorded chunk layout, then delegates to the
 * CosBackend registered for the file's origin.
 * @returns Promise of { partNumber, uploadUrl, formData? }[]
 */
async function presignMultiPartUpload(params, context) {
    const { extraFileId, from, to } = params;
    (0, assert_1.assert)(extraFileId, 'extraFileId不能为空');
    (0, assert_1.assert)(from >= 1, 'from必须大于等于1');
    (0, assert_1.assert)(to >= from, 'to必须大于等于from');
    const [extrafile] = await context.select('extraFile', {
        data: {
            ...Projection_1.extraFileProjection,
            application: {
                ...Projection_1.applicationProjection,
            },
            chunkInfo: 1,
            enableChunkedUpload: 1,
        },
        filter: {
            id: extraFileId,
        }
    }, { dontCollect: true });
    (0, assert_1.assert)(extrafile, `找不到id为${extraFileId}的extraFile记录`);
    // Added validation: chunkInfo/enableChunkedUpload were already selected but
    // never checked — presigning a non-chunked file or an out-of-range part
    // would otherwise fail obscurely inside the backend SDK.
    (0, assert_1.assert)(extrafile.enableChunkedUpload, `extraFile ${extraFileId} 未开启分片上传,无法进行分片预签名`);
    (0, assert_1.assert)(!extrafile.chunkInfo?.partCount || to <= extrafile.chunkInfo.partCount, `to不能超过分片总数${extrafile.chunkInfo?.partCount}`);
    const cos = (0, index_backend_1.getCosBackend)(extrafile.origin);
    return cos.presignMultiPartUpload(extrafile.application, extrafile, from, to, context);
}

View File

@ -1,5 +1,5 @@
import { bindByEmail, bindByMobile, loginByAccount, loginByEmail, loginByMobile, loginWechat, loginWechatMp, loginWechatNative, syncUserInfoWechatMp, sendCaptchaByMobile, sendCaptchaByEmail, switchTo, refreshWechatPublicUserInfo, getWechatMpUserPhoneNumber, logout, loginByWechat, wakeupParasite, refreshToken, verifyPassword, loginWebByMpToken, setUserAvatarFromWechat } from './token';
import { getInfoByUrl, mergeChunkedUpload, presignFile } from './extraFile';
import { getInfoByUrl, mergeChunkedUpload, presignFile, presignMultiPartUpload } from './extraFile';
import { getApplication, signatureJsSDK, uploadWechatMedia, batchGetArticle, getArticle, batchGetMaterialList, getMaterial, deleteMaterial } from './application';
import { updateConfig, updateApplicationConfig, updateStyle } from './config';
import { syncWechatTemplate, getMessageType } from './template';
@ -88,6 +88,7 @@ declare const aspectDict: {
setUserAvatarFromWechat: typeof setUserAvatarFromWechat;
mergeChunkedUpload: typeof mergeChunkedUpload;
presignFile: typeof presignFile;
presignMultiPartUpload: typeof presignMultiPartUpload;
registerUserByLoginName: typeof registerUserByLoginName;
};
export default aspectDict;

View File

@ -92,6 +92,7 @@ const aspectDict = {
// extraFile新增
mergeChunkedUpload: extraFile_1.mergeChunkedUpload,
presignFile: extraFile_1.presignFile,
presignMultiPartUpload: extraFile_1.presignMultiPartUpload,
registerUserByLoginName: user_1.registerUserByLoginName,
};
exports.default = aspectDict;

View File

@ -39,7 +39,7 @@ async function loginByOauth(params, context) {
filter: {
state: stateCode,
},
}, { dontCollect: true });
}, { dontCollect: true, forUpdate: true }); // 这里直接加锁,防止其他人抢了
const systemId = context.getSystemId();
const [applicationPassport] = await context.select('applicationPassport', {
data: {
@ -103,7 +103,7 @@ async function loginByOauth(params, context) {
providerUserId: oauthUserInfo.providerUserId,
providerConfigId: state.providerId,
}
}, { dontCollect: true });
}, { dontCollect: true, forUpdate: true }); // 加锁,防止并发绑定
// 已登录的情况
if (islogginedIn) {
// 检查当前用户是否已绑定此提供商

View File

@ -8,12 +8,7 @@ type ChunkInfo = {
partCount: number;
uploadId: string;
merged: boolean;
parts: Array<{
partNumber: number;
uploadUrl: string;
etag: string;
formData?: Record<string, any>;
}>;
parts: Array<string>;
};
export interface Schema extends EntityShape {
origin: CosOrigin;

View File

@ -47,5 +47,18 @@ exports.entityDesc = {
loaded: '#008000',
}
}
}
},
indexes: [
{
// 业务上可能涉及的间接授权查询,建立索引以避免全表扫描
name: 'idx_oauthUser_composite',
attributes: [{
name: 'user',
}, {
name: 'providerUserId',
}, {
name: 'providerConfig',
}]
}
]
};

View File

@ -263,6 +263,14 @@ class ExtraFile extends Feature_1.Feature {
await cos.upload({
extraFile: extraFile,
uploadFn: this.fileUpLoad.uploadFile,
presignMultiPartUpload: async (from, to) => {
const res = await this.cache.exec('presignMultiPartUpload', {
extraFileId,
from,
to,
});
return res.result;
},
file: file,
uploadToAspect: this.uploadToAspect.bind(this),
getPercent: getPercent,
@ -313,7 +321,7 @@ class ExtraFile extends Feature_1.Feature {
const uploadIds = [];
if (extraFile.enableChunkedUpload) {
for (let partNumber = 1; partNumber <= chunkInfo.partCount; partNumber++) {
if (!chunkInfo.parts.find(part => part.partNumber === partNumber)?.etag) {
if (!chunkInfo.parts[partNumber - 1]) {
uploadIds.push(`${extraFile.id}:${partNumber}`);
}
}

View File

@ -13,7 +13,7 @@ class Token extends Feature_1.Feature {
cache;
storage;
application;
ignoreExceptionList = [Exception_1.OakNetworkException, Exception_1.OakServerProxyException, Exception_1.OakRequestTimeoutException, Exception_1.OakClockDriftException];
ignoreExceptionList = [Exception_1.OakNetworkException, Exception_1.OakServerProxyException, Exception_1.OakRequestTimeoutException, Exception_1.OakClockDriftException, Exception_2.OakApplicationLoadingException];
async loadSavedToken() {
this.tokenValue = await this.storage.load(constants_1.LOCAL_STORAGE_KEYS.token);
await this.refreshTokenData(this.tokenValue);

View File

@ -9,12 +9,7 @@ type ChunkInfo = {
partCount: number;
uploadId: string;
merged: boolean;
parts: Array<{
partNumber: number;
uploadUrl: string;
etag: string;
formData?: Record<string, any>;
}>;
parts: Array<string>;
};
export type OpSchema = EntityShape & {
origin: CosOrigin;

View File

@ -61,5 +61,18 @@ exports.desc = {
}
},
actionType: "crud",
actions: Action_1.actions
actions: Action_1.actions,
indexes: [
{
// 业务上可能涉及的间接授权查询,建立索引以避免全表扫描
name: 'idx_oauthUser_composite',
attributes: [{
name: "userId",
}, {
name: 'providerUserId',
}, {
name: "providerConfigId",
}]
}
]
};

View File

@ -66,10 +66,10 @@ const triggers = [
});
return;
}
const cos = (0, index_backend_1.getCosBackend)(configOrigin);
if (!cos) {
if (!configOrigin) {
throw new Exception_1.OakException(`origin为${configOrigin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const cos = (0, index_backend_1.getCosBackend)(configOrigin);
await cos.formUploadMeta(context.getApplication(), data, context);
Object.assign(data, {
uploadState: 'uploading',
@ -104,18 +104,21 @@ const triggers = [
(0, assert_1.default)(data.chunkInfo?.chunkSize <= 1 * 1024 * 1024 * 1024, `chunkInfo.chunkSize必须小于1GB`);
(0, assert_1.default)(data.chunkInfo?.partCount && data.chunkInfo.partCount > 0, `chunkInfo.partCount必须大于0`);
(0, assert_1.default)(!data.chunkInfo?.merged, `chunkInfo.merged必须为false`);
(0, assert_1.default)(data.chunkInfo?.partCount <= 100, `分片数量不能超过100`);
(0, assert_1.default)(data.chunkInfo?.partCount <= 1000, `分片数量不能超过1000`);
// 计算partCount 是否正确
const expectedPartCount = Math.ceil(data.size / data.chunkInfo.chunkSize);
(0, assert_1.default)(data.chunkInfo.partCount === expectedPartCount, `chunkInfo.partCount计算错误预期值为${expectedPartCount},但实际值为${data.chunkInfo.partCount}`);
const cos = (0, index_backend_1.getCosBackend)(data.origin);
if (!cos) {
if (!data.origin) {
throw new Exception_1.OakException(`origin为${data.origin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const infos = await cos.composeChunkUploadInfo(context.getApplication(), data, context);
const cos = (0, index_backend_1.getCosBackend)(data.origin);
const infos = await cos.prepareChunkedUpload(context.getApplication(), data, context);
Object.assign(data, {
chunkInfo: {
...infos,
uploadId: infos.uploadId,
chunkSize: data.chunkInfo?.chunkSize,
partCount: data.chunkInfo?.partCount,
parts: new Array(data.chunkInfo.partCount).fill(null).map(() => ''),
merged: false,
},
});
@ -171,12 +174,15 @@ const triggers = [
if (extraFile.enableChunkedUpload) {
// 是否所有的分片都已经有etag上传成功
const chunkInfo = extraFile.chunkInfo;
const allPartsDone = chunkInfo?.parts?.every(part => part.etag);
const allPartsDone = chunkInfo?.parts?.every(part => !!part);
if (allPartsDone) {
if (!chunkInfo?.merged) {
try {
// 先完成分片合并
await uploader.mergeChunkedUpload(extraFile.application, extraFile, context);
await uploader.mergeChunkedUpload(extraFile.application, extraFile, extraFile.chunkInfo.parts.map((etag, index) => ({
partNumber: index + 1,
etag: etag,
})), context);
}
catch (err) {
console.error(`合并extraFile ${extraFile.id} 的分片上传时出错,但仍继续删除操作`, err);

27
lib/types/Cos.d.ts vendored
View File

@ -7,6 +7,11 @@ export type UploadToAspect = (file: File | string, name: string, // 文件的par
aspectName: string, // 上传的aspect名
formData: Record<string, any>, // 上传的其它part参数
autoInform?: boolean) => Promise<any>;
export type PresignMultiPartUploadFn = (from: number, to: number) => Promise<{
partNumber: number;
uploadUrl: string;
formData: Record<string, any>;
}[]>;
/**
* Complicated Object Storage
* extraFile对象上对文件进行操作的目标类
@ -26,6 +31,7 @@ export interface Cos<ED extends EntityDict> {
*/
upload: (options: {
extraFile: ED['extraFile']['OpSchema'];
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;
@ -77,9 +83,12 @@ export interface CosBackend<ED extends EntityDict> {
* @returns
*/
removeFile: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
prepareChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<{
uploadId: string;
}>;
/**
*
* @param extraFile
* @param extraFileId extraFile的id
* @returns
*/
abortMultipartUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
@ -99,7 +108,10 @@ export interface CosBackend<ED extends EntityDict> {
/**
*
*/
mergeChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], context: BRC<ED>) => Promise<void>;
mergeChunkedUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<ED>) => Promise<void>;
/**
*
*/
@ -124,4 +136,15 @@ export interface CosBackend<ED extends EntityDict> {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param extraFileId extraFile的id
* @param from partNumber
* @param to partNumber
*/
presignMultiPartUpload: (application: ED['application']['Schema'], extraFile: ED['extraFile']['OpSchema'], from: number, to: number, context: BRC<ED>) => Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
}

View File

@ -27,7 +27,10 @@ export default class ALiYunBackend extends ALiYun implements CosBackend<EntityDi
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: Array<{
@ -42,4 +45,17 @@ export default class ALiYunBackend extends ALiYun implements CosBackend<EntityDi
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
*
* @param extraFileId extraFile的id
* @param from partNumber
* @param to partNumber
*/
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -6,6 +6,7 @@ const aliyun_1 = tslib_1.__importDefault(require("./aliyun"));
const oak_external_sdk_1 = require("oak-external-sdk");
const Exception_1 = require("oak-domain/lib/types/Exception");
const sts_1 = require("oak-external-sdk/lib/service/ali/sts");
const types_1 = require("oak-domain/lib/types");
class ALiYunBackend extends aliyun_1.default {
getConfigAndInstance(application) {
const { config, account } = this.getConfig(application);
@ -131,21 +132,29 @@ class ALiYunBackend extends aliyun_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
(0, assert_1.assert)(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
await instance.completeMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
number: part.partNumber,
etag: part.etag,
})));
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法完成分片上传的合并操作');
(0, assert_1.assert)(parts.length > 0, 'parts不能为空无法完成分片上传的合并操作');
try {
await instance.completeMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, parts.map(part => ({
number: part.partNumber,
etag: part.etag,
})));
}
catch (err) {
throw new types_1.OakException('合并分片上传失败' + 'extraFile' + err);
}
}
// Abort an in-progress multipart upload so OSS discards its uploaded parts.
// Requires both a matching bucket entry in the Aliyun config and the
// chunkInfo.uploadId recorded when the upload was prepared.
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
(0, assert_1.assert)(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法中止分片上传操作');
await instance.abortMultipartUpload(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId);
}
async listMultipartUploads(application, extraFile, context) {
@ -153,6 +162,7 @@ class ALiYunBackend extends aliyun_1.default {
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
(0, assert_1.assert)(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法列出分片上传信息');
return await instance.listParts(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId);
}
async presignFile(methods, application, extraFile, context) {
@ -164,5 +174,61 @@ class ALiYunBackend extends aliyun_1.default {
expires: 24 * 60 * 60, // 1 day
});
}
/**
* 对一段文件的分片上传进行预签名
* @param extraFileId extraFile的id
* @param from 起始partNumber
* @param to 结束partNumber
*/
async presignMultiPartUpload(application, extraFile, from, to, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig } = this.getConfigAndInstance(application);
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
(0, assert_1.assert)(b, `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`);
const res = await instance.presignMulti(extraFile.bucket, b.zone, key, extraFile.chunkInfo.uploadId, from, to, {
expires: 24 * 60 * 60, // 1 day
});
return res;
}
async prepareChunkedUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig, account } = this.getConfigAndInstance(application);
let useSts = true;
let stsInfo = {};
if (!account.stsEndpoint || !account.roleArn || !account.roleSessionName) {
useSts = false;
console.warn("阿里云Cos配置中缺少sts相关配置无法使用sts方式上传分片将使用账号授权进行上传可能存在安全风险请检查确保不会暴露accessKey");
}
else {
try {
const res = await (0, sts_1.stsAssumeRole)({
accessKeyId: account.accessKeyId,
accessKeySecret: account.accessKeySecret,
endpoint: account.stsEndpoint,
roleArn: account.roleArn,
roleSessionName: account.roleSessionName,
});
stsInfo = {
stsToken: res.Credentials.SecurityToken,
accessKeyId: res.Credentials.AccessKeyId,
accessKeySecret: res.Credentials.AccessKeySecret,
};
}
catch (err) {
console.error("Failed to assume role for STS:", err);
throw new Exception_1.OakPreConditionUnsetException("获取阿里云STS临时凭证失败请检查配置是否正确", 'extraFile');
}
}
// 大部分校验都在formUploadMeta中完成这里可以不多做判断了
const b = aliyunCosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
const preInit = await instance.initiateMultipartUpload(extraFile.bucket, b.zone, key, {
timeout: 30 * 1000, // 30 seconds
...(useSts ? stsInfo
: {}),
});
return {
uploadId: preInit.uploadId,
};
}
}
exports.default = ALiYunBackend;

View File

@ -1,5 +1,5 @@
import { EntityDict } from '../../oak-app-domain';
import { Cos, UploadFn, UploadToAspect } from "../../types/Cos";
import { Cos, PresignMultiPartUploadFn, UploadFn, UploadToAspect } from "../../types/Cos";
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { ALiYunCosConfig } from '../../types/Config';
export default class ALiYun implements Cos<EntityDict> {
@ -12,6 +12,7 @@ export default class ALiYun implements Cos<EntityDict> {
protected formKey(extraFile: Partial<OpSchema>): string;
upload(options: {
extraFile: OpSchema;
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;

View File

@ -28,11 +28,12 @@ class ALiYun {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, presignMultiPartUpload, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return (0, common_1.chunkUpload)({
extraFile,
presignMultiPartUpload: presignMultiPartUpload,
uploadFn,
file,
getPercent,

View File

@ -1,5 +1,5 @@
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { PresignMultiPartUploadFn, UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
export declare function isAbortError(error: any): boolean;
/**
@ -9,6 +9,7 @@ export declare function isAbortError(error: any): boolean;
*/
export declare function chunkUpload(options: {
extraFile: OpSchema;
presignMultiPartUpload: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
getPercent?: Function;

View File

@ -2,7 +2,9 @@
Object.defineProperty(exports, "__esModule", { value: true });
exports.isAbortError = isAbortError;
exports.chunkUpload = chunkUpload;
const tslib_1 = require("tslib");
const slice_1 = require("../files/slice");
const assert_1 = tslib_1.__importDefault(require("assert"));
const Exception_1 = require("../../types/Exception");
function isAbortError(error) {
return error instanceof DOMException && error.name === 'AbortError';
@ -13,14 +15,17 @@ function isAbortError(error) {
* @return
*/
async function chunkUpload(options) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, getPercent, onChunkSuccess, presignMultiPartUpload } = options;
const chunkInfo = extraFile.chunkInfo;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
// chunkInfo.parts holds one slot per part, indexed 0-based; a slot is the
// part's etag once uploaded and falsy while still pending — TODO confirm
// against the writer side (mergeChunkedUpload stores parts as etag strings).
// Derive the 1-based partNumbers that still need uploading.
const pendingPartNumbers = chunkInfo.parts
.map((etag, index) => ({ partNumber: index + 1, etag }))
.filter(item => !item.etag)
.map(item => item.partNumber);
if (pendingPartNumbers.length === 0) {
return; // 所有分片已上传完成
}
// 将文件分片
@ -35,11 +40,93 @@ async function chunkUpload(options) {
getPercent(totalPercent);
}
}, 500);
// Pair each pending partNumber with its file chunk (chunks is 0-based).
const uploadTasks = pendingPartNumbers.map((partNumber) => ({
partNumber,
chunk: chunks[partNumber - 1]
}));
// Presign pool management:
// presignPool: partNumber -> { uploadUrl, formData }, filled in batches.
const presignPool = new Map();
const fetchingRanges = new Map(); // rangeKey "from-to" -> { from, to, promise } for in-flight presign requests
const BATCH_SIZE = 50; // parts presigned per aspect round-trip
/**
 * Whether the closed integer ranges [a1, a2] and [b1, b2] share at least
 * one point.
 */
const isRangeOverlap = (a1, a2, b1, b2) => !(a2 < b1 || b2 < a1);
/**
 * Collect the promises of all in-flight presign requests whose ranges
 * overlap [from, to], so a caller can await them before issuing its own.
 */
const findOverlappingRanges = (from, to) => {
    const overlapping = [];
    // Only the range bounds and promise matter; the map key (rangeKey) is
    // not needed, so iterate values() instead of entries() with an unused key.
    for (const range of fetchingRanges.values()) {
        if (isRangeOverlap(from, to, range.from, range.to)) {
            overlapping.push(range.promise);
        }
    }
    return overlapping;
};
/**
 * Resolve the presigned upload info ({ uploadUrl, formData }) for one
 * partNumber.
 *
 * Parts are presigned in BATCH_SIZE-aligned batches via the
 * presignMultiPartUpload aspect and cached in presignPool. Concurrent
 * callers coordinate through fetchingRanges so that:
 *   - callers hitting the same batch await its single in-flight request;
 *   - requests for non-overlapping ranges run fully in parallel;
 *   - overlapping ranges wait for each other before starting a new request.
 * NOTE(review): assumes the aspect returns an entry for every partNumber
 * in [from, to] — the asserts below fail otherwise. Confirm server side.
 */
const getPresign = async (partNumber) => {
if (presignPool.has(partNumber)) {
return presignPool.get(partNumber);
}
// Normalize the range: align to BATCH_SIZE multiples, clamp to partCount.
const batchIndex = Math.floor((partNumber - 1) / BATCH_SIZE);
const from = batchIndex * BATCH_SIZE + 1;
const to = Math.min(from + BATCH_SIZE - 1, chunkInfo.partCount);
const rangeKey = `${from}-${to}`;
// The exact same range is already in flight: just await it.
if (fetchingRanges.has(rangeKey)) {
await fetchingRanges.get(rangeKey).promise;
(0, assert_1.default)(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
}
// Wait out any overlapping in-flight requests before starting our own.
let overlappingRequests = findOverlappingRanges(from, to);
while (overlappingRequests.length > 0) {
await Promise.all(overlappingRequests);
if (presignPool.has(partNumber)) {
return presignPool.get(partNumber);
}
// While we waited, another task may have started the exact same range.
if (fetchingRanges.has(rangeKey)) {
await fetchingRanges.get(rangeKey).promise;
(0, assert_1.default)(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
}
overlappingRequests = findOverlappingRanges(from, to);
}
// Issue the batch request and publish every returned part to the pool.
const fetchPromise = (async () => {
try {
const presignedParts = await presignMultiPartUpload(from, to);
for (const item of presignedParts) {
presignPool.set(item.partNumber, {
uploadUrl: item.uploadUrl,
formData: item.formData
});
}
}
finally {
// Always clear the in-flight marker (even on failure) so waiters can retry.
fetchingRanges.delete(rangeKey);
}
})();
fetchingRanges.set(rangeKey, { from, to, promise: fetchPromise });
await fetchPromise;
(0, assert_1.default)(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
return presignPool.get(partNumber);
};
// 上传单个分片的函数,带重试
const uploadPart = async (part, chunk) => {
const uploadPart = async (partNumber, chunk) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
// 从预签名池获取信息
const presignInfo = await getPresign(partNumber);
let data;
if (chunk.type === 'getter') {
data = await chunk.getFile();
@ -50,47 +137,34 @@ async function chunkUpload(options) {
const response = await uploadFn({
file: data,
name: 'file',
uploadUrl: part.uploadUrl,
formData: part.formData || {},
uploadUrl: presignInfo.uploadUrl, // 从池中获取
formData: presignInfo.formData, // 从池中获取
autoInform: true,
getPercent: (percent) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
updateChunkPercent(partNumber, percent); // 使用 partNumber
},
uploadId: `${extraFile.id}:${part.partNumber}`,
uploadId: `${extraFile.id}:${partNumber}`, // 使用 partNumber
method: "PUT"
});
// 验证上传是否成功
let isSuccess = false;
isSuccess = !!(response.status === 200 || response.status === 204);
let isSuccess = !!(response.status === 200 || response.status === 204);
if (isSuccess) {
// // 标记该分片已完成
// part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag")
// assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw new Exception_1.OakUploadException(`分片 ${partNumber} 上传失败`);
}
catch (err) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
console.error(`分片 ${partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if (isAbortError(err)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new Exception_1.OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw lastError || new Exception_1.OakUploadException(`分片 ${partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing = new Set();
const errors = [];
@ -103,12 +177,11 @@ async function chunkUpload(options) {
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
await uploadPart(task.partNumber, task.chunk); // 修改参数
}
catch (err) {
if (isAbortError(err)) {
// 用户主动中止上传,设置中止标志,阻止后续任务开始
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
console.log(`分片 ${task.partNumber} 上传被用户中止`); // 修改日志
shouldAbort = true;
}
errors.push(err);

View File

@ -24,7 +24,10 @@ export default class CTYunBackend extends CTYun implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class CTYunBackend extends CTYun implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -91,7 +91,7 @@ class CTYunBackend extends ctyun_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -112,5 +112,11 @@ class CTYunBackend extends ctyun_1.default {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new types_1.OakPreConditionUnsetException('天翼云暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new types_1.OakPreConditionUnsetException("天翼云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = CTYunBackend;

View File

@ -30,7 +30,7 @@ function registerCosBackend(clazz) {
CosBackendDict[instance.name] = instance;
}
function getCosBackend(origin) {
(0, assert_1.assert)(CosBackendDict.hasOwnProperty(origin));
(0, assert_1.assert)(CosBackendDict.hasOwnProperty(origin), `不存在类型为"${origin}"的CosBackend类`);
return CosBackendDict[origin];
}
async function composeFileUrlBackend(application, extraFile, context, style) {

View File

@ -24,7 +24,10 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -69,7 +69,7 @@ class LocalBackend extends local_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -91,6 +91,12 @@ class LocalBackend extends local_1.default {
url: '',
};
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new types_1.OakPreConditionUnsetException('本地存储暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new types_1.OakPreConditionUnsetException("本地存储分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = LocalBackend;
;

View File

@ -24,7 +24,10 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -110,7 +110,7 @@ class QiniuBackend extends qiniu_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -131,6 +131,12 @@ class QiniuBackend extends qiniu_1.default {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new Exception_1.OakPreConditionUnsetException('七牛云暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new Exception_1.OakPreConditionUnsetException("七牛云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = QiniuBackend;
;

View File

@ -27,7 +27,10 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: Array<{
@ -42,4 +45,12 @@ export default class S3Backend extends S3 implements CosBackend<EntityDict> {
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -119,24 +119,33 @@ class S3Backend extends s3_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, extraFile.chunkInfo.parts.map((part) => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法完成分片上传的合并操作');
(0, assert_1.assert)(parts.length > 0, 'parts不能为空无法完成分片上传的合并操作');
try {
await instance.completeMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, parts.map(part => ({
partNumber: part.partNumber,
eTag: part.etag,
})), s3CosConfig.endpoint);
}
catch (err) {
throw new Exception_1.OakException('合并分片上传失败' + 'extraFile' + err);
}
}
async abortMultipartUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法中止分片上传操作');
await instance.abortMultipartUpload(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint);
}
async listMultipartUploads(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
const result = await instance.listParts(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint, 101);
(0, assert_1.assert)(result.isTruncated === false, `分片数量超过101无法列出所有分片信息不应当出现这个情况触发器中已经限制了最大分片数量为100`);
// Validate the precondition BEFORE dereferencing chunkInfo.uploadId in the
// listParts call (previously the assert ran after the dereference).
(0, assert_1.assert)(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法列出分片上传信息');
// max-parts = 1001 lets us detect the (should-be-impossible) >1000-parts case.
const result = await instance.listParts(extraFile.bucket, key, extraFile.chunkInfo.uploadId, s3CosConfig.endpoint, 1001);
// NOTE(review): assumes the trigger-side limit was raised to 1000 along with
// the 1001 max-parts change — confirm; the old message still said 100.
(0, assert_1.assert)(result.isTruncated === false, `分片数量超过1001无法列出所有分片信息不应当出现这个情况触发器中已经限制了最大分片数量为1000`);
return {
parts: result.parts.map((part) => ({
partNumber: part.partNumber,
@ -155,5 +164,22 @@ class S3Backend extends s3_1.default {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
const key = this.formKey(extraFile);
const { instance, config: s3CosConfig } = this.getConfigAndInstance(application, extraFile.bucket);
const b = s3CosConfig.buckets.find((ele) => ele.name === extraFile.bucket);
(0, assert_1.assert)(b, `extraFile中的bucket名称在S3配置中找不到「${extraFile.bucket}`);
return instance.presignMulti(extraFile.bucket, key, extraFile.chunkInfo.uploadId, from, to, {
expiresIn: 3 * 24 * 60 * 60, // 3 days
});
}
async prepareChunkedUpload(application, extraFile, context) {
const key = this.formKey(extraFile);
const { instance, config: s3Config } = this.getConfigAndInstance(application, extraFile.bucket);
const preInit = await instance.createMultipartUpload(extraFile.bucket, key, s3Config.endpoint);
return {
uploadId: preInit.uploadId,
};
}
}
exports.default = S3Backend;

View File

@ -1,5 +1,5 @@
import { EntityDict } from '../../oak-app-domain';
import { Cos, UploadFn, UploadToAspect } from "../../types/Cos";
import { Cos, PresignMultiPartUploadFn, UploadFn, UploadToAspect } from "../../types/Cos";
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { S3CosConfig } from '../../types/Config';
export default class S3 implements Cos<EntityDict> {
@ -14,6 +14,7 @@ export default class S3 implements Cos<EntityDict> {
protected formKey(extraFile: Partial<OpSchema>): string;
upload(options: {
extraFile: OpSchema;
presignMultiPartUpload?: PresignMultiPartUploadFn;
uploadFn: UploadFn;
file: string | File;
uploadToAspect?: UploadToAspect;

View File

@ -30,11 +30,12 @@ class S3 {
return `extraFile/${objectId}${extension ? '.' + extension : ''}`;
}
async upload(options) {
const { extraFile, uploadFn, file, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const { extraFile, uploadFn, file, presignMultiPartUpload, getPercent, parallelism, retryTimes, retryDelay, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta;
if (extraFile.enableChunkedUpload) {
return (0, common_1.chunkUpload)({
extraFile,
presignMultiPartUpload: presignMultiPartUpload,
uploadFn,
file,
getPercent,

View File

@ -24,7 +24,10 @@ export default class TencentYunBackend extends TencentYun implements CosBackend<
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class TencentYunBackend extends TencentYun implements CosBackend<
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -91,7 +91,7 @@ class TencentYunBackend extends tencent_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -110,5 +110,11 @@ class TencentYunBackend extends tencent_1.default {
expires: 24 * 60 * 60, // 1 day
});
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new Exception_1.OakPreConditionUnsetException('腾讯云暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new Exception_1.OakPreConditionUnsetException("腾讯云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = TencentYunBackend;

View File

@ -24,7 +24,10 @@ export default class UnknownBackend extends Unknown implements CosBackend<Entity
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class UnknownBackend extends Unknown implements CosBackend<Entity
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -31,7 +31,7 @@ class UnknownBackend extends unknown_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -44,6 +44,12 @@ class UnknownBackend extends unknown_1.default {
async presignFile(methods, application, extraFile, context) {
throw new types_1.OakPreConditionUnsetException('未知存储暂不支持预签名操作');
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new types_1.OakPreConditionUnsetException('未知存储暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new types_1.OakPreConditionUnsetException("未知存储分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = UnknownBackend;
;

View File

@ -24,7 +24,10 @@ export default class WechatBackend extends Wechat implements CosBackend<EntityDi
/**
*
*/
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
mergeChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, parts: Array<{
partNumber: number;
etag: string;
}>, context: BRC<EntityDict>): Promise<void>;
abortMultipartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<void>;
listMultipartUploads(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
parts: never[];
@ -34,4 +37,12 @@ export default class WechatBackend extends Wechat implements CosBackend<EntityDi
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
presignMultiPartUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, from: number, to: number, context: BRC<EntityDict>): Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
prepareChunkedUpload(application: EntityDict['application']['Schema'], extraFile: OpSchema, context: BRC<EntityDict>): Promise<{
uploadId: string;
}>;
}

View File

@ -52,7 +52,7 @@ class WechatBackend extends wechat_1.default {
/**
* 完成分片上传后的合并操作
*/
async mergeChunkedUpload(application, extraFile, context) {
async mergeChunkedUpload(application, extraFile, parts, context) {
// Implementation here
}
async abortMultipartUpload(application, extraFile, context) {
@ -65,6 +65,12 @@ class WechatBackend extends wechat_1.default {
async presignFile(methods, application, extraFile, context) {
return { url: '' };
}
presignMultiPartUpload(application, extraFile, from, to, context) {
throw new types_1.OakPreConditionUnsetException('微信暂不支持分片上传预签名');
}
prepareChunkedUpload(application, extraFile, context) {
throw new types_1.OakPreConditionUnsetException("微信分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}
exports.default = WechatBackend;
;

View File

@ -136,7 +136,10 @@ const watchers = [
continue;
}
// 去合并分片
await cos.mergeChunkedUpload(context.getApplication(), d, context);
await cos.mergeChunkedUpload(context.getApplication(), d, d.chunkInfo.parts.map((etag, index) => ({
partNumber: index + 1,
etag: etag,
})), context);
await context.operate('extraFile', {
id: await (0, uuid_1.generateNewIdAsync)(),
action: 'update',

View File

@ -1,6 +1,6 @@
{
"name": "oak-general-business",
"version": "5.11.0",
"version": "5.11.2",
"description": "oak框架中公共业务逻辑的实现",
"author": {
"name": "XuChang"
@ -21,13 +21,12 @@
"classnames": "^2.3.1",
"compressorjs": "^1.2.1",
"copy-to-clipboard": "^3.3.3",
"csstype": "^3.1.3",
"dayjs": "^1.11.9",
"nodemailer": "^6.9.14",
"oak-common-aspect": "^3.0.5",
"oak-domain": "^5.1.33",
"oak-external-sdk": "^2.3.12",
"oak-frontend-base": "^5.3.45",
"oak-common-aspect": "file:../oak-common-aspect",
"oak-domain": "file:../oak-domain",
"oak-external-sdk": "file:../oak-external-sdk",
"oak-frontend-base": "file:../oak-frontend-base",
"qrcode.react": "^3.1.0",
"react-dnd": "^16.0.1",
"react-dnd-html5-backend": "^16.0.1",

View File

@ -1038,11 +1038,27 @@ export type AspectDict<ED extends EntityDict> = {
method?: 'GET' | 'PUT' | 'POST' | 'DELETE';
},
context: BackendRuntimeContext<ED>
) => Promise<{
) => Promise<{
url: string;
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
}>;
/**
 * Presign a contiguous range of multipart-upload parts for an extraFile.
 * @param params extraFileId plus the inclusive [from, to] partNumber range
 * @returns one presigned entry per partNumber in the range. `formData` is
 *          optional to match the implementation's declaration — not every
 *          backend needs extra form fields alongside the PUT URL.
 */
presignMultiPartUpload: (
    params: {
        extraFileId: string;
        from: number;
        to: number;
    },
    context: BackendRuntimeContext<ED>
) => Promise<{
    partNumber: number;
    uploadUrl: string;
    formData?: Record<string, any>;
}[]>;
/**
*
* @param loginName

View File

@ -125,17 +125,13 @@ export async function mergeChunkedUpload<ED extends EntityDict>(
const allPartsDone = parts.every(part => part.etag && part.size > 0);
assert(allPartsDone, `extraFile ${extraFileId} 存在未上传完成的分片,无法合并`);
// 赋值顺带删除一些无用信息减小体积出现过mysql排序超出限制的问题
extrafile.chunkInfo!.parts = parts.map((part, index) => ({
...extrafile.chunkInfo!.parts[index],
partNumber: part.partNumber,
etag: part.etag!,
uploadUrl: '', // 不需要保存上传链接
}));
await cos.mergeChunkedUpload(
extrafile.application!,
extrafile as any,
parts.map(part => ({
partNumber: part.partNumber,
etag: part.etag!,
})),
context as any
)
@ -151,6 +147,7 @@ export async function mergeChunkedUpload<ED extends EntityDict>(
chunkInfo: {
...extrafile.chunkInfo,
merged: true,
parts: parts.map(part => part.etag!),
},
},
filter: {
@ -164,7 +161,7 @@ export async function mergeChunkedUpload<ED extends EntityDict>(
closeRootMode();
throw err;
}
}
}
export async function presignFile<ED extends EntityDict>(
params: {
@ -172,7 +169,7 @@ export async function presignFile<ED extends EntityDict>(
method?: 'GET' | 'PUT' | 'POST' | 'DELETE';
},
context: BRC<ED>
): Promise<{
): Promise<{
url: string;
headers?: Record<string, string | string[]>;
formdata?: Record<string, any>;
@ -201,3 +198,44 @@ export async function presignFile<ED extends EntityDict>(
context as any
);
}
/**
 * Presign upload URLs for multipart-upload parts in the partNumber range
 * [from, to] (inclusive) of a chunk-uploaded extraFile.
 * @param params extraFileId plus the inclusive part range to presign
 * @param context backend runtime context used to load the extraFile record
 * @returns one presigned entry (uploadUrl + optional formData) per part
 */
export async function presignMultiPartUpload<ED extends EntityDict>(
    params: {
        extraFileId: string,
        from: number,
        to: number,
    },
    context: BRC<ED>
): Promise<{
    partNumber: number;
    uploadUrl: string;
    formData?: Record<string, any>;
}[]> {
    const { extraFileId, from, to } = params;
    assert(extraFileId, 'extraFileId不能为空');
    assert(from >= 1, 'from必须大于等于1');
    assert(to >= from, 'to必须大于等于from');
    const [extrafile] = await context.select('extraFile', {
        data: {
            ...extraFileProjection,
            application: {
                ...applicationProjection,
            },
            chunkInfo: 1,
            enableChunkedUpload: 1,
        },
        filter: {
            id: extraFileId,
        }
    }, { dontCollect: true });
    assert(extrafile, `找不到id为${extraFileId}的extraFile记录`);
    // Fail fast on records that cannot be presigned, instead of surfacing an
    // opaque error from the COS backend.
    assert(extrafile.enableChunkedUpload, `extraFile ${extraFileId} 未启用分片上传,无法预签名`);
    assert(extrafile.chunkInfo?.uploadId, `extraFile ${extraFileId} 缺少chunkInfo.uploadId无法预签名`);
    assert(!extrafile.chunkInfo!.merged, `extraFile ${extraFileId} 已完成合并,无法再预签名分片`);
    assert(to <= extrafile.chunkInfo!.partCount, `to不能超过分片总数${extrafile.chunkInfo!.partCount}`);
    const cos = getCosBackend(extrafile.origin!);
    return cos.presignMultiPartUpload(
        extrafile.application!,
        extrafile as any,
        from,
        to,
        context as any,
    )
}

View File

@ -21,7 +21,7 @@ import {
loginWebByMpToken,
setUserAvatarFromWechat,
} from './token';
import { getInfoByUrl, mergeChunkedUpload,presignFile } from './extraFile';
import { getInfoByUrl, mergeChunkedUpload,presignFile, presignMultiPartUpload } from './extraFile';
import {
getApplication,
signatureJsSDK,
@ -147,6 +147,7 @@ const aspectDict = {
// extraFile新增
mergeChunkedUpload,
presignFile,
presignMultiPartUpload,
registerUserByLoginName,
};

View File

@ -43,7 +43,7 @@ export async function loginByOauth<ED extends EntityDict>(params: {
filter: {
state: stateCode,
},
}, { dontCollect: true });
}, { dontCollect: true, forUpdate: true }); // 这里直接加锁,防止其他人抢了
const systemId = context.getSystemId();
const [applicationPassport] = await context.select('applicationPassport', {
@ -115,7 +115,7 @@ export async function loginByOauth<ED extends EntityDict>(params: {
providerUserId: oauthUserInfo.providerUserId,
providerConfigId: state.providerId!,
}
}, { dontCollect: true })
}, { dontCollect: true, forUpdate: true }); // 加锁,防止并发绑定
// 已登录的情况
if (islogginedIn) {

View File

@ -44,38 +44,6 @@ export default OakComponent({
const scope = searchParams.get('scope') || '';
const state = searchParams.get('state') || '';
//判断是否允许oauth登录
const application = this.features.application.getApplication();
const { result: applicationPassports } = await this.features.cache.exec('getApplicationPassports', { applicationId: application.id });
const oauthPassport = applicationPassports?.find((ele: EntityDict['applicationPassport']['Schema']) => ele.passport?.type === 'oauth');
const oauthIds = oauthPassport?.config?.oauthIds;
let allowOauth = false;
if (clientId) {
const { data: [oauthProvider] } = await this.features.cache.refresh('oauthProvider', {
data: {
id: 1,
clientId: 1,
systemId: 1,
},
filter: {
clientId,
systemId: application.systemId,
}
});
if (oauthProvider?.id && oauthIds?.length > 0 && oauthIds.includes(oauthProvider?.id)) {
allowOauth = true;
}
}
if (!allowOauth) {
this.setState({
hasError: true,
errorMsg: 'oauth.login',
});
this.setState({ loading: false });
return;
}
this.setState({
client_id: clientId,
response_type: responseType,

View File

@ -1,3 +1,4 @@
import React from 'react';
import { WebComponentProps } from "oak-frontend-base";
import { Card, Input, Form, Button } from 'antd-mobile';
import { EntityDict } from "../../../../oak-app-domain";

View File

@ -9,12 +9,7 @@ type ChunkInfo = {
partCount: number;
uploadId: string;
merged: boolean;
parts: Array<{ // 在下一次上传的时候可以先从oss查询已上传的分片避免重复上传
partNumber: number;
uploadUrl: string;
etag: string;
formData?: Record<string, any>;
}>;
parts: Array<string>; // 记录etag
}
export interface Schema extends EntityShape {

View File

@ -47,7 +47,7 @@ export const LoadActionDef: ActionDef<LoadAction, LoadState> = {
export const entityDesc: EntityDesc<Schema, Action, '', {
loadState: LoadState;
}> = {
}> = {
locales: {
zh_CN: {
name: '用户登录连接',
@ -87,6 +87,19 @@ export const entityDesc: EntityDesc<Schema, Action, '', {
loaded: '#008000',
}
}
}
},
indexes: [
{
// 业务上可能涉及的间接授权查询,建立索引以避免全表扫描
name: 'idx_oauthUser_composite',
attributes: [{
name: 'user',
}, {
name: 'providerUserId',
}, {
name: 'providerConfig',
}]
}
]
};

View File

@ -41,7 +41,7 @@ export class ExtraFile<ED extends EntityDict> extends Feature {
this.cache = cache;
this.application = application;
this.files = {};
const up = new Upload();
this.fileUpLoad = up;
}
@ -340,6 +340,14 @@ export class ExtraFile<ED extends EntityDict> extends Feature {
await cos.upload({
extraFile: extraFile,
uploadFn: this.fileUpLoad.uploadFile,
presignMultiPartUpload: async (from, to) => {
const res = await this.cache.exec('presignMultiPartUpload', {
extraFileId,
from,
to,
});
return res.result;
},
file: file,
uploadToAspect: this.uploadToAspect.bind(this),
getPercent: getPercent,
@ -397,7 +405,7 @@ export class ExtraFile<ED extends EntityDict> extends Feature {
const uploadIds: string[] = [];
if (extraFile.enableChunkedUpload) {
for (let partNumber = 1; partNumber <= chunkInfo.partCount!; partNumber++) {
if (!chunkInfo.parts.find(part => part.partNumber === partNumber)?.etag) {
if (!chunkInfo.parts[partNumber - 1]) {
uploadIds.push(`${extraFile.id}:${partNumber}`);
}
}

View File

@ -16,7 +16,7 @@ import { Application } from './application';
import { WebEnv, WechatMpEnv, NativeEnv } from 'oak-domain/lib/types/Environment';
import { EntityDict } from '../oak-app-domain';
import { tokenProjection } from '../types/Projection';
import { OakPasswordUnset, OakUserInfoLoadingException } from '../types/Exception';
import { OakApplicationLoadingException, OakPasswordUnset, OakUserInfoLoadingException } from '../types/Exception';
import { LOCAL_STORAGE_KEYS } from '../config/constants';
import { cloneDeep } from 'oak-domain/lib/utils/lodash';
@ -26,7 +26,7 @@ export class Token<ED extends EntityDict> extends Feature {
protected cache: Cache<ED>;
protected storage: LocalStorage;
protected application: Application<ED>;
protected ignoreExceptionList: typeof OakException<ED>[] = [OakNetworkException, OakServerProxyException, OakRequestTimeoutException, OakClockDriftException];
protected ignoreExceptionList: typeof OakException<ED>[] = [OakNetworkException, OakServerProxyException, OakRequestTimeoutException, OakClockDriftException, OakApplicationLoadingException];
protected async loadSavedToken() {
this.tokenValue = await this.storage.load(LOCAL_STORAGE_KEYS.token);

View File

@ -80,10 +80,10 @@ const triggers: Trigger<EntityDict, 'extraFile', BRC<EntityDict>>[] = [
});
return;
}
const cos = getCosBackend(configOrigin!);
if (!cos) {
if (!configOrigin) {
throw new OakException(`origin为${configOrigin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const cos = getCosBackend(configOrigin!);
await cos.formUploadMeta(context.getApplication() as EntityDict['application']['Schema'], data, context);
Object.assign(data, {
uploadState: 'uploading',
@ -122,18 +122,18 @@ const triggers: Trigger<EntityDict, 'extraFile', BRC<EntityDict>>[] = [
assert(data.chunkInfo?.chunkSize <= 1 * 1024 * 1024 * 1024, `chunkInfo.chunkSize必须小于1GB`);
assert(data.chunkInfo?.partCount && data.chunkInfo.partCount > 0, `chunkInfo.partCount必须大于0`);
assert(!data.chunkInfo?.merged, `chunkInfo.merged必须为false`);
assert(data.chunkInfo?.partCount <= 100, `分片数量不能超过100`);
assert(data.chunkInfo?.partCount <= 1000, `分片数量不能超过1000`);
// 计算partCount 是否正确
const expectedPartCount = Math.ceil(data.size! / data.chunkInfo!.chunkSize);
assert(data.chunkInfo!.partCount === expectedPartCount, `chunkInfo.partCount计算错误预期值为${expectedPartCount},但实际值为${data.chunkInfo!.partCount}`);
const cos = getCosBackend(data.origin!)
if (!cos) {
if (!data.origin) {
throw new OakException(`origin为${data.origin}的extraFile没有定义Cos类请调用registerCos注入`);
}
const cos = getCosBackend(data.origin!)
const infos = await cos.composeChunkUploadInfo(
const infos = await cos.prepareChunkedUpload(
context.getApplication() as EntityDict['application']['Schema'],
data,
context
@ -141,7 +141,10 @@ const triggers: Trigger<EntityDict, 'extraFile', BRC<EntityDict>>[] = [
Object.assign(data, {
chunkInfo: {
...infos,
uploadId: infos.uploadId,
chunkSize: data.chunkInfo?.chunkSize,
partCount: data.chunkInfo?.partCount,
parts: new Array(data.chunkInfo!.partCount).fill(null).map(() => ''),
merged: false,
},
});
@ -203,7 +206,7 @@ const triggers: Trigger<EntityDict, 'extraFile', BRC<EntityDict>>[] = [
if (extraFile.enableChunkedUpload) {
// 是否所有的分片都已经有etag上传成功
const chunkInfo = extraFile.chunkInfo;
const allPartsDone = chunkInfo?.parts?.every(part => part.etag);
const allPartsDone = chunkInfo?.parts?.every(part => !!part);
if (allPartsDone) {
if (!chunkInfo?.merged) {
try {
@ -211,6 +214,10 @@ const triggers: Trigger<EntityDict, 'extraFile', BRC<EntityDict>>[] = [
await uploader.mergeChunkedUpload(
extraFile.application!,
extraFile as any,
extraFile.chunkInfo!.parts!.map((etag, index) => ({
partNumber: index + 1,
etag: etag!,
})),
context as any
)
} catch (err) {

View File

@ -13,6 +13,15 @@ export type UploadToAspect = (
autoInform?: boolean // 上传成功是否会自动通知server若不会则需要前台显式通知
) => Promise<any>
// Frontend-side presign callback: fetches presigned upload info for the
// part range [from, to] (inclusive partNumbers).
// NOTE(review): formData is declared required here but optional in
// CosBackend.presignMultiPartUpload and in the aspect's return type —
// confirm which contract is intended before relying on it being present.
export type PresignMultiPartUploadFn = (
    from: number,
    to: number,
) => Promise<{
    partNumber: number;
    uploadUrl: string;
    formData: Record<string, any>;
}[]>
/**
* Complicated Object Storage
* extraFile对象上对文件进行操作的目标类
@ -36,6 +45,8 @@ export interface Cos<ED extends EntityDict> {
upload: (
options: {
extraFile: ED['extraFile']['OpSchema'],
// 预签名分片上传函数,仅在分片上传时提供
presignMultiPartUpload?: PresignMultiPartUploadFn,
uploadFn: UploadFn,
file: string | File,
uploadToAspect?: UploadToAspect,
@ -113,9 +124,17 @@ export interface CosBackend<ED extends EntityDict> {
context: BRC<ED>,
) => Promise<void>;
prepareChunkedUpload: (
application: ED['application']['Schema'],
extraFile: ED['extraFile']['OpSchema'],
context: BRC<ED>,
) => Promise<{
uploadId: string;
}>;
/**
 * Abort an in-progress multipart upload and discard its uploaded parts.
 * @param extraFile the extraFile record whose multipart upload is aborted
 * @returns resolves once the upload has been aborted
 */
abortMultipartUpload: (
@ -148,6 +167,10 @@ export interface CosBackend<ED extends EntityDict> {
mergeChunkedUpload: (
application: ED['application']['Schema'],
extraFile: ED['extraFile']['OpSchema'],
parts: Array<{
partNumber: number,
etag: string,
}>,
context: BRC<ED>,
) => Promise<void>;
@ -185,4 +208,22 @@ export interface CosBackend<ED extends EntityDict> {
headers?: Record<string, string | string[]>; // 如果是PUT/DELETE/GET请求可能需要添加请求头
formdata?: Record<string, any>; // 对于POST上传可能需要添加表单数据
}>;
/**
 * Presign upload URLs for multipart parts in the partNumber range [from, to].
 * @param from first partNumber (inclusive)
 * @param to last partNumber (inclusive)
 */
presignMultiPartUpload: (
application: ED['application']['Schema'],
extraFile: ED['extraFile']['OpSchema'],
from: number,
to: number,
context: BRC<ED>,
) => Promise<{
partNumber: number;
uploadUrl: string;
formData?: Record<string, any>;
}[]>;
}

View File

@ -1,4 +1,4 @@
import { EntityDict } from '../../oak-app-domain';
import { ApplicationPassport, Captcha, Domain, EntityDict, ExtraFile, Notification, OauthAuthorizationCode, OauthUser, Session, SessionMessage, System, Token, WechatMenu, WechatPublicAutoReply, WechatPublicTag, WechatQrCode, WechatTemplate, WechatUser } from '../../oak-app-domain';
import { assert } from 'oak-domain/lib/utils/assert';
import { CosBackend } from '../../types/Cos';
import ALiYun from './aliyun';
@ -9,6 +9,9 @@ import { ALiYunInstance, ALiYunSDK } from 'oak-external-sdk';
import { OakExternalException, OakPreConditionUnsetException } from 'oak-domain/lib/types/Exception';
import { BRC } from '../..';
import { stsAssumeRole } from 'oak-external-sdk/lib/service/ali/sts';
import { EntityShape, String, Text, ForeignKey, AggregationResult, OakException } from 'oak-domain/lib/types';
import { AppType, WebConfig, WechatMpConfig, WechatPublicConfig, NativeConfig } from '../../oak-app-domain/Application/_baseSchema';
import { Style } from '../../types/Style';
export default class ALiYunBackend
extends ALiYun
@ -151,7 +154,7 @@ export default class ALiYunBackend
const key = this.formKey(extraFile);
const { instance, config: aliyunCosConfig, account } =
this.getConfigAndInstance(application);
let useSts = true;
let stsInfo = {};
if (!account.stsEndpoint || !account.roleArn || !account.roleSessionName) {
@ -212,6 +215,10 @@ export default class ALiYunBackend
async mergeChunkedUpload(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
parts: Array<{
partNumber: number,
etag: string,
}>,
context: BRC<EntityDict>,
): Promise<void> {
const key = this.formKey(extraFile);
@ -226,16 +233,23 @@ export default class ALiYunBackend
`extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`
);
await instance.completeMultipartUpload(
extraFile.bucket!,
b.zone,
key,
extraFile.chunkInfo!.uploadId!,
extraFile.chunkInfo!.parts!.map((part) => ({
number: part.partNumber,
etag: part.etag!,
}))
);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法完成分片上传的合并操作');
assert(parts.length > 0, 'parts不能为空无法完成分片上传的合并操作');
try {
await instance.completeMultipartUpload(
extraFile.bucket!,
b.zone,
key,
extraFile.chunkInfo!.uploadId!,
parts.map(part => ({
number: part.partNumber,
etag: part.etag,
}))
);
} catch (err: any) {
throw new OakException('合并分片上传失败'+ 'extraFile' + err);
}
}
async abortMultipartUpload(
@ -254,6 +268,8 @@ export default class ALiYunBackend
`extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`
);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法中止分片上传操作');
await instance.abortMultipartUpload(
extraFile.bucket!,
b.zone,
@ -284,6 +300,9 @@ export default class ALiYunBackend
b,
`extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`
);
assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法列出分片上传信息');
return await instance.listParts(
extraFile.bucket!,
b.zone,
@ -322,4 +341,91 @@ export default class ALiYunBackend
}
);
}
/**
 * Presign upload URLs for multipart-upload parts in the range [from, to]
 * against aliyun OSS, using the uploadId previously created by
 * prepareChunkedUpload.
 * @param application application whose config holds the bucket definitions
 * @param extraFile extraFile record; must carry chunkInfo.uploadId
 * @param from first partNumber (inclusive)
 * @param to last partNumber (inclusive)
 * @returns presigned url (plus optional form data) per part
 */
async presignMultiPartUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    from: number,
    to: number,
    context: BRC<EntityDict>,
) {
    const key = this.formKey(extraFile);
    const { instance, config: aliyunCosConfig } =
        this.getConfigAndInstance(application);
    const b = (aliyunCosConfig as ALiYunCosConfig).buckets.find(
        (ele) => ele.name === extraFile.bucket
    );
    assert(
        b,
        `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`
    );
    // Consistent with mergeChunkedUpload/abortMultipartUpload/listMultipartUploads:
    // fail fast with a clear message when the uploadId is missing, instead of
    // passing undefined to the SDK via the non-null assertion below.
    assert(extraFile.chunkInfo?.uploadId, 'extraFile缺少chunkInfo.uploadId无法预签名分片上传链接');
    const res = await instance.presignMulti(extraFile.bucket!, b.zone, key, extraFile.chunkInfo!.uploadId!, from, to, {
        expires: 24 * 60 * 60, // presigned URLs stay valid for 1 day
    });
    return res;
}
/**
 * Initiate an aliyun OSS multipart upload for the given extraFile and return
 * the uploadId that subsequent part uploads and the final merge must use.
 * Prefers STS temporary credentials; falls back to the account keys (with a
 * warning) when STS is not configured.
 * @throws OakPreConditionUnsetException when STS is configured but assuming
 *         the role fails
 */
async prepareChunkedUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    context: BRC<EntityDict>,
): Promise<{
    uploadId: string;
}> {
    const key = this.formKey(extraFile);
    const { instance, config: aliyunCosConfig, account } =
        this.getConfigAndInstance(application);

    let useSts = true;
    let stsInfo = {};
    if (!account.stsEndpoint || !account.roleArn || !account.roleSessionName) {
        // Without the full STS triple we cannot assume a role; fall back to
        // plain account credentials (less safe, hence the warning).
        useSts = false;
        console.warn("阿里云Cos配置中缺少sts相关配置无法使用sts方式上传分片将使用账号授权进行上传可能存在安全风险请检查确保不会暴露accessKey");
    } else {
        try {
            const res = await stsAssumeRole(
                {
                    accessKeyId: account.accessKeyId,
                    accessKeySecret: account.accessKeySecret,
                    endpoint: account.stsEndpoint!,
                    roleArn: account.roleArn!,
                    roleSessionName: account.roleSessionName!,
                }
            );
            stsInfo = {
                stsToken: res.Credentials.SecurityToken,
                accessKeyId: res.Credentials.AccessKeyId,
                accessKeySecret: res.Credentials.AccessKeySecret,
            }
        } catch (err: any) {
            console.error("Failed to assume role for STS:", err);
            throw new OakPreConditionUnsetException("获取阿里云STS临时凭证失败请检查配置是否正确", 'extraFile');
        }
    }
    // Most validation already happened in formUploadMeta, so checks here are
    // minimal — but still guard the bucket lookup to fail with a clear message
    // instead of a TypeError on the non-null assertion.
    const b = (aliyunCosConfig as ALiYunCosConfig).buckets.find((ele) => ele.name === extraFile.bucket);
    assert(
        b,
        `extraFile中的bucket名称在阿里云配置中找不到「${extraFile.bucket}`
    );
    const preInit = await instance.initiateMultipartUpload(
        extraFile.bucket!,
        b.zone,
        key,
        {
            timeout: 30 * 1000, // 30 seconds
            ...(useSts ? stsInfo
                : {}),
        }
    )
    return {
        uploadId: preInit.uploadId,
    };
}
}

View File

@ -1,6 +1,6 @@
import { EntityDict } from '../../oak-app-domain';
import { assert } from 'oak-domain/lib/utils/assert';
import { Cos, UploadFn, UploadToAspect } from "../../types/Cos";
import { Cos, PresignMultiPartUploadFn, UploadFn, UploadToAspect } from "../../types/Cos";
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { AliYunUploadInfo } from '../../types/Upload';
@ -42,6 +42,7 @@ export default class ALiYun implements Cos<EntityDict> {
async upload(
options: {
extraFile: OpSchema,
presignMultiPartUpload?: PresignMultiPartUploadFn,
uploadFn: UploadFn,
file: string | File,
uploadToAspect?: UploadToAspect,
@ -53,11 +54,12 @@ export default class ALiYun implements Cos<EntityDict> {
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void> // 每个分片上传成功的回调
}
) {
const { extraFile, uploadFn, file, uploadToAspect, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, presignMultiPartUpload, uploadToAspect, getPercent, onChunkSuccess } = options;
const uploadMeta = extraFile.uploadMeta! as AliYunUploadInfo;
if (extraFile.enableChunkedUpload) {
return chunkUpload({
extraFile,
presignMultiPartUpload: presignMultiPartUpload!,
uploadFn,
file,
getPercent,

View File

@ -1,6 +1,6 @@
import { MpFileGetter, sliceFile, cleanTempFiles } from '../files/slice';
import { OpSchema } from '../../oak-app-domain/ExtraFile/Schema';
import { UploadFn } from "../../types/Cos";
import { PresignMultiPartUploadFn, UploadFn } from "../../types/Cos";
import { EntityDict } from '../../oak-app-domain';
import assert from 'assert';
import { OakUploadException } from '../../types/Exception';
@ -17,6 +17,7 @@ export function isAbortError(error: any): boolean {
export async function chunkUpload(
options: {
extraFile: OpSchema,
presignMultiPartUpload: PresignMultiPartUploadFn,
uploadFn: UploadFn,
file: string | File,
getPercent?: Function
@ -27,16 +28,19 @@ export async function chunkUpload(
onChunkSuccess?: (chunkInfo: EntityDict['extraFile']['Schema']['chunkInfo']) => Promise<void> // 每个分片上传成功的回调
}
) {
const { extraFile, uploadFn, file, getPercent, onChunkSuccess } = options;
const { extraFile, uploadFn, file, getPercent, onChunkSuccess, presignMultiPartUpload } = options;
const chunkInfo = extraFile.chunkInfo!;
const parallelism = options.parallelism || 5;
const retryTimes = options.retryTimes || 5;
const retryDelay = options.retryDelay || 1000;
// 过滤出未完成的分片
const pendingParts = chunkInfo.parts.filter(part => !part.etag);
if (pendingParts.length === 0) {
const pendingPartNumbers = chunkInfo.parts
.map((etag, index) => ({ partNumber: index + 1, etag }))
.filter(item => !item.etag)
.map(item => item.partNumber);
if (pendingPartNumbers.length === 0) {
return; // 所有分片已上传完成
}
@ -58,11 +62,107 @@ export async function chunkUpload(
}
}, 500);
const uploadTasks = pendingPartNumbers.map((partNumber) => ({
partNumber,
chunk: chunks[partNumber - 1]
}));
// 预签名池管理
const presignPool = new Map<number, { uploadUrl: string; formData: Record<string, any> }>();
const fetchingRanges = new Map<string, { from: number; to: number; promise: Promise<void> }>(); // 记录正在请求的范围
const BATCH_SIZE = 50;
/**
 * Whether the closed integer ranges [a1, a2] and [b1, b2] share at least
 * one point.
 */
const isRangeOverlap = (a1: number, a2: number, b1: number, b2: number): boolean => {
    // Two ranges are disjoint iff one ends strictly before the other starts.
    return !(b2 < a1 || a2 < b1);
};
/**
 * Collect the promises of in-flight presign requests whose part range
 * overlaps [from, to], so a caller can wait them out before issuing its own.
 */
const findOverlappingRanges = (from: number, to: number): Promise<void>[] => {
    const overlapping: Promise<void>[] = [];
    // The map key is never used here; iterate values directly instead of
    // destructuring an unused [key, range] pair from entries().
    for (const range of fetchingRanges.values()) {
        if (isRangeOverlap(from, to, range.from, range.to)) {
            overlapping.push(range.promise);
        }
    }
    return overlapping;
};
/**
 * Resolve the presigned upload info for one partNumber.
 * Presigns are fetched in aligned batches of BATCH_SIZE parts and cached in
 * presignPool; concurrent callers requesting the same or an overlapping
 * range are deduplicated via fetchingRanges so each range is requested once.
 */
const getPresign = async (partNumber: number) => {
    // Fast path: already cached in the pool.
    if (presignPool.has(partNumber)) {
        return presignPool.get(partNumber)!;
    }
    // Normalize the range: align it to multiples of BATCH_SIZE.
    const batchIndex = Math.floor((partNumber - 1) / BATCH_SIZE);
    const from = batchIndex * BATCH_SIZE + 1;
    const to = Math.min(from + BATCH_SIZE - 1, chunkInfo.partCount);
    const rangeKey = `${from}-${to}`;
    // If a request for this exact range is already in flight, just wait for it.
    if (fetchingRanges.has(rangeKey)) {
        await fetchingRanges.get(rangeKey)!.promise;
        assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
        return presignPool.get(partNumber)!;
    }
    // Wait out any in-flight requests whose range overlaps ours.
    let overlappingRequests = findOverlappingRanges(from, to);
    while (overlappingRequests.length > 0) {
        await Promise.all(overlappingRequests);
        if (presignPool.has(partNumber)) {
            return presignPool.get(partNumber)!;
        }
        // While we waited, another task may have started this exact range.
        if (fetchingRanges.has(rangeKey)) {
            await fetchingRanges.get(rangeKey)!.promise;
            assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
            return presignPool.get(partNumber)!;
        }
        overlappingRequests = findOverlappingRanges(from, to);
    }
    // No competing request: issue our own and publish it in fetchingRanges.
    const fetchPromise = (async () => {
        try {
            const presignedParts = await presignMultiPartUpload(from, to);
            for (const item of presignedParts) {
                presignPool.set(item.partNumber, {
                    uploadUrl: item.uploadUrl,
                    formData: item.formData
                });
            }
        } finally {
            // Always clear the in-flight marker, even when the request failed.
            fetchingRanges.delete(rangeKey);
        }
    })();
    fetchingRanges.set(rangeKey, { from, to, promise: fetchPromise });
    await fetchPromise;
    assert(presignPool.has(partNumber), `无法获取分片 ${partNumber} 的预签名信息`);
    return presignPool.get(partNumber)!;
};
// 上传单个分片的函数,带重试
const uploadPart = async (part: typeof chunkInfo.parts[0], chunk: File | Blob | MpFileGetter) => {
const uploadPart = async (partNumber: number, chunk: File | Blob | MpFileGetter) => {
let lastError;
for (let attempt = 0; attempt <= retryTimes; attempt++) {
try {
// 从预签名池获取信息
const presignInfo = await getPresign(partNumber);
let data: File | Blob | string;
if (chunk.type === 'getter') {
data = await (chunk as MpFileGetter).getFile() as string;
@ -70,62 +170,46 @@ export async function chunkUpload(
data = chunk as File | Blob;
}
const response = await uploadFn(
{
file: data,
name: 'file',
uploadUrl: part.uploadUrl,
formData: part.formData || {},
autoInform: true,
getPercent: (percent: number) => {
// 更新每个分片的进度
updateChunkPercent(part.partNumber, percent);
},
uploadId: `${extraFile.id}:${part.partNumber}`,
method: "PUT"
}
);
// 验证上传是否成功
let isSuccess = false;
isSuccess = !!(response.status === 200 || response.status === 204);
const response = await uploadFn({
file: data,
name: 'file',
uploadUrl: presignInfo.uploadUrl, // 从池中获取
formData: presignInfo.formData, // 从池中获取
autoInform: true,
getPercent: (percent: number) => {
updateChunkPercent(partNumber, percent); // 使用 partNumber
},
uploadId: `${extraFile.id}:${partNumber}`, // 使用 partNumber
method: "PUT"
});
let isSuccess = !!(response.status === 200 || response.status === 204);
if (isSuccess) {
// // 标记该分片已完成
// part.etag = (response as Response).headers?.get("ETag") || response.headers?.get("etag") || response.headers?.get("eTag")
// assert(part.etag, `无法获取分片 ${part.partNumber} 的 ETag`);
return;
}
throw new OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw new OakUploadException(`分片 ${partNumber} 上传失败`);
} catch (err: any) {
console.error(`分片 ${part.partNumber} 上传第 ${attempt + 1} 次失败:`, err);
console.error(`分片 ${partNumber} 上传第 ${attempt + 1} 次失败:`, err);
lastError = err;
// 如果是OakUserException说明是用户主动中止上传不进行重试
if (isAbortError(err)) {
throw err;
}
if (attempt < retryTimes) {
// 等待后重试
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
}
throw lastError || new OakUploadException(`分片 ${part.partNumber} 上传失败`);
throw lastError || new OakUploadException(`分片 ${partNumber} 上传失败`);
};
// 并行上传控制
const uploadTasks = pendingParts.map((part) => ({
part,
chunk: chunks[part.partNumber - 1]
}));
// 使用并发控制执行上传
const executing: Set<Promise<void>> = new Set();
const errors: Error[] = [];
let shouldAbort = false; // 中止标志
for (const task of uploadTasks) {
for (const task of uploadTasks) {
// 如果已经需要中止,跳过未开始的任务
if (shouldAbort) {
break;
@ -134,11 +218,10 @@ export async function chunkUpload(
let promise;
promise = (async () => {
try {
await uploadPart(task.part, task.chunk);
await uploadPart(task.partNumber, task.chunk); // 修改参数
} catch (err) {
if (isAbortError(err)) {
// 用户主动中止上传,设置中止标志,阻止后续任务开始
console.log(`分片 ${task.part.partNumber} 上传被用户中止`);
console.log(`分片 ${task.partNumber} 上传被用户中止`); // 修改日志
shouldAbort = true;
}
errors.push(err as Error);
@ -149,11 +232,11 @@ export async function chunkUpload(
}
}
})();
executing.add(promise);
// 当达到并发限制时,等待任意一个完成
if (executing.size >= parallelism) {
if (executing.size >= parallelism) {
await Promise.race(executing).catch(() => { });
}
}
@ -170,7 +253,7 @@ export async function chunkUpload(
// 等待所有任务完成
await Promise.all(executing);
// // 调用分片成功回调(所有分片完成后)
// if (onChunkSuccess) {
// await onChunkSuccess(chunkInfo);

View File

@ -156,13 +156,17 @@ export default class CTYunBackend
parts: [],
}
}
/**
*
*/
async mergeChunkedUpload(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
parts: Array<{
partNumber: number,
etag: string,
}>,
context: BRC<EntityDict>,
): Promise<void> {
// Implementation here
@ -178,7 +182,7 @@ export default class CTYunBackend
this.getConfigAndInstance(application);
}
async listMultipartUploads(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
@ -219,4 +223,28 @@ export default class CTYunBackend
}
);
}
/**
 * Presign upload URLs for multipart parts [from, to].
 * Stub: CTYun does not support presigned multipart upload yet; always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
presignMultiPartUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    from: number,
    to: number,
    context: BRC<EntityDict>,
): Promise<{
    partNumber: number;
    uploadUrl: string;
    formData?: Record<string, any>;
}[]> {
    // NOTE(review): throws synchronously despite the Promise return type —
    // callers awaiting a rejection should confirm this is intended.
    throw new OakPreConditionUnsetException('天翼云暂不支持分片上传预签名');
}
/**
 * Stub: chunked upload for CTYun goes through composeChunkUploadInfo instead;
 * this method always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
prepareChunkedUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    context: BRC<EntityDict>,
): Promise<{
    uploadId: string;
}> {
    // NOTE(review): synchronous throw from a Promise-returning method — TODO confirm.
    throw new OakPreConditionUnsetException("天翼云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
}

View File

@ -33,7 +33,7 @@ export function registerCosBackend<ED extends EntityDict>(clazz: new () => CosBa
}
export function getCosBackend<ED extends EntityDict>(origin: string) {
assert(CosBackendDict.hasOwnProperty(origin));
assert(CosBackendDict.hasOwnProperty(origin), `不存在类型为"${origin}"的CosBackend类`);
return CosBackendDict[origin] as CosBackend<ED>;
}

View File

@ -95,7 +95,7 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
}
async composeChunkUploadInfo(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
@ -118,6 +118,10 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
async mergeChunkedUpload(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
parts: Array<{
partNumber: number,
etag: string,
}>,
context: BRC<EntityDict>,
): Promise<void> {
// Implementation here
@ -133,7 +137,7 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
this.getConfigAndInstance(application);
}
async listMultipartUploads(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
@ -169,4 +173,28 @@ export default class LocalBackend extends Local implements CosBackend<EntityDict
url: '',
}
}
/**
 * Presign upload URLs for multipart parts [from, to].
 * Stub: local storage does not support presigned multipart upload; always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
presignMultiPartUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    from: number,
    to: number,
    context: BRC<EntityDict>,
): Promise<{
    partNumber: number;
    uploadUrl: string;
    formData?: Record<string, any>;
}[]> {
    // NOTE(review): throws synchronously despite the Promise return type —
    // callers awaiting a rejection should confirm this is intended.
    throw new OakPreConditionUnsetException('本地存储暂不支持分片上传预签名');
}
/**
 * Stub: chunked upload for local storage goes through composeChunkUploadInfo
 * instead; this method always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
prepareChunkedUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    context: BRC<EntityDict>,
): Promise<{
    uploadId: string;
}> {
    // NOTE(review): synchronous throw from a Promise-returning method — TODO confirm.
    throw new OakPreConditionUnsetException("本地存储分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
};

View File

@ -152,7 +152,7 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
}
}
async composeChunkUploadInfo(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
@ -175,6 +175,10 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
async mergeChunkedUpload(
application: EntityDict['application']['Schema'],
extraFile: OpSchema,
parts: Array<{
partNumber: number,
etag: string,
}>,
context: BRC<EntityDict>,
): Promise<void> {
// Implementation here
@ -230,4 +234,28 @@ export default class QiniuBackend extends Qiniu implements CosBackend<EntityDict
}
);
}
/**
 * Presign upload URLs for multipart parts [from, to].
 * Stub: Qiniu does not support presigned multipart upload yet; always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
presignMultiPartUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    from: number,
    to: number,
    context: BRC<EntityDict>,
): Promise<{
    partNumber: number;
    uploadUrl: string;
    formData?: Record<string, any>;
}[]> {
    // NOTE(review): throws synchronously despite the Promise return type —
    // callers awaiting a rejection should confirm this is intended.
    throw new OakPreConditionUnsetException('七牛云暂不支持分片上传预签名');
}
/**
 * Stub: chunked upload for Qiniu goes through composeChunkUploadInfo instead;
 * this method always throws.
 * @throws OakPreConditionUnsetException unconditionally
 */
prepareChunkedUpload(
    application: EntityDict['application']['Schema'],
    extraFile: OpSchema,
    context: BRC<EntityDict>,
): Promise<{
    uploadId: string;
}> {
    // NOTE(review): synchronous throw from a Promise-returning method — TODO confirm.
    throw new OakPreConditionUnsetException("七牛云分片上传请使用composeChunkUploadInfo方法获取上传信息", 'extraFile');
}
};

Some files were not shown because too many files have changed in this diff Show More