235 lines
8.1 KiB
JavaScript
235 lines
8.1 KiB
JavaScript
import { assert } from 'oak-domain/lib/utils/assert';
|
||
import { pull, unset } from 'oak-domain/lib/utils/lodash';
|
||
import { OakSocketConnectException } from 'oak-domain/lib/types/Exception';
|
||
import io from '../../utils/socket.io/socket.io';
|
||
import { Feature } from '../../types/Feature';
|
||
/**
 * Client-side subscription manager on top of a socket.io connection.
 *
 * Callers register interest in named events through `sub()`; registrations
 * are reference-counted per event in `eventMap`. The socket is opened when
 * the first event is registered and closed when the last one is released.
 * Incoming `data` packets are dispatched to the callbacks registered for the
 * event and then handed to `cache.sync()`.
 *
 * `socketState` is one of 'unconnected' | 'connecting' | 'connected'.
 * 'connecting' also covers the back-off period between a failed attempt and
 * the next scheduled retry.
 */
export class SubScriber extends Feature {
    cache;
    message;
    socketPoint;
    /** event name -> { callbacks: Function[], count: number } */
    eventMap = {};
    url;
    path;
    socket;
    halted;
    reconnectInterval;
    reconnectTimer;
    /** Number of consecutive failed connection attempts. */
    count;
    socketState = 'unconnected';
    /** Listeners for the subscriber's own lifecycle events. */
    eventCallbackMap = {
        connect: [],
        disconnect: [],
    };

    /**
     * @param cache       object whose `sync(opRecords)` receives every pushed record batch
     * @param message     object whose `setMessage()` is used to surface server-side errors
     * @param socketPoint object whose async `get()` yields `{ subscribeUrl, path }`
     */
    constructor(cache, message, socketPoint) {
        super();
        this.cache = cache;
        this.message = message;
        this.socketPoint = socketPoint;
        this.halted = false;
        this.reconnectInterval = 10 * 1000; // retry every 10 seconds to begin with
        this.reconnectTimer = undefined;
        this.count = 0;
    }

    /**
     * Registers a lifecycle listener.
     * @param event    'connect' or 'disconnect'
     * @param callback listener to add
     */
    on(event, callback) {
        this.eventCallbackMap[event].push(callback);
    }

    /**
     * Removes a lifecycle listener previously added with `on()`.
     * @param event    'connect' or 'disconnect'
     * @param callback listener to remove
     */
    off(event, callback) {
        pull(this.eventCallbackMap[event], callback);
    }

    /**
     * Invokes every lifecycle listener registered for `event`.
     * NOTE: each listener receives the extra arguments as ONE array argument
     * (not spread); kept as-is because existing listeners may rely on it.
     */
    emit(event, ...data) {
        this.eventCallbackMap[event].forEach((ele) => ele(data));
    }

    /**
     * Resolves the socket endpoint (`url` and `path`) from `socketPoint`.
     * @throws {OakSocketConnectException} when the endpoint cannot be obtained;
     *         `url` and `path` are reset to undefined in that case.
     */
    async initSocketPoint() {
        try {
            const { subscribeUrl, path } = await this.socketPoint.get();
            this.url = subscribeUrl;
            this.path = path;
        }
        catch (err) {
            this.url = undefined;
            this.path = undefined;
            throw new OakSocketConnectException();
        }
    }

    /**
     * Opens the socket (creating it on first use) and wires up its handlers.
     *
     * Does nothing while halted or when already connected. The returned
     * promise resolves once the attempt has either connected or failed with
     * `connect_error` (in which case a retry has been scheduled).
     *
     * @throws {OakSocketConnectException} when the endpoint cannot be resolved
     */
    async connect() {
        if (this.halted) {
            return;
        }
        if (this.socketState === 'connected') {
            // Re-running the handshake on a live socket would attach duplicate
            // listeners and return a promise that never settles.
            return;
        }
        // This attempt supersedes any retry that is still scheduled.
        if (this.reconnectTimer !== undefined) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = undefined;
        }
        this.socketState = 'connecting';
        if (!this.url) {
            try {
                await this.initSocketPoint();
            }
            catch (err) {
                // Without this reset the state would stay 'connecting' forever
                // and later sub() calls would never try to connect again.
                this.socketState = 'unconnected';
                throw err;
            }
            this.count = 0;
        }
        const url = this.url;
        const path = this.path;
        // socket.io usage reminder (https://socket.io/docs/v4/client-options/):
        //   const socket = io('https://example.com/order', { path: '/my-custom-path/' });
        // attaches the Socket to the "order" namespace while the HTTP requests
        // go to https://example.com/my-custom-path/?EIO=4&transport=polling&...
        if (!this.socket) {
            // In theory url/path may differ between the moment this.socket was
            // created and a later connect(); not handled for now. by Xc
            this.socket = io(url, {
                path,
            });
        }
        const socket = this.socket;
        return new Promise((resolve) => {
            /**
             * https://socket.io/zh-CN/docs/v4/client-socket-instance/
             */
            socket.on('connect', async () => {
                this.socketState = 'connected';
                this.count = 0;
                this.emit('connect');
                socket.off('connect');
                socket.on('disconnect', () => {
                    this.socketState = 'unconnected';
                    this.emit('disconnect');
                    socket.removeAllListeners();
                    if (Object.keys(this.eventMap).length > 0) {
                        // Still have subscriptions: reconnect. Fire-and-forget,
                        // so failures are logged instead of becoming unhandled rejections.
                        this.connect().catch((err) => console.error(err));
                    }
                });
                socket.on('data', (opRecords, event) => {
                    const registered = this.eventMap[event];
                    if (registered) {
                        registered.callbacks.forEach((ele) => ele(event, opRecords));
                    }
                    this.cache.sync(opRecords);
                });
                socket.on('error', (errString) => {
                    console.error(errString);
                    this.message.setMessage({
                        type: 'error',
                        title: '服务器subscriber抛出异常',
                        content: errString,
                    });
                });
                const subscribed = Object.keys(this.eventMap);
                if (subscribed.length > 0) {
                    socket.emit('sub', subscribed);
                    resolve(undefined);
                }
                else {
                    // Everything was unsubscribed while connecting: nothing to listen for.
                    resolve(undefined);
                    socket.disconnect();
                }
            });
            socket.on('connect_error', async (err) => {
                this.count++;
                socket.removeAllListeners();
                socket.disconnect();
                if (this.count > 50) {
                    // The socket address may have changed: drop the cached
                    // endpoint and socket so the next attempt resolves them afresh.
                    this.url = undefined;
                    this.socket = undefined;
                }
                // Back off progressively as consecutive failures accumulate.
                if (this.count <= 10) {
                    this.reconnectInterval = 10 * 1000; // 10 seconds
                }
                else if (this.count <= 20) {
                    this.reconnectInterval = 30 * 1000; // 30 seconds
                }
                else if (this.count <= 30) {
                    this.reconnectInterval = 60 * 1000; // 60 seconds
                }
                else if (this.count <= 40) {
                    this.reconnectInterval = 90 * 1000; // 90 seconds
                }
                else {
                    this.reconnectInterval = 120 * 1000; // 2 minutes
                }
                // Never keep more than one retry scheduled at a time.
                if (this.reconnectTimer !== undefined) {
                    clearTimeout(this.reconnectTimer);
                }
                this.reconnectTimer = setTimeout(() => {
                    this.reconnectTimer = undefined;
                    this.connect().catch((e) => console.error(e));
                }, this.reconnectInterval);
                resolve(undefined);
            });
            socket.connect();
        });
    }

    /**
     * Registers interest in `events`, optionally with a data callback.
     *
     * Each event is reference-counted. Events not registered before are sent
     * to the server right away when connected; when unconnected a connection
     * is started and all registered events are sent once it is established.
     *
     * @param events   event names to subscribe to
     * @param callback optional `(event, opRecords) => void` invoked on pushed data
     * @returns a function that releases exactly this registration via `unsub()`
     * @throws {OakSocketConnectException} when a connection has to be opened
     *         and the endpoint cannot be resolved
     */
    async sub(events, callback) {
        const newEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            if (registered) {
                if (callback) {
                    registered.callbacks.push(callback);
                }
                registered.count++;
            }
            else {
                this.eventMap[event] = {
                    callbacks: callback ? [callback] : [],
                    count: 1,
                };
                newEvents.push(event);
            }
        });
        if (this.socketState === 'unconnected') {
            await this.connect();
        }
        else if (this.socketState === 'connected' && newEvents.length > 0) {
            // TODO: temporary. The server acknowledgement (error string) is
            // not awaited yet; revisit together with the socket.io ack docs.
            this.socket.emit('sub', newEvents);
        }
        return () => {
            this.unsub(events, callback);
        };
    }

    /**
     * Releases one registration of each event in `events`.
     *
     * Only events whose reference count drops to zero are removed locally and
     * reported to the server; events still in use by other subscribers stay
     * subscribed. When no event is left the socket is closed.
     *
     * @param events   event names previously passed to `sub()`
     * @param callback the callback that was passed to the matching `sub()`, if any
     * @throws assertion error when an event is not currently registered
     */
    async unsub(events, callback) {
        const invalidEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            assert(registered);
            registered.count--;
            if (registered.count > 0) {
                if (callback) {
                    // Drop a single occurrence: the same callback may have been
                    // registered more than once and only one reference is released.
                    const index = registered.callbacks.indexOf(callback);
                    if (index !== -1) {
                        registered.callbacks.splice(index, 1);
                    }
                }
            }
            else {
                invalidEvents.push(event);
            }
        });
        invalidEvents.forEach((event) => unset(this.eventMap, event));
        if (this.socketState === 'connected' || this.socketState === 'connecting') {
            // The socket may not exist yet/anymore while 'connecting'
            // (endpoint still being resolved, or dropped after repeated failures).
            const socket = this.socket;
            const noneLeft = Object.keys(this.eventMap).length === 0;
            if (socket) {
                if (invalidEvents.length > 0) {
                    socket.emit('unsub', invalidEvents);
                }
                if (noneLeft) {
                    socket.disconnect();
                    socket.removeAllListeners();
                }
            }
            if (noneLeft) {
                // Nothing left to listen for: cancel any scheduled retry too.
                if (this.reconnectTimer !== undefined) {
                    clearTimeout(this.reconnectTimer);
                    this.reconnectTimer = undefined;
                }
                this.socketState = 'unconnected';
            }
        }
    }

    /**
     * @returns the `id` of the current socket, or undefined when no socket exists
     */
    getSubscriberId() {
        if (this.socket) {
            return this.socket.id;
        }
    }

    /**
     * Pauses or resumes connecting. While halted, `connect()` is a no-op.
     * Resuming with registered events triggers a connection attempt.
     * @param halted true to pause, false to resume
     */
    setHalted(halted) {
        this.halted = halted;
        if (halted === false && Object.keys(this.eventMap).length > 0) {
            // Fire-and-forget: log failures rather than leaving the rejection unhandled.
            this.connect().catch((err) => console.error(err));
        }
    }
}
|