179 lines
5.0 KiB
Go
179 lines
5.0 KiB
Go
package manager
|
||
|
||
import (
|
||
"fmt"
|
||
"os"
|
||
"sync"
|
||
"time"
|
||
|
||
"bash_go_service/config-loader/internal/models"
|
||
"bash_go_service/config-loader/internal/shm"
|
||
"bash_go_service/config-loader/internal/utils"
|
||
"bash_go_service/shared/pkg/logger"
|
||
|
||
"github.com/spf13/viper"
|
||
)
|
||
|
||
// ConfigManager 负责配置的加载和同步
|
||
type ConfigManager struct {
|
||
configFile string // 配置文件路径
|
||
lastModified time.Time // 配置文件的最后修改时间
|
||
watchInterval time.Duration // 配置文件监控的时间间隔
|
||
stopChan chan struct{} // 停止信号通道
|
||
wg sync.WaitGroup // 等待组,用于管理 goroutine
|
||
mu sync.RWMutex // 读写锁,保护 currentConfig 的并发访问
|
||
currentConfig *models.ConfigData // 当前的配置数据
|
||
}
|
||
|
||
// NewConfigManager 创建一个新的 ConfigManager 实例
|
||
func NewConfigManager(configFile string, watchInterval time.Duration) *ConfigManager {
|
||
if watchInterval == 0 {
|
||
watchInterval = 5 * time.Second // 默认监控间隔为 5 秒
|
||
}
|
||
return &ConfigManager{
|
||
configFile: configFile,
|
||
watchInterval: watchInterval,
|
||
stopChan: make(chan struct{}),
|
||
}
|
||
}
|
||
|
||
// Initialize 加载初始配置并开始监控配置文件的变化
|
||
func (cm *ConfigManager) Initialize() error {
|
||
logger.Info("Initializing ConfigManager")
|
||
|
||
// 尝试从共享内存中读取现有配置
|
||
if config, err := shm.ReadConfigFromSharedMemory(); err == nil {
|
||
cm.currentConfig = config
|
||
logger.Info("Loaded existing configuration from shared memory. Enabled: %v, Rule Count: %d",
|
||
config.Enabled, config.RuleCount)
|
||
if config.RuleCount == 0 {
|
||
cm.ForceSync() // 如果规则数量为 0,强制同步配置
|
||
}
|
||
} else {
|
||
logger.Warn("Failed to read from shared memory: %v", err)
|
||
// 如果从共享内存读取失败,则从文件加载
|
||
if err := cm.syncFromFile(); err != nil {
|
||
return fmt.Errorf("initial configuration load failed: %w", err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// StartWatching 开始监控配置文件的变化
|
||
func (cm *ConfigManager) StartWatching() {
|
||
cm.wg.Add(1)
|
||
go cm.watchConfigFile()
|
||
logger.Info("Started watching configuration file for changes")
|
||
}
|
||
|
||
// Stop 优雅地停止配置管理器
|
||
func (cm *ConfigManager) Stop() {
|
||
logger.Info("Stopping ConfigManager")
|
||
close(cm.stopChan)
|
||
cm.wg.Wait()
|
||
}
|
||
|
||
// GetCurrentConfig 返回当前的配置(线程安全)
|
||
func (cm *ConfigManager) GetCurrentConfig() *models.ConfigData {
|
||
cm.mu.RLock()
|
||
defer cm.mu.RUnlock()
|
||
return cm.currentConfig
|
||
}
|
||
|
||
// ForceSync 强制从文件重新加载配置
|
||
func (cm *ConfigManager) ForceSync() error {
|
||
logger.Info("Force syncing configuration from file")
|
||
return cm.syncFromFile()
|
||
}
|
||
|
||
// ClearSharedMemory 清除共享内存中的配置
|
||
func (cm *ConfigManager) ClearSharedMemory() error {
|
||
logger.Info("Clearing shared memory configuration")
|
||
// 这里可以实现实际的清除逻辑
|
||
// 当前仅写入一个空配置
|
||
emptyConfig := &models.ConfigData{
|
||
Enabled: false,
|
||
RuleCount: 0,
|
||
}
|
||
return shm.WriteConfigToSharedMemory(emptyConfig)
|
||
}
|
||
|
||
// 内部方法
|
||
|
||
// syncFromFile 从文件加载配置
|
||
func (cm *ConfigManager) syncFromFile() error {
|
||
cm.mu.Lock()
|
||
defer cm.mu.Unlock()
|
||
|
||
loader := viper.GetString("bash_config.loader")
|
||
var jsonConfig *models.JSONConfig
|
||
var err error
|
||
if loader == "SERVER" {
|
||
logger.Info("Loading configuration from server")
|
||
jsonConfig, err = utils.LoadConfigFromServer()
|
||
} else if loader == "FILE" {
|
||
logger.Info("Loading configuration from file")
|
||
jsonConfig, err = utils.LoadConfigFromFile(cm.configFile)
|
||
} else {
|
||
logger.Info("No loader specified, loading configuration from server by default")
|
||
jsonConfig, err = utils.LoadConfigFromServer()
|
||
}
|
||
if err != nil {
|
||
return fmt.Errorf("failed to load config file: %w", err)
|
||
}
|
||
|
||
// 转换为共享内存格式并写入共享内存
|
||
cConfig := utils.ConvertToCConfig(jsonConfig)
|
||
if err := shm.WriteConfigToSharedMemory(cConfig); err != nil {
|
||
return fmt.Errorf("failed to write to shared memory: %w", err)
|
||
}
|
||
|
||
cm.currentConfig = cConfig
|
||
if fileInfo, err := os.Stat(cm.configFile); err == nil {
|
||
cm.lastModified = fileInfo.ModTime() // 更新最后修改时间
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// watchConfigFile 监控配置文件的变化
|
||
func (cm *ConfigManager) watchConfigFile() {
|
||
defer cm.wg.Done()
|
||
|
||
ticker := time.NewTicker(cm.watchInterval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-cm.stopChan:
|
||
logger.Info("Stopping configuration file watcher")
|
||
return
|
||
case <-ticker.C:
|
||
if err := cm.checkAndReloadConfig(); err != nil {
|
||
logger.Error("Error checking/reloading config: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// checkAndReloadConfig 检查并重新加载配置文件
|
||
func (cm *ConfigManager) checkAndReloadConfig() error {
|
||
fileInfo, err := os.Stat(cm.configFile)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to stat config file: %w", err)
|
||
}
|
||
|
||
cm.mu.RLock()
|
||
lastMod := cm.lastModified
|
||
cm.mu.RUnlock()
|
||
|
||
// 如果文件修改时间更新,则重新加载配置
|
||
if fileInfo.ModTime().After(lastMod) {
|
||
logger.Info("Configuration file changed, reloading")
|
||
return cm.syncFromFile()
|
||
}
|
||
|
||
return nil
|
||
}
|