获取配置改为post,并且推送productId和instanceId(如果存在)并修复了shm相关问题
This commit is contained in:
parent
fcf57a9d27
commit
ca80035633
|
|
@ -30,7 +30,7 @@ func NewConfigManagerTask() *ConfigManagerTask {
|
|||
|
||||
func (t *ConfigManagerTask) Execute(ctx context.Context) {
|
||||
// 初始化配置管理器
|
||||
ff, err := t.manager.Initialize()
|
||||
ff, err := t.manager.Initialize(false)
|
||||
if err != nil {
|
||||
logger.Error("Failed to initialize config manager: %v", err)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -2,8 +2,6 @@ package main
|
|||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"bash_go_service/config-loader/pkg/manager"
|
||||
|
|
@ -15,30 +13,43 @@ func main() {
|
|||
// Create config manager instance
|
||||
configManager := manager.NewConfigManager(constants.RuleFile, 5*time.Second)
|
||||
|
||||
err := configManager.ClearSharedMemory()
|
||||
if err != nil {
|
||||
logger.Error("Failed to clear shared memory: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Initialize the manager
|
||||
if _, err := configManager.Initialize(); err != nil {
|
||||
if _, err := configManager.Initialize(true); err != nil {
|
||||
logger.Error("Failed to initialize config manager: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
configManager.StartWatching()
|
||||
// // Start watching for changes
|
||||
// configManager.StartWatching()
|
||||
|
||||
// Set up signal handling
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
// // Set up signal handling
|
||||
// signalChan := make(chan os.Signal, 1)
|
||||
// signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
logger.Info("Configuration manager started. Press Ctrl+C to exit.")
|
||||
// logger.Info("Configuration manager started. Press Ctrl+C to exit.")
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-signalChan
|
||||
logger.Info("Shutdown signal received")
|
||||
// // Wait for shutdown signal
|
||||
// <-signalChan
|
||||
// logger.Info("Shutdown signal received")
|
||||
|
||||
// Clean up
|
||||
configManager.Stop()
|
||||
if err := configManager.ClearSharedMemory(); err != nil {
|
||||
logger.Error("Failed to clear shared memory: %v", err)
|
||||
// // Clean up
|
||||
// configManager.Stop()
|
||||
// if err := configManager.ClearSharedMemory(); err != nil {
|
||||
// logger.Error("Failed to clear shared memory: %v", err)
|
||||
// }
|
||||
|
||||
// logger.Info("Configuration manager shutdown complete")
|
||||
config := configManager.GetCurrentConfig()
|
||||
// 格式化成json输出
|
||||
jsonData, err := config.ToJSON()
|
||||
if err != nil {
|
||||
logger.Error("Failed to convert config to JSON: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
logger.Info("Configuration manager shutdown complete")
|
||||
logger.Info("Current configuration: %s", jsonData)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
package models
|
||||
|
||||
import "bash_go_service/shared/pkg/constants"
|
||||
import (
|
||||
"bash_go_service/shared/pkg/constants"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Rule 映射 C 结构体
|
||||
type Rule struct {
|
||||
|
|
@ -31,3 +35,39 @@ type JSONConfig struct {
|
|||
Enabled bool `json:"enabled"` // 是否启用
|
||||
Rules []JSONRule `json:"rules"` // 规则列表
|
||||
}
|
||||
|
||||
func (c *ConfigData) ToJSON() (string, error) {
|
||||
|
||||
jsonConfig := JSONConfig{
|
||||
Enabled: c.Enabled,
|
||||
Rules: make([]JSONRule, c.RuleCount),
|
||||
}
|
||||
for i := 0; i < int(c.RuleCount); i++ {
|
||||
jsonConfig.Rules[i] = c.Rules[i].ToJSONRule()
|
||||
}
|
||||
jsonData, err := json.Marshal(jsonConfig)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(jsonData), nil
|
||||
}
|
||||
|
||||
func (r *Rule) ToJSONRule() JSONRule {
|
||||
// 处理C字符串到Go字符串的转换,去除末尾的空字符
|
||||
cmd := strings.TrimRight(string(r.Cmd[:]), "\x00")
|
||||
typ := strings.TrimRight(string(r.Type[:]), "\x00")
|
||||
msg := strings.TrimRight(string(r.Msg[:]), "\x00")
|
||||
|
||||
// 处理参数数组
|
||||
args := make([]string, r.ArgCount)
|
||||
for i := 0; i < int(r.ArgCount); i++ {
|
||||
args[i] = strings.TrimRight(string(r.Args[i][:]), "\x00")
|
||||
}
|
||||
|
||||
return JSONRule{
|
||||
Cmd: cmd,
|
||||
Type: typ,
|
||||
Msg: msg,
|
||||
Args: args,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,8 +50,19 @@ func shmdt(shmaddr unsafe.Pointer) error {
|
|||
func WriteConfigToSharedMemory(config *models.ConfigData) error {
|
||||
logger.Debug("Starting to write configuration to shared memory")
|
||||
|
||||
// 创建或获取共享内存段
|
||||
shmID, err := shmget(constants.ShmKey, constants.ShmSize, unix.IPC_CREAT|0666)
|
||||
// 使用 unsafe.Alignof 获取对齐要求
|
||||
align := unsafe.Alignof(*config)
|
||||
size := unsafe.Sizeof(*config)
|
||||
|
||||
// 确保大小是对齐的
|
||||
alignedSize := (size + align - 1) &^ (align - 1)
|
||||
|
||||
if alignedSize > constants.ShmSize {
|
||||
return fmt.Errorf("config size %d exceeds shared memory size %d", alignedSize, constants.ShmSize)
|
||||
}
|
||||
|
||||
// 创建共享内存
|
||||
shmID, err := shmget(constants.ShmKey, int(alignedSize), unix.IPC_CREAT|0666)
|
||||
if err != nil {
|
||||
// 获取更详细的错误信息
|
||||
switch err {
|
||||
|
|
@ -71,23 +82,18 @@ func WriteConfigToSharedMemory(config *models.ConfigData) error {
|
|||
return fmt.Errorf("shmget failed: %w", err)
|
||||
}
|
||||
|
||||
// 将共享内存段附加到进程地址空间
|
||||
// 附加共享内存
|
||||
shmPtr, err := shmat(shmID, 0, 0)
|
||||
if err != nil {
|
||||
logger.Error("shmat failed: %v", err)
|
||||
return fmt.Errorf("shmat failed: %w", err)
|
||||
}
|
||||
defer shmdt(shmPtr) // 确保在函数结束时分离共享内存
|
||||
defer shmdt(shmPtr)
|
||||
|
||||
// 检查配置数据大小是否超过共享内存大小
|
||||
configSize := unsafe.Sizeof(*config)
|
||||
if configSize > constants.ShmSize {
|
||||
return fmt.Errorf("configuration size (%d) exceeds shared memory size (%d)", configSize, constants.ShmSize)
|
||||
}
|
||||
// 使用内存屏障确保对齐
|
||||
alignedPtr := unsafe.Pointer((uintptr(shmPtr) + align - 1) &^ (align - 1))
|
||||
|
||||
// 将配置数据复制到共享内存
|
||||
data := (*[constants.ShmSize]byte)(unsafe.Pointer(config))[:]
|
||||
copy((*[constants.ShmSize]byte)(shmPtr)[:], data)
|
||||
// 复制数据
|
||||
*(*models.ConfigData)(alignedPtr) = *config
|
||||
|
||||
logger.Info("Configuration successfully written to shared memory")
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"bash_go_service/shared/pkg/client"
|
||||
"bash_go_service/shared/pkg/constants"
|
||||
"bash_go_service/shared/pkg/logger"
|
||||
"bash_go_service/shared/pkg/utils"
|
||||
)
|
||||
|
||||
func LoadConfigFromFile(filename string) (*models.JSONConfig, error) {
|
||||
|
|
@ -61,13 +62,19 @@ func LoadConfigFromServer() (*models.JSONConfig, error) {
|
|||
var config models.JSONConfig
|
||||
|
||||
cli := client.NewClient()
|
||||
err := cli.Get(constants.GetConfigApi, nil, &config)
|
||||
envs, err := utils.GetProductInfo()
|
||||
if err != nil {
|
||||
logger.Error("Failed to read config file: %v", err)
|
||||
logger.Error("Failed to get product info: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
// 发送请求
|
||||
err = cli.Post(constants.GetConfigApi, envs, nil, &config)
|
||||
if err != nil {
|
||||
logger.Error("Failed to request config file: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Info("Configuration successfully loaded from file with skip rules added")
|
||||
logger.Info("Configuration successfully loaded from server")
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,11 +38,19 @@ func NewConfigManager(configFile string, watchInterval time.Duration) *ConfigMan
|
|||
}
|
||||
|
||||
// Initialize 加载初始配置并开始监控配置文件的变化
|
||||
func (cm *ConfigManager) Initialize() (bool, error) {
|
||||
func (cm *ConfigManager) Initialize(force bool) (bool, error) {
|
||||
logger.Info("Initializing ConfigManager")
|
||||
|
||||
if force {
|
||||
logger.Info("Force reloading configuration from file")
|
||||
if _, err := cm.syncFromFile(); err != nil {
|
||||
return false, fmt.Errorf("failed to load config file: %w", err)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
config, err := shm.ReadConfigFromSharedMemory()
|
||||
// 尝试从共享内存中读取现有配置
|
||||
if config, err := shm.ReadConfigFromSharedMemory(); err == nil {
|
||||
if err == nil {
|
||||
cm.currentConfig = config
|
||||
logger.Info("Loaded existing configuration from shared memory. Enabled: %v, Rule Count: %d",
|
||||
config.Enabled, config.RuleCount)
|
||||
|
|
@ -107,7 +115,10 @@ func (cm *ConfigManager) ForceSync() error {
|
|||
formFile, err := cm.syncFromFile()
|
||||
logger.Info("Force syncing configuration, fromFile: %v", formFile)
|
||||
logger.Debug("current config count %v", cm.currentConfig.RuleCount)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to sync configuration: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearSharedMemory 清除共享内存中的配置
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
package constants
|
||||
|
||||
const (
|
||||
ProductEnvName = "BASH_PRODUCT_ID" // 产品 ID 环境变量名称
|
||||
BashInstanceId = "BASH_INSTANCE_ID" // 实例 ID 环境变量名称
|
||||
)
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"bash_go_service/shared/pkg/constants"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ProductInfo struct {
|
||||
BashProductId string `json:"productId"`
|
||||
BashInstanceId string `json:"instanceId"`
|
||||
}
|
||||
|
||||
func GetProductInfo() (*ProductInfo, error) {
|
||||
envs := os.Environ()
|
||||
var productId, instanceId string
|
||||
for _, env := range envs {
|
||||
if strings.HasPrefix(env, constants.ProductEnvName) {
|
||||
productId = strings.TrimPrefix(env, constants.ProductEnvName+"=")
|
||||
} else if strings.HasPrefix(env, constants.BashInstanceId) {
|
||||
instanceId = strings.TrimPrefix(env, constants.BashInstanceId+"=")
|
||||
}
|
||||
}
|
||||
if productId == "" {
|
||||
return nil, fmt.Errorf("product ID not found in environment variables")
|
||||
}
|
||||
return &ProductInfo{
|
||||
BashProductId: productId,
|
||||
BashInstanceId: instanceId,
|
||||
}, nil
|
||||
}
|
||||
Loading…
Reference in New Issue