fix: 提高缓冲区大小和等待时间,避免大流量时出现异常

fix: 增加了断开连接的记录,避免在拥塞情况下重复发送close导致的进一步拥塞
This commit is contained in:
Pan Qiancheng 2025-10-17 11:36:09 +08:00
parent a432e12c1d
commit 71326a0e19
2 changed files with 290 additions and 49 deletions

View File

@ -7,7 +7,9 @@ import (
"io"
"log"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -37,7 +39,7 @@ const (
// 超时设置
ConnectTimeout = 10 * time.Second // 连接超时
ReadTimeout = 30 * time.Second // 读取超时
ReadTimeout = 300 * time.Second // 读取超时增加到60秒
KeepAliveInterval = 15 * time.Second // 心跳间隔
)
@ -56,6 +58,7 @@ type LocalConnection struct {
Conn net.Conn
closeChan chan struct{}
closeOnce sync.Once
Closing int32 // 原子操作标志,表示连接正在关闭
}
// Client 内网穿透客户端
@ -69,6 +72,7 @@ type Client struct {
// 连接管理
connections map[uint32]*LocalConnection
closingConns map[uint32]time.Time // 正在关闭的连接,避免重复处理
connMu sync.RWMutex
// 消息队列
@ -78,13 +82,19 @@ type Client struct {
// NewClient 创建新的隧道客户端
func NewClient(serverAddr string) *Client {
ctx, cancel := context.WithCancel(context.Background())
return &Client{
client := &Client{
serverAddr: serverAddr,
cancel: cancel,
ctx: ctx,
connections: make(map[uint32]*LocalConnection),
sendChan: make(chan *TunnelMessage, 1000),
closingConns: make(map[uint32]time.Time),
sendChan: make(chan *TunnelMessage, 10000), // 增加到10000
}
// 启动清理器,定期清理过期的关闭连接记录
go client.cleanupClosingConns()
return client
}
// Start 启动隧道客户端
@ -347,6 +357,7 @@ func (c *Client) handleConnectRequest(msg *TunnelMessage) {
TargetAddr: targetAddr,
Conn: localConn,
closeChan: make(chan struct{}),
Closing: 0, // 显式初始化为未关闭状态
}
c.connMu.Lock()
@ -374,10 +385,21 @@ func (c *Client) handleDataMessage(msg *TunnelMessage) {
c.connMu.RLock()
connection, exists := c.connections[connID]
isClosing := false
if _, found := c.closingConns[connID]; found {
isClosing = true
}
c.connMu.RUnlock()
if !exists {
// 检查是否是正在关闭的连接,避免重复发送关闭消息
if !isClosing {
log.Printf("收到未知连接的数据: %d发送关闭消息", connID)
// 标记为正在关闭,避免重复处理
c.connMu.Lock()
c.closingConns[connID] = time.Now()
c.connMu.Unlock()
// 连接不存在,发送关闭消息通知对端
closeData := make([]byte, 4)
binary.BigEndian.PutUint32(closeData, connID)
@ -391,6 +413,12 @@ func (c *Client) handleDataMessage(msg *TunnelMessage) {
case c.sendChan <- closeMsg:
default:
}
}
return
}
// 检查连接是否正在关闭
if atomic.LoadInt32(&connection.Closing) == 1 {
return
}
@ -457,14 +485,22 @@ func (c *Client) forwardData(connection *LocalConnection) {
n, err := connection.Conn.Read(buffer)
if err != nil {
// 检查是否是正在关闭的连接,避免记录无关错误
if atomic.LoadInt32(&connection.Closing) == 1 {
return // 连接正在关闭,正常退出
}
// 任何错误都应该终止转发
if err == io.EOF {
log.Printf("目标连接正常关闭 (ID=%d)", connection.ID)
} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Printf("目标连接超时 (ID=%d)", connection.ID)
} else {
// 只记录非关闭相关的错误
if !isConnectionClosed(err) {
log.Printf("读取目标连接失败 (ID=%d): %v", connection.ID, err)
}
}
return
}
@ -477,6 +513,11 @@ func (c *Client) forwardData(connection *LocalConnection) {
// 重置读取超时
connection.Conn.SetReadDeadline(time.Time{})
// 检查连接是否正在关闭
if atomic.LoadInt32(&connection.Closing) == 1 {
return
}
// 发送数据到隧道
dataMsg := make([]byte, 4+n)
binary.BigEndian.PutUint32(dataMsg[0:4], connection.ID)
@ -492,8 +533,9 @@ func (c *Client) forwardData(connection *LocalConnection) {
select {
case c.sendChan <- msg:
// 数据已发送
case <-time.After(5 * time.Second):
log.Printf("发送数据超时 (ID=%d)", connection.ID)
case <-time.After(2 * time.Second): // 减少超时时间
queueLen := len(c.sendChan)
log.Printf("发送数据超时 (ID=%d), 队列长度: %d/10000", connection.ID, queueLen)
return
case <-c.ctx.Done():
return
@ -508,7 +550,17 @@ func (c *Client) closeConnection(connID uint32) {
c.connMu.Lock()
connection, exists := c.connections[connID]
if exists {
// 使用原子操作标记连接正在关闭
if !atomic.CompareAndSwapInt32(&connection.Closing, 0, 1) {
// 连接已经在关闭中,避免重复处理
c.connMu.Unlock()
return
}
delete(c.connections, connID)
// 记录关闭时间,避免重复发送关闭消息
c.closingConns[connID] = time.Now()
connection.closeOnce.Do(func() {
close(connection.closeChan)
})
@ -520,8 +572,19 @@ func (c *Client) closeConnection(connID uint32) {
c.connMu.Unlock()
if !exists {
// 连接不存在,无需发送关闭消息
return
// 连接不存在,检查是否已经在关闭列表中
c.connMu.RLock()
_, isClosing := c.closingConns[connID]
c.connMu.RUnlock()
if isClosing {
return // 已经处理过了
}
// 标记为正在关闭
c.connMu.Lock()
c.closingConns[connID] = time.Now()
c.connMu.Unlock()
}
// 发送关闭消息
@ -629,3 +692,59 @@ func (c *Client) drainSendChan() {
}
}
}
// cleanupClosingConns 定期清理过期的关闭连接记录
func (c *Client) cleanupClosingConns() {
ticker := time.NewTicker(30 * time.Second) // 每30秒清理一次
defer ticker.Stop()
const maxClosingRecords = 10000 // 最大保留记录数
const maxAge = 2 * time.Minute // 最大保留时间从5分钟减少到2分钟
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
now := time.Now()
c.connMu.Lock()
// 按时间清理过期记录
for connID, closeTime := range c.closingConns {
if now.Sub(closeTime) > maxAge {
delete(c.closingConns, connID)
}
}
// 如果记录数量仍然过多,删除最旧的记录
if len(c.closingConns) > maxClosingRecords {
// 删除一半的最旧记录,避免频繁清理
deleteCount := len(c.closingConns) - maxClosingRecords/2
deletedCount := 0
for connID, closeTime := range c.closingConns {
if deletedCount >= deleteCount {
break
}
if closeTime.Before(now.Add(-maxAge/2)) {
delete(c.closingConns, connID)
deletedCount++
}
}
}
c.connMu.Unlock()
}
}
}
// isConnectionClosed 检查错误是否是连接关闭相关的
func isConnectionClosed(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "use of closed network connection") ||
strings.Contains(errStr, "connection reset by peer") ||
strings.Contains(errStr, "broken pipe")
}

View File

@ -8,6 +8,7 @@ import (
"log"
"net"
"port-forward/server/stats"
"strings"
"sync"
"sync/atomic"
"time"
@ -39,7 +40,7 @@ const (
// 超时设置
ConnectTimeout = 10 * time.Second // 连接超时
ReadTimeout = 300 * time.Second // 读取超时
ReadTimeout = 300 * time.Second // 读取超时统一为60秒
KeepAliveInterval = 15 * time.Second // 心跳间隔
)
@ -93,6 +94,7 @@ type ActiveConnection struct {
TargetHost string // 支持IP或域名
TargetIP string
Created time.Time
Closing int32 // 原子操作标志,表示连接正在关闭
}
// Server 内网穿透服务器
@ -108,6 +110,7 @@ type Server struct {
// 连接管理
pendingConns map[uint32]*PendingConnection // 待确认连接
activeConns map[uint32]*ActiveConnection // 活跃连接
closingConns map[uint32]time.Time // 正在关闭的连接,避免重复处理
connMu sync.RWMutex
nextConnID uint32
@ -122,14 +125,20 @@ type Server struct {
// NewServer 创建新的隧道服务器
func NewServer(listenPort int) *Server {
ctx, cancel := context.WithCancel(context.Background())
return &Server{
server := &Server{
listenPort: listenPort,
cancel: cancel,
ctx: ctx,
pendingConns: make(map[uint32]*PendingConnection),
activeConns: make(map[uint32]*ActiveConnection),
sendChan: make(chan *TunnelMessage, 1000),
closingConns: make(map[uint32]time.Time),
sendChan: make(chan *TunnelMessage, 10000), // 增加到10000
}
// 启动清理器,定期清理过期的关闭连接记录
go server.cleanupClosingConns()
return server
}
// Start 启动隧道服务器
@ -216,6 +225,7 @@ func (s *Server) handleTunnelRead(conn net.Conn) {
}
s.pendingConns = make(map[uint32]*PendingConnection)
s.activeConns = make(map[uint32]*ActiveConnection)
s.closingConns = make(map[uint32]time.Time)
s.connMu.Unlock()
}()
@ -423,10 +433,21 @@ func (s *Server) handleDataMessage(msg *TunnelMessage) {
s.connMu.RLock()
active, exists := s.activeConns[connID]
isClosing := false
if _, found := s.closingConns[connID]; found {
isClosing = true
}
s.connMu.RUnlock()
if !exists {
// 检查是否是正在关闭的连接,避免重复发送关闭消息
if !isClosing {
log.Printf("收到未知连接的数据: %d发送关闭消息", connID)
// 标记为正在关闭,避免重复处理
s.connMu.Lock()
s.closingConns[connID] = time.Now()
s.connMu.Unlock()
// 连接不存在,发送关闭消息通知对端
closeData := make([]byte, 4)
binary.BigEndian.PutUint32(closeData, connID)
@ -440,6 +461,12 @@ func (s *Server) handleDataMessage(msg *TunnelMessage) {
case s.sendChan <- closeMsg:
default:
}
}
return
}
// 检查连接是否正在关闭
if atomic.LoadInt32(&active.Closing) == 1 {
return
}
@ -475,8 +502,12 @@ func (s *Server) handleKeepAlive(msg *TunnelMessage) {
select {
case s.sendChan <- response:
// log.Printf("回应心跳消息") // 降低日志频率
case <-time.After(1 * time.Second):
// 心跳不是关键消息,超时就跳过
log.Printf("发送心跳响应超时,跳过")
default:
log.Printf("发送心跳响应失败: 发送队列已满")
// 队列满时跳过心跳,避免阻塞数据传输
log.Printf("发送队列忙碌,跳过心跳响应")
}
}
@ -498,14 +529,22 @@ func (s *Server) forwardData(active *ActiveConnection) {
n, err := active.ClientConn.Read(buffer)
if err != nil {
// 检查是否是正在关闭的连接,避免记录无关错误
if atomic.LoadInt32(&active.Closing) == 1 {
return // 连接正在关闭,正常退出
}
// 任何错误都应该终止转发,包括超时
if err == io.EOF {
log.Printf("客户端连接正常关闭 (ID=%d)", active.ID)
} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Printf("客户端连接超时 (ID=%d)", active.ID)
} else {
// 只记录非关闭相关的错误
if !isConnectionClosed(err) {
log.Printf("读取客户端连接失败 (ID=%d): %v", active.ID, err)
}
}
return
}
@ -518,6 +557,11 @@ func (s *Server) forwardData(active *ActiveConnection) {
// 重置读取超时
active.ClientConn.SetReadDeadline(time.Time{})
// 检查连接是否正在关闭
if atomic.LoadInt32(&active.Closing) == 1 {
return
}
// 发送数据到隧道
dataMsg := make([]byte, 4+n)
binary.BigEndian.PutUint32(dataMsg[0:4], active.ID)
@ -533,8 +577,9 @@ func (s *Server) forwardData(active *ActiveConnection) {
select {
case s.sendChan <- msg:
// 数据已发送
case <-time.After(5 * time.Second):
log.Printf("发送数据超时 (ID=%d)", active.ID)
case <-time.After(2 * time.Second): // 减少超时时间
queueLen := len(s.sendChan)
log.Printf("发送数据超时 (ID=%d), 队列长度: %d/10000", active.ID, queueLen)
return
case <-s.ctx.Done():
return
@ -547,7 +592,17 @@ func (s *Server) closeConnection(connID uint32) {
s.connMu.Lock()
active, exists := s.activeConns[connID]
if exists {
// 使用原子操作标记连接正在关闭
if !atomic.CompareAndSwapInt32(&active.Closing, 0, 1) {
// 连接已经在关闭中,避免重复处理
s.connMu.Unlock()
return
}
delete(s.activeConns, connID)
// 记录关闭时间,避免重复发送关闭消息
s.closingConns[connID] = time.Now()
// 确保连接被关闭
if active.ClientConn != nil {
active.ClientConn.Close()
@ -556,8 +611,19 @@ func (s *Server) closeConnection(connID uint32) {
s.connMu.Unlock()
if !exists {
// 连接不存在,无需发送关闭消息
return
// 连接不存在,检查是否已经在关闭列表中
s.connMu.RLock()
_, isClosing := s.closingConns[connID]
s.connMu.RUnlock()
if isClosing {
return // 已经处理过了
}
// 标记为正在关闭
s.connMu.Lock()
s.closingConns[connID] = time.Now()
s.connMu.Unlock()
}
// 发送关闭消息
@ -750,3 +816,59 @@ func (s *Server) addBytesSent(bytes uint64) {
func (s *Server) addBytesReceived(bytes uint64) {
atomic.AddUint64(&s.bytesReceived, bytes)
}
// cleanupClosingConns 定期清理过期的关闭连接记录
func (s *Server) cleanupClosingConns() {
ticker := time.NewTicker(30 * time.Second) // 每30秒清理一次
defer ticker.Stop()
const maxClosingRecords = 10000 // 最大保留记录数
const maxAge = 2 * time.Minute // 最大保留时间从5分钟减少到2分钟
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
now := time.Now()
s.connMu.Lock()
// 按时间清理过期记录
for connID, closeTime := range s.closingConns {
if now.Sub(closeTime) > maxAge {
delete(s.closingConns, connID)
}
}
// 如果记录数量仍然过多,删除最旧的记录
if len(s.closingConns) > maxClosingRecords {
// 删除一半的最旧记录,避免频繁清理
deleteCount := len(s.closingConns) - maxClosingRecords/2
deletedCount := 0
for connID, closeTime := range s.closingConns {
if deletedCount >= deleteCount {
break
}
if closeTime.Before(now.Add(-maxAge/2)) {
delete(s.closingConns, connID)
deletedCount++
}
}
}
s.connMu.Unlock()
}
}
}
// isConnectionClosed 检查错误是否是连接关闭相关的
func isConnectionClosed(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "use of closed network connection") ||
strings.Contains(errStr, "connection reset by peer") ||
strings.Contains(errStr, "broken pipe")
}