"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.SubScriber = void 0; const tslib_1 = require("tslib"); const assert_1 = require("oak-domain/lib/utils/assert"); const lodash_1 = require("oak-domain/lib/utils/lodash"); const Exception_1 = require("oak-domain/lib/types/Exception"); const socket_io_1 = tslib_1.__importDefault(require("../utils/socket.io/socket.io")); const Feature_1 = require("../types/Feature"); class SubScriber extends Feature_1.Feature { cache; message; getSubscribePointFn; eventMap = {}; url; path; socket; halted; reconnectInterval; reconnectTimer; count; socketState = 'unconnected'; eventCallbackMap = { connect: [], disconnect: [], }; constructor(cache, message, getSubscribePointFn) { super(); this.cache = cache; this.message = message; this.getSubscribePointFn = getSubscribePointFn; this.halted = false; this.reconnectInterval = 10 * 1000; // 每隔10秒重连 this.reconnectTimer = undefined; this.count = 0; } on(event, callback) { this.eventCallbackMap[event].push(callback); } off(event, callback) { (0, lodash_1.pull)(this.eventCallbackMap[event], callback); } emit(event, ...data) { this.eventCallbackMap[event].forEach((ele) => ele(data)); } async initSocketPoint() { try { const { url, path } = await this.getSubscribePointFn(); this.url = url; this.path = path; } catch (err) { this.url = undefined; this.path = undefined; throw new Exception_1.OakSocketConnectException(); } } async connect() { if (this.halted) { return; } this.socketState = 'connecting'; let optionInited = false; if (!this.url) { await this.initSocketPoint(); optionInited = true; this.count = 0; } const url = this.url; const path = this.path; // import { io } from "socket.io-client"; // const socket = io('https://example.com/order', { // path: '/my-custom-path/', // }); // the Socket instance is attached to the "order" Namespace // the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU // 文档 https://socket.io/docs/v4/client-options/ if (!this.socket) { // 理论上有可能出现this.socket初始化时,url和path和这次connect时发生变化,暂时不处理。by Xc this.socket = (0, socket_io_1.default)(url, { path, }); } const socket = this.socket; return new Promise((resolve, reject) => { /** * https://socket.io/zh-CN/docs/v4/client-socket-instance/ */ socket.on('connect', async () => { this.socketState = 'connected'; this.emit('connect'); socket.off('connect'); socket.on('disconnect', () => { this.socketState = 'unconnected'; this.emit('disconnect'); socket.removeAllListeners(); if (Object.keys(this.eventMap).length > 0) { this.connect(); } }); socket.on('data', (opRecords, event) => { const registered = this.eventMap[event]; if (registered) { registered.callbacks.forEach((ele) => ele(event, opRecords)); } this.cache.sync(opRecords); }); socket.on('error', (errString) => { console.error(errString); this.message.setMessage({ type: 'error', title: '服务器subscriber抛出异常', content: errString, }); }); if (Object.keys(this.eventMap).length > 0) { socket.emit('sub', Object.keys(this.eventMap)); resolve(undefined); } else { resolve(undefined); socket.disconnect(); } }); if (!optionInited) { socket.on('connect_error', async (err) => { this.count++; if (this.count > 50) { // 可能socket地址改变了,刷新重连 this.url = undefined; } socket.removeAllListeners(); socket.disconnect(); this.socket = undefined; // 清除之前的重连定时器 if (this.reconnectTimer) { clearInterval(this.reconnectTimer); } // 根据重连次数设置不同的重连间隔 if (this.count <= 10) { this.reconnectInterval = 10 * 1000; // 10秒 } else if (this.count <= 20) { this.reconnectInterval = 30 * 1000; // 30秒 } else if (this.count <= 30) { this.reconnectInterval = 60 * 1000; // 60秒 } else if (this.count <= 40) { this.reconnectInterval = 90 * 1000; // 90秒 } else { this.reconnectInterval = 120 * 1000; // 2分钟 } // 设置新的定时器 this.reconnectTimer = setInterval(async () => { await this.connect(); // resolve(undefined); }, this.reconnectInterval); resolve(undefined); }); } socket.connect(); }); } async sub(events, callback) { const newEvents = []; events.forEach((event) => { const registered = this.eventMap[event]; if (registered) { if (callback) { registered.callbacks.push(callback); } registered.count++; } else { this.eventMap[event] = { callbacks: callback ? [callback] : [], count: 1, }; newEvents.push(event); } ; }); if (this.socketState === 'unconnected') { await this.connect(); } else if (this.socketState === 'connected' && newEvents.length > 0) { await new Promise((resolve, reject) => { // this.socket!.emit('sub', newEvents, (result: string) => { // if (result) { // this.message.setMessage({ // type: 'error', // title: 'sub data error', // content: result, // }); // reject(); // } // else { // resolve(undefined); // } // }); // TODO 临时代码 后续结合文档再解决 this.socket.emit('sub', newEvents); resolve(undefined); }); } return () => { this.unsub(events, callback); }; } async unsub(events, callback) { const invalidEvents = []; events.forEach((event) => { const registered = this.eventMap[event]; (0, assert_1.assert)(registered); registered.count--; if (registered.count > 0) { if (callback) { (0, lodash_1.pull)(registered.callbacks, callback); } } else { invalidEvents.push(event); } }); invalidEvents.forEach(event => (0, lodash_1.unset)(this.eventMap, event)); if (this.socketState === 'connected') { this.socket.emit('unsub', events); if (Object.keys(this.eventMap).length === 0) { this.socket.disconnect(); this.socket.removeAllListeners(); this.socketState = 'unconnected'; } } } getSubscriberId() { if (this.socket) { return this.socket.id; } } setHalted(halted) { this.halted = halted; if (halted === false && Object.keys(this.eventMap).length > 0) { this.connect(); } } } exports.SubScriber = SubScriber;