import { assert } from 'oak-domain/lib/utils/assert'; import { generateNewIdAsync } from 'oak-domain/lib/utils/uuid'; import { tryMakeSmsNotification } from '../utils/message/sms'; import { getNotificationHandler, getNotificationFailureHandlers } from '../utils/notification'; async function sendNotification(notification, context) { const { channel } = notification; const handler = getNotificationHandler(channel); await handler(notification, context); return 1; } async function tryCreateSmsNotification(message, context) { const smsNotification = await tryMakeSmsNotification(message, context); if (smsNotification) { const { messageSystem$message } = message; for (const ms of messageSystem$message) { const { id } = ms; await context.operate('notification', { id: await generateNewIdAsync(), action: 'create', data: Object.assign(smsNotification, { messageSystemId: id, }), }, { dontCollect: true }); } return messageSystem$message.length; } return 0; } const triggers = [ { name: '当创建notification后,业务提交后再进行推送', entity: 'notification', action: 'create', when: 'commit', strict: 'takeEasy', fn: async ({ ids }, context) => { const closeRootMode = context.openRootMode(); try { for (const id of ids) { const [row] = await context.select('notification', { data: { id: 1, data: 1, templateId: 1, channel: 1, messageSystemId: 1, data1: 1, }, filter: { id, }, }, {}); await sendNotification(row, context); } } catch (err) { closeRootMode(); throw err; } closeRootMode(); } }, { name: '当notification完成时,根据情况去更新message', entity: 'notification', when: 'after', action: ['fail', 'succeed'], fn: async ({ operation }, context) => { const { filter } = operation; assert(filter.id); const closeRootMode = context.openRootMode(); try { const messages = await context.select('message', { data: { id: 1, weight: 1, iState: 1, type: 1, entity: 1, entityId: 1, userId: 1, messageSystem$message: { $entity: 'messageSystem', data: { id: 1, notification$messageSystem: { $entity: 'notification', data: { id: 1, iState: 1, channel: 1, }, }, }, }, }, filter: { messageSystem$message: { notification$messageSystem: { id: filter.id, }, }, /* id: { $in: { entity: 'messageSystem', data: { messageId: 1, }, filter: { id: { $in: { entity: 'notification', data: { messageSystemId: 1, }, filter: { id: filter!.id, } }, } } } } */ }, }, { dontCollect: true }); assert(messages.length === 1); const [message] = messages; if (message.iState === 'success') { closeRootMode(); return 0; } // 查看所有的notification状态,只要有一个完成就已经完成了 let success = false; let allFailed = true; let smsTried = false; for (const ms of message.messageSystem$message) { for (const n of ms.notification$messageSystem) { if (n.iState === 'success') { success = true; break; } if (n.iState !== 'failure') { allFailed = false; } if (n.channel === 'sms') { smsTried = true; } } if (success === true) { break; } } if (success) { // 有一个完成就算完成 await context.operate('message', { id: await generateNewIdAsync(), action: 'succeed', data: {}, filter: { id: message.id, }, }, { dontCollect: true }); closeRootMode(); return 1; } // 获取所有注册的失败处理器 const failureHandlers = getNotificationFailureHandlers(); if (failureHandlers.length > 0) { // 如果有注册的失败处理器,执行所有处理器 let totalResult = 0; for (const handler of failureHandlers) { try { const result = await handler(message, context); totalResult += result; } catch (err) { console.error('执行notification失败处理器时出错:', err); } } if (totalResult > 0) { closeRootMode(); return totalResult; } } else if (message.weight === 'medium' && !smsTried && allFailed) { // 如果没有注册处理器,走原有逻辑:中级的消息,在其它途径都失败的情况下再发短信 const result = await tryCreateSmsNotification(message, context); closeRootMode(); return result; } // 标识消息发送失败 if (allFailed) { await context.operate('message', { id: await generateNewIdAsync(), action: 'fail', data: {}, filter: { id: message.id, }, }, { dontCollect: true }); closeRootMode(); return 1; } } catch (err) { closeRootMode(); throw err; } } } ]; export default triggers;