// oak-frontend-base/lib/features/socket/subscriber.js
//
// Note: this file contains non-ASCII (Chinese) characters in string literals,
// which some code viewers flag as "ambiguous Unicode characters".

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.SubScriber = void 0;
const tslib_1 = require("tslib");
const assert_1 = require("oak-domain/lib/utils/assert");
const lodash_1 = require("oak-domain/lib/utils/lodash");
const Exception_1 = require("oak-domain/lib/types/Exception");
const socket_io_1 = tslib_1.__importDefault(require("../../utils/socket.io/socket.io"));
const Feature_1 = require("../../types/Feature");
/**
 * Feature that maintains a socket.io connection to the subscription endpoint
 * and dispatches the op records pushed by the server.
 *
 * Subscriptions are reference counted per event name in `eventMap`. The socket
 * is connected when the first subscription is added and disconnected when the
 * last one is removed. Failed connection attempts are retried with a growing
 * delay.
 */
class SubScriber extends Feature_1.Feature {
    /** Cache whose `sync` is called with every batch of incoming op records. */
    cache;
    /** Message feature used to surface errors reported by the server. */
    message;
    /** Provider whose `get()` resolves to `{ subscribeUrl, path }`. */
    socketPoint;
    /** event name -> `{ callbacks, count }`; `count` is the number of active subscriptions. */
    eventMap = {};
    url;
    path;
    socket;
    /** While true, `connect()` is a no-op. */
    halted;
    /** Delay in milliseconds before the next reconnect attempt. */
    reconnectInterval;
    /** Handle of the pending reconnect timer, if any. */
    reconnectTimer;
    /** Number of consecutive failed connection attempts. */
    count;
    /** One of 'unconnected' | 'connecting' | 'connected'. */
    socketState = 'unconnected';
    /** Listeners registered through `on` for the local 'connect' / 'disconnect' events. */
    eventCallbackMap = {
        connect: [],
        disconnect: [],
    };
    /**
     * @param cache cache that receives incoming op records through `sync`
     * @param message message feature used to report server-side errors
     * @param socketPoint provider of the subscribe url and socket path
     */
    constructor(cache, message, socketPoint) {
        super();
        this.cache = cache;
        this.message = message;
        this.socketPoint = socketPoint;
        this.halted = false;
        this.reconnectInterval = 10 * 1000; // retry every 10 seconds to begin with
        this.reconnectTimer = undefined;
        this.count = 0;
    }
    /**
     * Registers a listener for a local connection event.
     * @param event 'connect' or 'disconnect'
     * @param callback listener to add
     */
    on(event, callback) {
        this.eventCallbackMap[event].push(callback);
    }
    /**
     * Removes a listener previously registered with `on`.
     * @param event 'connect' or 'disconnect'
     * @param callback listener to remove
     */
    off(event, callback) {
        (0, lodash_1.pull)(this.eventCallbackMap[event], callback);
    }
    /**
     * Invokes every listener registered for `event`.
     * Note that each listener receives the extra arguments as ONE array
     * argument (`data`), not spread out.
     */
    emit(event, ...data) {
        this.eventCallbackMap[event].forEach((ele) => ele(data));
    }
    /**
     * Fetches the subscribe url and path from `socketPoint`.
     * @throws {OakSocketConnectException} when the lookup fails; `url` and
     * `path` are reset to `undefined` in that case.
     */
    async initSocketPoint() {
        try {
            const { subscribeUrl, path } = await this.socketPoint.get();
            this.url = subscribeUrl;
            this.path = path;
        }
        catch (err) {
            this.url = undefined;
            this.path = undefined;
            throw new Exception_1.OakSocketConnectException();
        }
    }
    /** Cancels the pending reconnect timer, if there is one. */
    clearReconnectTimer() {
        if (this.reconnectTimer !== undefined) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = undefined;
        }
    }
    /**
     * Schedules a reconnect attempt after `reconnectInterval` milliseconds.
     * If that attempt rejects (the endpoint lookup failed), the error is logged
     * and another attempt is scheduled while subscriptions remain, instead of
     * leaving an unhandled rejection and silently giving up.
     */
    scheduleReconnect() {
        this.clearReconnectTimer();
        this.reconnectTimer = setTimeout(() => {
            this.reconnectTimer = undefined;
            this.connect().catch((err) => {
                console.error(err);
                if (!this.halted && Object.keys(this.eventMap).length > 0) {
                    this.scheduleReconnect();
                }
            });
        }, this.reconnectInterval);
    }
    /**
     * Opens the socket and, once connected, subscribes to every event in
     * `eventMap`. Does nothing while `halted` is true.
     *
     * The returned promise resolves after the connection succeeded or after a
     * failed attempt has been scheduled for retry.
     *
     * @throws {OakSocketConnectException} when the endpoint lookup fails
     */
    async connect() {
        if (this.halted) {
            return;
        }
        // An explicit connect supersedes any scheduled retry; otherwise both
        // would register their own listeners on the same socket.
        this.clearReconnectTimer();
        this.socketState = 'connecting';
        if (!this.url) {
            try {
                await this.initSocketPoint();
            }
            catch (err) {
                // Do not stay in 'connecting' forever: a later sub() must be able to retry.
                this.socketState = 'unconnected';
                throw err;
            }
            this.count = 0;
        }
        const url = this.url;
        const path = this.path;
        // import { io } from "socket.io-client";
        // const socket = io('https://example.com/order', {
        //     path: '/my-custom-path/',
        // });
        // the Socket instance is attached to the "order" Namespace
        // the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
        // docs: https://socket.io/docs/v4/client-options/
        if (!this.socket) {
            // In theory url/path may differ between the moment this.socket was
            // created and this connect call; not handled for now. by Xc
            this.socket = (0, socket_io_1.default)(url, {
                path,
            });
        }
        const socket = this.socket;
        return new Promise((resolve, reject) => {
            /**
             * https://socket.io/zh-CN/docs/v4/client-socket-instance/
             */
            socket.on('connect', async () => {
                this.socketState = 'connected';
                this.count = 0;
                this.emit('connect');
                socket.off('connect');
                socket.on('disconnect', () => {
                    this.socketState = 'unconnected';
                    this.emit('disconnect');
                    socket.removeAllListeners();
                    // Reconnect only while someone is still subscribed.
                    if (Object.keys(this.eventMap).length > 0) {
                        this.connect().catch((err) => console.error(err));
                    }
                });
                socket.on('data', (opRecords, event) => {
                    this.cache.sync(opRecords);
                    const registered = this.eventMap[event];
                    if (registered) {
                        registered.callbacks.forEach((ele) => ele(event, opRecords));
                    }
                });
                socket.on('error', (errString) => {
                    console.error(errString);
                    this.message.setMessage({
                        type: 'error',
                        title: '服务器subscriber抛出异常',
                        content: errString,
                    });
                });
                if (Object.keys(this.eventMap).length > 0) {
                    socket.emit('sub', Object.keys(this.eventMap));
                    resolve(undefined);
                }
                else {
                    // Every subscription was dropped while connecting: nothing to listen for.
                    resolve(undefined);
                    socket.disconnect();
                }
            });
            socket.on('connect_error', async (err) => {
                this.count++;
                socket.removeAllListeners();
                socket.disconnect();
                if (this.count > 50) {
                    // The socket address may have changed: drop it so the next
                    // attempt fetches the endpoint again.
                    this.url = undefined;
                    this.socket = undefined;
                }
                // Back off according to the number of consecutive failures.
                if (this.count <= 10) {
                    this.reconnectInterval = 10 * 1000; // 10 seconds
                }
                else if (this.count <= 20) {
                    this.reconnectInterval = 30 * 1000; // 30 seconds
                }
                else if (this.count <= 30) {
                    this.reconnectInterval = 60 * 1000; // 60 seconds
                }
                else if (this.count <= 40) {
                    this.reconnectInterval = 90 * 1000; // 90 seconds
                }
                else {
                    this.reconnectInterval = 120 * 1000; // 2 minutes
                }
                // Replace any previous timer with a new one.
                this.scheduleReconnect();
                resolve(undefined);
            });
            socket.connect();
        });
    }
    /**
     * Subscribes to `events`, optionally registering `callback` to be invoked
     * as `callback(event, opRecords)` when data for one of them arrives.
     *
     * @param events event names to subscribe to
     * @param callback optional data listener
     * @returns a function that undoes this subscription via `unsub`
     */
    async sub(events, callback) {
        const newEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            if (registered) {
                if (callback) {
                    registered.callbacks.push(callback);
                }
                registered.count++;
            }
            else {
                this.eventMap[event] = {
                    callbacks: callback ? [callback] : [],
                    count: 1,
                };
                newEvents.push(event);
            }
        });
        if (this.socketState === 'unconnected') {
            // The 'connect' handler subscribes to everything in eventMap.
            await this.connect();
        }
        else if (this.socketState === 'connected' && newEvents.length > 0) {
            // TODO temporary code: the server acknowledgement is not awaited yet;
            // revisit together with the socket.io documentation.
            this.socket.emit('sub', newEvents);
        }
        // When 'connecting', the pending 'connect' handler will subscribe to the new events.
        return () => {
            this.unsub(events, callback);
        };
    }
    /**
     * Releases one subscription for each of `events`. An event is unsubscribed
     * on the server only when its last local subscription is released, and the
     * socket is closed once no event is left.
     *
     * @param events event names previously passed to `sub`
     * @param callback the listener passed to the matching `sub` call, if any
     */
    async unsub(events, callback) {
        const invalidEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            (0, assert_1.assert)(registered);
            registered.count--;
            if (registered.count > 0) {
                if (callback) {
                    (0, lodash_1.pull)(registered.callbacks, callback);
                }
            }
            else {
                invalidEvents.push(event);
            }
        });
        invalidEvents.forEach(event => (0, lodash_1.unset)(this.eventMap, event));
        if (this.socketState === 'connected' || this.socketState === 'connecting') {
            const socket = this.socket;
            if (!socket) {
                // The socket has not been created yet (or was dropped for an
                // endpoint refresh). The 'connect' handler disconnects by
                // itself when it finds eventMap empty.
                return;
            }
            // Only events without remaining subscribers may be dropped on the
            // server; the others are still needed by other callers.
            if (invalidEvents.length > 0) {
                socket.emit('unsub', invalidEvents);
            }
            if (Object.keys(this.eventMap).length === 0) {
                this.clearReconnectTimer();
                socket.disconnect();
                socket.removeAllListeners();
                this.socketState = 'unconnected';
            }
        }
    }
    /**
     * @returns the id of the current socket, or `undefined` when no socket exists
     */
    getSubscriberId() {
        if (this.socket) {
            return this.socket.id;
        }
    }
    /**
     * Pauses (`true`) or resumes (`false`) connecting. Resuming reconnects
     * when there are subscriptions and the socket is not already connected.
     */
    setHalted(halted) {
        this.halted = halted;
        if (halted === false
            && this.socketState !== 'connected'
            && Object.keys(this.eventMap).length > 0) {
            this.connect().catch((err) => console.error(err));
        }
    }
}
exports.SubScriber = SubScriber;