oak-frontend-base/es/features/subscriber.js

245 lines
8.6 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { assert } from 'oak-domain/lib/utils/assert';
import { pull, unset } from 'oak-domain/lib/utils/lodash';
import { OakSocketConnectException } from 'oak-domain/lib/types/Exception';
import io from '../utils/socket.io/socket.io';
import { Feature } from '../types/Feature';
export class SubScriber extends Feature {
cache;
message;
getSubscribePointFn;
eventMap = {};
url;
path;
socket;
halted;
reconnectInterval;
reconnectTimer;
count;
socketState = 'unconnected';
eventCallbackMap = {
connect: [],
disconnect: [],
};
constructor(cache, message, getSubscribePointFn) {
super();
this.cache = cache;
this.message = message;
this.getSubscribePointFn = getSubscribePointFn;
this.halted = false;
this.reconnectInterval = 10 * 1000; // 每隔10秒重连
this.reconnectTimer = undefined;
this.count = 0;
}
on(event, callback) {
this.eventCallbackMap[event].push(callback);
}
off(event, callback) {
pull(this.eventCallbackMap[event], callback);
}
emit(event, ...data) {
this.eventCallbackMap[event].forEach((ele) => ele(data));
}
async initSocketPoint() {
try {
const { url, path } = await this.getSubscribePointFn();
this.url = url;
this.path = path;
}
catch (err) {
this.url = undefined;
this.path = undefined;
throw new OakSocketConnectException();
}
}
async connect() {
if (this.halted) {
return;
}
this.socketState = 'connecting';
let optionInited = false;
if (!this.url) {
await this.initSocketPoint();
optionInited = true;
this.count = 0;
}
const url = this.url;
const path = this.path;
// import { io } from "socket.io-client";
// const socket = io('https://example.com/order', {
// path: '/my-custom-path/',
// });
// the Socket instance is attached to the "order" Namespace
// the HTTP requests will look like: GET https://example.com/my-custom-path/?EIO=4&transport=polling&t=ML4jUwU
// 文档 https://socket.io/docs/v4/client-options/
if (!this.socket) {
// 理论上有可能出现this.socket初始化时url和path和这次connect时发生变化暂时不处理。by Xc
this.socket = io(url, {
path,
});
}
const socket = this.socket;
return new Promise((resolve, reject) => {
/**
* https://socket.io/zh-CN/docs/v4/client-socket-instance/
*/
socket.on('connect', async () => {
this.socketState = 'connected';
this.emit('connect');
socket.off('connect');
socket.on('disconnect', () => {
this.socketState = 'unconnected';
this.emit('disconnect');
socket.removeAllListeners();
if (Object.keys(this.eventMap).length > 0) {
this.connect();
}
});
socket.on('data', (opRecords, event) => {
const registered = this.eventMap[event];
if (registered) {
registered.callbacks.forEach((ele) => ele(event, opRecords));
}
this.cache.sync(opRecords);
});
socket.on('error', (errString) => {
console.error(errString);
this.message.setMessage({
type: 'error',
title: '服务器subscriber抛出异常',
content: errString,
});
});
if (Object.keys(this.eventMap).length > 0) {
socket.emit('sub', Object.keys(this.eventMap));
resolve(undefined);
}
else {
resolve(undefined);
socket.disconnect();
}
});
if (!optionInited) {
socket.on('connect_error', async (err) => {
this.count++;
if (this.count > 50) {
// 可能socket地址改变了刷新重连
this.url = undefined;
}
socket.removeAllListeners();
socket.disconnect();
this.socket = undefined;
// 清除之前的重连定时器
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer);
}
// 根据重连次数设置不同的重连间隔
if (this.count <= 10) {
this.reconnectInterval = 10 * 1000; // 10秒
}
else if (this.count <= 20) {
this.reconnectInterval = 30 * 1000; // 30秒
}
else if (this.count <= 30) {
this.reconnectInterval = 60 * 1000; // 60秒
}
else if (this.count <= 40) {
this.reconnectInterval = 90 * 1000; // 90秒
}
else {
this.reconnectInterval = 120 * 1000; // 2分钟
}
// 设置新的定时器
this.reconnectTimer = setInterval(async () => {
await this.connect();
// resolve(undefined);
}, this.reconnectInterval);
resolve(undefined);
});
}
socket.connect();
});
}
async sub(events, callback) {
const newEvents = [];
events.forEach((event) => {
const registered = this.eventMap[event];
if (registered) {
if (callback) {
registered.callbacks.push(callback);
}
registered.count++;
}
else {
this.eventMap[event] = {
callbacks: callback ? [callback] : [],
count: 1,
};
newEvents.push(event);
}
;
});
if (this.socketState === 'unconnected') {
await this.connect();
}
else if (this.socketState === 'connected' && newEvents.length > 0) {
await new Promise((resolve, reject) => {
// this.socket!.emit('sub', newEvents, (result: string) => {
// if (result) {
// this.message.setMessage({
// type: 'error',
// title: 'sub data error',
// content: result,
// });
// reject();
// }
// else {
// resolve(undefined);
// }
// });
// TODO 临时代码 后续结合文档再解决
this.socket.emit('sub', newEvents);
resolve(undefined);
});
}
return () => {
this.unsub(events, callback);
};
}
async unsub(events, callback) {
const invalidEvents = [];
events.forEach((event) => {
const registered = this.eventMap[event];
assert(registered);
registered.count--;
if (registered.count > 0) {
if (callback) {
pull(registered.callbacks, callback);
}
}
else {
invalidEvents.push(event);
}
});
invalidEvents.forEach(event => unset(this.eventMap, event));
if (this.socketState === 'connected') {
this.socket.emit('unsub', events);
if (Object.keys(this.eventMap).length === 0) {
this.socket.disconnect();
this.socket.removeAllListeners();
this.socketState = 'unconnected';
}
}
}
getSubscriberId() {
if (this.socket) {
return this.socket.id;
}
}
setHalted(halted) {
this.halted = halted;
if (halted === false && Object.keys(this.eventMap).length > 0) {
this.connect();
}
}
}