import { assert } from 'oak-domain/lib/utils/assert'; import { EntityDict } from '../oak-app-domain'; import { BackendRuntimeContext } from '../context/BackendRuntimeContext'; import { Watcher, BBWatcher } from 'oak-domain/lib/types/Watcher'; import { getCosBackend } from '../utils/cos/index.backend'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; import { groupBy } from 'oak-domain/lib/utils/lodash'; import { extraFileProjection } from '../types/Projection'; import applicationPassport from '../components/applicationPassport'; async function checkWhetherSuccess(context: BackendRuntimeContext, applicationId: string, rows: EntityDict['extraFile']['OpSchema'][]) { const successIds: string[] = []; const failedIds: string[] = []; await context.setApplication(applicationId); for (const d of rows) { const { origin } = d; const cos = getCosBackend(origin!); assert(cos); const success = await cos.checkWhetherSuccess( context.getApplication() as EntityDict['application']['Schema'], d as EntityDict['extraFile']['OpSchema'], context ); if (success) { successIds.push(d.id!); } else { failedIds.push(d.id!); } } if (successIds.length > 0) { await context.operate('extraFile', { id: await generateNewIdAsync(), action: 'update', data: { uploadState: 'success', }, filter: { id: { $in: successIds, } } }, {}); } if (failedIds.length > 0) { await context.operate('extraFile', { id: await generateNewIdAsync(), action: 'update', data: { uploadState: 'failed', }, filter: { id: { $in: successIds, } } }, {}); } } const watchers: Watcher< EntityDict, 'extraFile', BackendRuntimeContext >[] = [ { name: '确定uploading的文件状态', entity: 'extraFile', filter: async () => { const now = Date.now(); const deadline = process.env.NODE_ENV === 'production' ? now - 3600 * 1000 : now - 60 * 1000; return { $$updateAt$$: { $lt: deadline, }, uploadState: 'uploading', enableChunkedUpload: { $exists: false, } }; }, projection: { id: 1, applicationId: 1, origin: 1, bucket: 1, uploadState: 1, objectId: 1, extension: 1, }, fn: async (context, data) => { const eg = groupBy(data, 'applicationId'); for (const appId in eg) { await checkWhetherSuccess(context, appId, eg[appId] as EntityDict['extraFile']['OpSchema'][]); } return { extraFile: { update: data.length, } }; } }, { name: '确定uploading的文件状态', entity: 'extraFile', filter: async () => { const now = Date.now(); const deadline = process.env.NODE_ENV === 'production' ? now - 3 * 24 * 60 * 60 * 1000 : now - 60 * 1000; return { $$updateAt$$: { $lt: deadline, }, uploadState: 'uploading', enableChunkedUpload: true }; }, projection: { ...extraFileProjection, application: { ...applicationPassport, } }, fn: async (context, data) => { const eg = groupBy(data, 'applicationId'); for (const appId in eg) { // 这里检查需要分片上传的文件信息,但是考虑到可能还在上传之类的,所以先不管,后面再处理 // 1. 要去查询分片信息,看看是不是都有etag了,如果都有etag了,就说明上传完成了,否则就标记为失败 } return { extraFile: { update: data.length, } }; } }, ]; export default watchers;