oak-frontend-base/es/features/subscriber.js

203 lines
6.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { assert } from 'oak-domain/lib/utils/assert';
import { pull, unset } from 'oak-domain/lib/utils/lodash';
import io from '../utils/socket.io/socket.io';
import { Feature } from '../types/Feature';
/**
 * Feature that manages event subscriptions over a single socket.io connection.
 *
 * Subscriptions are reference-counted per event name in `eventMap`
 * (`{ [event]: { callbacks, count } }`). The socket is opened when `sub` is
 * called while unconnected, and closed once the last event is unsubscribed.
 * Records arriving on the socket's 'data' event are handed to the callbacks
 * registered for that event and then to `cache.sync`.
 */
export class SubScriber extends Feature {
    cache;
    message;
    getSubscribePointFn;
    // event name -> { callbacks: Function[], count: number }
    eventMap = {};
    url;
    path;
    socket;
    halted;
    // one of 'unconnected' | 'connecting' | 'connected'
    socketState = 'unconnected';
    // local listeners for connection life-cycle notifications
    eventCallbackMap = {
        connect: [],
        disconnect: [],
    };

    /**
     * @param cache object exposing `sync(opRecords)`
     * @param message object exposing `setMessage({ type, title, content })`
     * @param getSubscribePointFn async function resolving to `{ url, path }`
     */
    constructor(cache, message, getSubscribePointFn) {
        super();
        this.cache = cache;
        this.message = message;
        this.getSubscribePointFn = getSubscribePointFn;
        this.halted = false;
    }

    /**
     * Registers a life-cycle listener.
     * @param event 'connect' or 'disconnect'
     * @param callback listener to add
     */
    on(event, callback) {
        this.eventCallbackMap[event].push(callback);
    }

    /**
     * Removes a life-cycle listener previously added with `on`.
     */
    off(event, callback) {
        pull(this.eventCallbackMap[event], callback);
    }

    /**
     * Notifies the life-cycle listeners of `event`.
     * Each listener receives ONE argument: the array of extra arguments.
     */
    emit(event, ...data) {
        this.eventCallbackMap[event].forEach((ele) => ele(data));
    }

    /**
     * Fetches the socket endpoint and stores it in `this.url` / `this.path`.
     */
    async initSocketPoint() {
        const { url, path } = await this.getSubscribePointFn();
        this.url = url;
        this.path = path;
    }

    /**
     * Opens the socket (creating it if needed) and resolves once it is connected.
     * Does nothing while the subscriber is halted.
     *
     * On connect, every event currently in `eventMap` is sent to the server with
     * a single 'sub' message; if there is nothing to subscribe to, the socket is
     * disconnected again right away.
     */
    async connect() {
        if (this.halted) {
            return;
        }
        this.socketState = 'connecting';
        let optionInited = false;
        if (!this.url) {
            try {
                await this.initSocketPoint();
            }
            catch (err) {
                // Do not leave the state stuck at 'connecting' when the endpoint lookup fails.
                this.socketState = 'unconnected';
                throw err;
            }
            optionInited = true;
        }
        const url = this.url;
        const path = this.path;
        // Client options reference: https://socket.io/docs/v4/client-options/
        // e.g. io('https://example.com/order', { path: '/my-custom-path/' }) attaches to the
        // "order" namespace while the HTTP requests go to https://example.com/my-custom-path/
        if (!this.socket) {
            this.socket = io(url, {
                path,
            });
        }
        const socket = this.socket;
        return new Promise((resolve, reject) => {
            /**
             * https://socket.io/docs/v4/client-socket-instance/
             */
            socket.on('connect', async () => {
                this.socketState = 'connected';
                this.emit('connect');
                socket.off('connect');
                socket.on('disconnect', () => {
                    this.socketState = 'unconnected';
                    this.emit('disconnect');
                    socket.removeAllListeners();
                    // Reconnect only while something is still subscribed.
                    if (Object.keys(this.eventMap).length > 0) {
                        this.connect().catch((err) => console.error(err));
                    }
                });
                socket.on('data', (opRecords, event) => {
                    const registered = this.eventMap[event];
                    if (registered) {
                        registered.callbacks.forEach((ele) => ele(event, opRecords));
                    }
                    this.cache.sync(opRecords);
                });
                socket.on('error', (errString) => {
                    console.error(errString);
                    this.message.setMessage({
                        type: 'error',
                        title: '服务器subscriber抛出异常',
                        content: errString,
                    });
                });
                if (Object.keys(this.eventMap).length > 0) {
                    socket.emit('sub', Object.keys(this.eventMap));
                    resolve(undefined);
                }
                else {
                    resolve(undefined);
                    socket.disconnect();
                }
            });
            if (!optionInited) {
                // The endpoint was cached from an earlier connect; it may be stale.
                let count = 0;
                socket.on('connect_error', async () => {
                    count++;
                    if (count > 10) {
                        // The socket address may have changed: refresh the endpoint and reconnect.
                        socket.removeAllListeners();
                        socket.disconnect();
                        this.url = undefined;
                        // Drop the old socket so the refreshed url/path is actually used,
                        // and reset the state because this attempt has been abandoned.
                        if (this.socket === socket) {
                            this.socket = undefined;
                        }
                        this.socketState = 'unconnected';
                        try {
                            await this.connect();
                            resolve(undefined);
                        }
                        catch (err) {
                            reject(err);
                        }
                    }
                });
            }
            socket.connect();
        });
    }

    /**
     * Subscribes to `events`, optionally attaching `callback` to each of them.
     *
     * Every call adds one reference per event, whether or not a callback is
     * given, so that it is balanced by the matching `unsub` call.
     *
     * @param events event names
     * @param callback optional `(event, opRecords) => void`
     * @returns a function that undoes this subscription via `unsub(events, callback)`
     */
    async sub(events, callback) {
        const newEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            if (registered) {
                // Always count the reference: unsub always decrements.
                registered.count++;
                if (callback) {
                    registered.callbacks.push(callback);
                }
            }
            else {
                this.eventMap[event] = {
                    callbacks: callback ? [callback] : [],
                    count: 1,
                };
                newEvents.push(event);
            }
        });
        if (this.socketState === 'unconnected') {
            // connect() sends the whole eventMap, which already includes newEvents.
            await this.connect();
        }
        else if (this.socketState === 'connected' && newEvents.length > 0) {
            await new Promise((resolve, reject) => {
                this.socket.emit('sub', newEvents, (result) => {
                    // A truthy acknowledgement is treated as an error description.
                    if (result) {
                        this.message.setMessage({
                            type: 'error',
                            title: 'sub data error',
                            content: result,
                        });
                        reject();
                    }
                    else {
                        resolve(undefined);
                    }
                });
            });
        }
        return () => {
            this.unsub(events, callback);
        };
    }

    /**
     * Releases one reference on each of `events`.
     *
     * Events whose count drops to zero are removed from `eventMap` and, when
     * connected, reported to the server with 'unsub'. Events that still have
     * other subscribers only lose `callback`. The socket is closed once no
     * event is left.
     *
     * @param events event names, each of which must currently be subscribed
     * @param callback the callback passed to the matching `sub` call, if any
     */
    async unsub(events, callback) {
        const invalidEvents = [];
        events.forEach((event) => {
            const registered = this.eventMap[event];
            assert(registered);
            registered.count--;
            if (registered.count > 0) {
                if (callback) {
                    pull(registered.callbacks, callback);
                }
            }
            else {
                invalidEvents.push(event);
            }
        });
        invalidEvents.forEach((event) => unset(this.eventMap, event));
        if (this.socketState === 'connected') {
            // Only tell the server about events nobody listens to any more.
            if (invalidEvents.length > 0) {
                this.socket.emit('unsub', invalidEvents);
            }
            if (Object.keys(this.eventMap).length === 0) {
                this.socket.disconnect();
                this.socket.removeAllListeners();
                this.socketState = 'unconnected';
            }
        }
    }

    /**
     * @returns the `id` of the current socket, or undefined when there is no socket
     */
    getSubscriberId() {
        if (this.socket) {
            return this.socket.id;
        }
    }

    /**
     * Halts or resumes connecting. Resuming reconnects when events are
     * subscribed and no connection exists or is in progress.
     */
    setHalted(halted) {
        this.halted = halted;
        if (halted === false
            && this.socketState === 'unconnected'
            && Object.keys(this.eventMap).length > 0) {
            this.connect().catch((err) => console.error(err));
        }
    }
}