From 3d3a3cee40eb09e9ca68a7972d3993362f0d6874 Mon Sep 17 00:00:00 2001 From: "qcqcqc@wsl" <1220204124@zust.edu.cn> Date: Tue, 15 Apr 2025 15:30:01 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AD=89=E5=88=B0handler=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=89=8D=E4=BC=9A=E7=BB=93=E6=9D=9F=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend-service/pkg/service/socket.go | 2 +- shared/pkg/client/methods.go | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/backend-service/pkg/service/socket.go b/backend-service/pkg/service/socket.go index 4931336..3e9124f 100644 --- a/backend-service/pkg/service/socket.go +++ b/backend-service/pkg/service/socket.go @@ -175,7 +175,6 @@ func (t *UnixSocketTask) handleConnection(conn net.Conn) { client := client.NewClient() handleQuestion := func(question string) { - logger.Debug("question: %s", question) if loading { loading = false // 停止加载动画 t.writeMessage(conn, "\r") @@ -197,6 +196,7 @@ func (t *UnixSocketTask) handleConnection(conn net.Conn) { } // 结束后发送结束标记 + t.writeMessage(conn, "\n\r") t.writeMessage(conn, "[end]") conn.Close() // 关闭连接,防止客户端一直等待inf logger.Debug("connection done.") diff --git a/shared/pkg/client/methods.go b/shared/pkg/client/methods.go index 1faa70a..6759ef1 100644 --- a/shared/pkg/client/methods.go +++ b/shared/pkg/client/methods.go @@ -12,6 +12,7 @@ import ( "os" "path" "strings" + "sync" "time" ) @@ -255,14 +256,18 @@ func (c *Client) PostToStream(uri string, body interface{}, query map[string]str reader := bufio.NewReader(resp.Body) var buffer strings.Builder - ticker := time.NewTicker(250 * time.Millisecond) // 每 250 ms处理一次 + ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() done := make(chan struct{}) errChan := make(chan error, 1) - // 启动一个定时处理协程 + var wg sync.WaitGroup + wg.Add(1) + + // 处理数据的协程 go func() { + defer wg.Done() for { select { case <-ticker.C: @@ -271,7 +276,6 @@ func (c *Client) PostToStream(uri string, body interface{}, query map[string]str buffer.Reset() } case <-done: - // flush final if buffer.Len() > 0 { messageHandler(buffer.String()) } @@ -280,7 +284,7 @@ func (c *Client) PostToStream(uri string, body interface{}, query map[string]str } }() - // 读取数据的主循环 + // 读取数据 go func() { for { chunk := make([]byte, 256) @@ -300,11 +304,15 @@ func (c *Client) PostToStream(uri string, body interface{}, query map[string]str errChan <- nil }() + // 等待读取是否有错误 if err := <-errChan; err != nil { logger.Error("Error reading response stream") return err } + // 等待 messageHandler 完成处理 + wg.Wait() + return nil }