bash_go_service/shared/pkg/client/methods.go

377 lines
7.8 KiB
Go

package client
import (
"bash_go_service/shared/pkg/logger"
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"time"
)
// Get sends an HTTP GET to the client's base URL joined with uri and decodes
// the JSON response body into result.
//
// Every entry of query is set on the URL's query string; a nil map is fine.
// result is handed to json.Unmarshal, so it should be a pointer. When result
// is nil or the response body is empty, decoding is skipped.
//
// The request is created with c.ctx. An error is returned when the base URL
// cannot be parsed, the request cannot be built or sent, the server replies
// with a non-2xx status, the body cannot be read, or the body cannot be
// decoded into result.
func (c *Client) Get(uri string, query map[string]string, result interface{}) error {
	c.trackRequest()
	defer c.requestDone()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	q := u.Query()
	for k, v := range query {
		q.Set(k, v)
	}
	u.RawQuery = q.Encode()

	req, err := http.NewRequestWithContext(c.ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		logger.Error("Failed to create GET request: %v", err)
		return err
	}

	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("GET request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	// Reject non-2xx replies up front instead of trying to decode an error
	// page as if it were the expected payload.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		// Best effort: the body is only read for diagnostics here.
		errBody, _ := io.ReadAll(resp.Body)
		logger.Debug("Error response body: %s", string(errBody))
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("Failed to read response body: %v", err)
		return err
	}

	// json.Unmarshal fails on a nil target and on empty input, so there is
	// nothing to decode in either case.
	if result == nil || len(body) == 0 {
		return nil
	}
	if err := json.Unmarshal(body, result); err != nil {
		logger.Error("Failed to unmarshal response: %v", err)
		return err
	}
	return nil
}
// Post sends body, JSON-encoded, in an HTTP POST to the client's base URL
// joined with uri, and decodes the JSON response body into result.
//
// When query is non-nil, every entry is set on the URL's query string. The
// request carries "Content-Type: application/json". result is handed to
// json.Unmarshal, so it should be a pointer. When result is nil or the
// response body is empty, decoding is skipped.
//
// The request is created with c.ctx. An error is returned when the base URL
// cannot be parsed, body cannot be marshalled, the request cannot be built or
// sent, the server replies with a non-2xx status, the response body cannot be
// read, or it cannot be decoded into result.
func (c *Client) Post(uri string, body interface{}, query map[string]string, result interface{}) error {
	c.trackRequest()
	defer c.requestDone()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	if query != nil {
		q := u.Query()
		for k, v := range query {
			q.Set(k, v)
		}
		u.RawQuery = q.Encode()
	}

	payload, err := json.Marshal(body)
	if err != nil {
		logger.Error("Failed to marshal request body: %v", err)
		return err
	}

	req, err := http.NewRequestWithContext(c.ctx, http.MethodPost, u.String(), bytes.NewReader(payload))
	if err != nil {
		logger.Error("Failed to create POST request: %v", err)
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("POST request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	// Reject non-2xx replies up front instead of trying to decode an error
	// page as if it were the expected payload.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		// Best effort: the body is only read for diagnostics here.
		errBody, _ := io.ReadAll(resp.Body)
		logger.Debug("Error response body: %s", string(errBody))
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("Failed to read response body: %v", err)
		return err
	}

	// json.Unmarshal fails on a nil target and on empty input, so there is
	// nothing to decode in either case.
	if result == nil || len(respBody) == 0 {
		return nil
	}
	if err := json.Unmarshal(respBody, result); err != nil {
		logger.Error("Failed to unmarshal response: %v", err)
		return err
	}
	return nil
}
// Put sends body, JSON-encoded, in an HTTP PUT to the client's base URL joined
// with uri, and decodes the JSON response body into result.
//
// When query is non-nil, every entry is set on the URL's query string. The
// request carries "Content-Type: application/json". result is handed to
// json.Unmarshal, so it should be a pointer. When result is nil or the
// response body is empty, decoding is skipped.
//
// The request is created with c.ctx. An error is returned when the base URL
// cannot be parsed, body cannot be marshalled, the request cannot be built or
// sent, the server replies with a non-2xx status, the response body cannot be
// read, or it cannot be decoded into result.
func (c *Client) Put(uri string, body interface{}, query map[string]string, result interface{}) error {
	c.trackRequest()
	defer c.requestDone()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	if query != nil {
		q := u.Query()
		for k, v := range query {
			q.Set(k, v)
		}
		u.RawQuery = q.Encode()
	}

	payload, err := json.Marshal(body)
	if err != nil {
		logger.Error("Failed to marshal request body: %v", err)
		return err
	}

	req, err := http.NewRequestWithContext(c.ctx, http.MethodPut, u.String(), bytes.NewReader(payload))
	if err != nil {
		logger.Error("Failed to create PUT request: %v", err)
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("PUT request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	// Reject non-2xx replies up front instead of trying to decode an error
	// page as if it were the expected payload.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		// Best effort: the body is only read for diagnostics here.
		errBody, _ := io.ReadAll(resp.Body)
		logger.Debug("Error response body: %s", string(errBody))
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("Failed to read response body: %v", err)
		return err
	}

	// json.Unmarshal fails on a nil target and on empty input, so there is
	// nothing to decode in either case.
	if result == nil || len(respBody) == 0 {
		return nil
	}
	if err := json.Unmarshal(respBody, result); err != nil {
		logger.Error("Failed to unmarshal response: %v", err)
		return err
	}
	return nil
}
// Delete sends an HTTP DELETE to the client's base URL joined with uri and
// decodes the JSON response body into result.
//
// When query is non-nil, every entry is set on the URL's query string. result
// is handed to json.Unmarshal, so it should be a pointer. When result is nil
// or the response body is empty (for example a 204 reply), decoding is
// skipped.
//
// The request is created with c.ctx. An error is returned when the base URL
// cannot be parsed, the request cannot be built or sent, the server replies
// with a non-2xx status, the body cannot be read, or the body cannot be
// decoded into result.
func (c *Client) Delete(uri string, query map[string]string, result interface{}) error {
	c.trackRequest()
	defer c.requestDone()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	if query != nil {
		q := u.Query()
		for k, v := range query {
			q.Set(k, v)
		}
		u.RawQuery = q.Encode()
	}

	req, err := http.NewRequestWithContext(c.ctx, http.MethodDelete, u.String(), nil)
	if err != nil {
		logger.Error("Failed to create DELETE request: %v", err)
		return err
	}

	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("DELETE request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	// Reject non-2xx replies up front instead of trying to decode an error
	// page as if it were the expected payload.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		// Best effort: the body is only read for diagnostics here.
		errBody, _ := io.ReadAll(resp.Body)
		logger.Debug("Error response body: %s", string(errBody))
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("Failed to read response body: %v", err)
		return err
	}

	// json.Unmarshal fails on a nil target and on empty input, so there is
	// nothing to decode in either case.
	if result == nil || len(respBody) == 0 {
		return nil
	}
	if err := json.Unmarshal(respBody, result); err != nil {
		logger.Error("Failed to unmarshal response: %v", err)
		return err
	}
	return nil
}
// PostToStream sends body, JSON-encoded, in an HTTP POST to the client's base
// URL joined with uri, and streams the response body to messageHandler.
//
// When query is non-nil, every entry is set on the URL's query string. The
// request is created with c.ctx and carries "Content-Type: application/json".
// Any status other than 200 is treated as an error.
//
// The response is read in chunks of up to 256 bytes into a buffer. A separate
// goroutine hands the buffered text to messageHandler every 250ms when the
// buffer is non-empty, and once more after reading stops. messageHandler is
// therefore called sequentially, on a goroutine other than the caller's, and
// each string it receives is cut at an arbitrary byte boundary rather than at
// a message or rune boundary.
//
// PostToStream returns only after the flushing goroutine has finished. If
// reading the stream fails, the data received so far is still delivered and
// the read error is returned.
func (c *Client) PostToStream(uri string, body interface{}, query map[string]string, messageHandler MessageHandler) error {
	c.trackRequest()
	defer c.requestDone()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	if query != nil {
		q := u.Query()
		for k, v := range query {
			q.Set(k, v)
		}
		u.RawQuery = q.Encode()
	}

	payload, err := json.Marshal(body)
	if err != nil {
		logger.Error("Failed to marshal request body: %v", err)
		return err
	}

	req, err := http.NewRequestWithContext(c.ctx, http.MethodPost, u.String(), bytes.NewReader(payload))
	if err != nil {
		logger.Error("Failed to create new request: %v", err)
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("POST request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		// Best effort: the body is only read for diagnostics here.
		respBody, _ := io.ReadAll(resp.Body)
		logger.Debug("Error response body: %s", string(respBody))
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	// buffer is shared between the read loop below and the flushing
	// goroutine, so every access goes through mu.
	var (
		mu     sync.Mutex
		buffer strings.Builder
	)

	// flush drains the buffer and passes its contents to messageHandler. The
	// handler runs outside the lock so a slow handler does not stall reads.
	flush := func() {
		mu.Lock()
		pending := buffer.String()
		buffer.Reset()
		mu.Unlock()
		if pending != "" {
			messageHandler(pending)
		}
	}

	ticker := time.NewTicker(250 * time.Millisecond)
	defer ticker.Stop()

	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	// Flushing goroutine: delivers buffered data on every tick and once more
	// when reading has stopped.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ticker.C:
				flush()
			case <-done:
				flush()
				return
			}
		}
	}()

	// Read the stream on the calling goroutine. The chunk slice is reused
	// because buffer.Write copies the bytes it is given.
	reader := bufio.NewReader(resp.Body)
	chunk := make([]byte, 256)
	var readErr error
	for {
		n, err := reader.Read(chunk)
		if n > 0 {
			mu.Lock()
			buffer.Write(chunk[:n])
			mu.Unlock()
		}
		if err != nil {
			if err != io.EOF {
				readErr = err
			}
			break
		}
	}

	// Always stop the flushing goroutine and wait for it, on the error path
	// too, so it is never left blocked behind this call.
	close(done)
	wg.Wait()

	if readErr != nil {
		logger.Error("Error reading response stream: %v", readErr)
		return readErr
	}
	return nil
}
// Download sends an HTTP GET to the client's base URL joined with uri and
// writes the response body to the file at filepath.
//
// When query is non-nil, every entry is set on the URL's query string. The
// request is created with c.ctx. Any status other than 200 is treated as an
// error, and in that case the destination file is not touched.
//
// The destination is opened with os.Create, so an existing file is truncated.
// If copying the body or closing the file fails, the partially written file
// is removed (best effort) and the error is returned.
func (c *Client) Download(uri string, query map[string]string, filepath string) error {
	c.trackRequest()
	defer c.requestDone()

	// Build the request URL.
	u, err := url.Parse(c.baseURL)
	if err != nil {
		logger.Error("Failed to parse base URL: %v", err)
		return err
	}
	u.Path = path.Join(u.Path, uri)
	if query != nil {
		q := u.Query()
		for k, v := range query {
			q.Set(k, v)
		}
		u.RawQuery = q.Encode()
	}

	// Create the GET request.
	req, err := http.NewRequestWithContext(c.ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		logger.Error("Failed to create GET request: %v", err)
		return err
	}

	// Send the request.
	resp, err := c.client.Do(req)
	if err != nil {
		logger.Error("Download request failed: %v", err)
		return err
	}
	defer resp.Body.Close()

	// Check the response status before creating the destination file.
	if resp.StatusCode != http.StatusOK {
		logger.Error("Received non-OK status code: %d", resp.StatusCode)
		return fmt.Errorf("download failed with status code: %d", resp.StatusCode)
	}

	// Create the destination file.
	out, err := os.Create(filepath)
	if err != nil {
		logger.Error("Failed to create file: %v", err)
		return err
	}

	// Stream the response body into the file.
	if _, err := io.Copy(out, resp.Body); err != nil {
		logger.Error("Failed to write file: %v", err)
		// Cleanup is best effort; the copy error is the one worth reporting.
		_ = out.Close()
		_ = os.Remove(filepath)
		return err
	}

	// Close explicitly: a failure here can mean the data never reached disk,
	// so it must not be reported as a successful download.
	if err := out.Close(); err != nil {
		logger.Error("Failed to close file: %v", err)
		_ = os.Remove(filepath) // best effort, see above
		return err
	}
	return nil
}