finclip-app-manager/vendor/gitlab.finogeeks.club/finclip-backend/rmconf/consul.go

171 lines
4.5 KiB
Go

package rmconf
import (
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
// KV_PUBLIC_NAME is the key name that StartConsulConfig appends to the KV path
// to form the key of the shared ("public") configuration.
const KV_PUBLIC_NAME = "public"
// ConsulConfManager holds the state needed to read one Consul KV key and to
// watch that key for changes.
type ConsulConfManager struct {
// consulKVClient is the KV client created by InitFromConsul.
consulKVClient *api.KV
config interface{} // pointer to the caller's config object, cached for use by the watch
// started is set by StartWatch so that a second call is rejected.
started bool
lastValues sync.Map // cache of the most recently seen value for each key
// path is the Consul KV key this manager reads and watches.
path string
// address is the Consul address given to InitFromConsul and reused by StartWatch.
address string
}
// IsKeyLower always reports false.
// NOTE(review): presumably this tells the code that applies remote values
// whether keys are stored in lower case; the consumer is not visible here,
// so confirm against it.
func (c *ConsulConfManager) IsKeyLower() bool {
return false
}
// InitFromConsul reads the JSON object stored under the Consul KV key path on
// the Consul agent at addr, caches its top-level entries and applies them to
// config through applyRemoteConfig.
//
// config is the caller's config object (a pointer); it is remembered so that
// StartWatch can keep updating it. If the key does not exist yet it is created
// with the value "{}", because a later watch on a missing key would fail. A key
// that exists but holds a blank value is treated as an empty object as well.
//
// An error is returned when path is blank, when the Consul client cannot be
// created, when reading or creating the key fails, when the stored value is
// not a JSON object, or when applyRemoteConfig fails.
func (c *ConsulConfManager) InitFromConsul(config interface{}, addr, path string) error {
	// Validate before touching the receiver: a rejected call must not leave
	// c.path set, otherwise StartWatch would consider the manager initialized.
	if strings.TrimSpace(path) == "" {
		return fmt.Errorf("path is empty")
	}
	c.address = addr
	c.path = path
	c.config = config

	client, err := api.NewClient(&api.Config{Address: addr})
	if err != nil {
		return fmt.Errorf("create consul client for %q: %w", addr, err)
	}
	c.consulKVClient = client.KV()

	pair, _, err := c.consulKVClient.Get(path, nil)
	if err != nil {
		return fmt.Errorf("get consul key %q: %w", path, err)
	}
	if pair == nil {
		// Key not found: initialize it with an empty JSON object,
		// otherwise the watch started later would fail.
		pair = &api.KVPair{Key: path, Value: []byte("{}")}
		if _, err = c.consulKVClient.Put(pair, nil); err != nil {
			return fmt.Errorf("put consul key %q: %w", path, err)
		}
	}

	newValues := make(map[string]interface{})
	// An existing key with a blank value carries no settings; decoding it
	// would only fail with "unexpected end of JSON input".
	if strings.TrimSpace(string(pair.Value)) != "" {
		if err := json.Unmarshal(pair.Value, &newValues); err != nil {
			return fmt.Errorf("decode consul key %q as JSON object: %w", path, err)
		}
	}
	for k, newValue := range newValues {
		c.lastValues.Store(k, newValue)
	}
	fmt.Printf("[remote-conf] path: %v ,allkeys: %v\n", path, c.GetAllKeys())

	if err := applyRemoteConfig(c.config, c); err != nil {
		return fmt.Errorf("apply remote config from %q: %w", path, err)
	}
	return nil
}
// StartWatch starts a background Consul watch on the key given to
// InitFromConsul and keeps the cached config object up to date.
//
// Whenever the key changes, its value is decoded as a JSON object. Every
// top-level entry whose value differs from the cached one is stored in the
// cache and pushed into the config through update, using the upper-cased key
// and the value formatted with %v. Entries that disappeared from the JSON
// object are ignored, and so is a nil result from the watch.
//
// It returns an error if no path has been set (InitFromConsul was not called
// successfully), if the watch was already started, or if the watch plan cannot
// be built. Failures that happen later inside the watch are only printed.
func (c *ConsulConfManager) StartWatch() error {
	if c.path == "" {
		return fmt.Errorf("ConsulConfManager should init first!")
	}
	if c.started {
		return fmt.Errorf("ConsulConfManager started!")
	}

	params := map[string]interface{}{"type": "key", "key": c.path}
	plan, err := watch.Parse(params)
	if err != nil {
		// plan is nil here; assigning plan.Handler would panic.
		return err
	}

	// sameValue compares two JSON-decoded values. JSON objects and arrays
	// decode to maps and slices, and comparing two interface values that both
	// hold such a type with == panics at run time, so those are compared by
	// their JSON encoding (map keys are emitted in sorted order).
	sameValue := func(a, b interface{}) bool {
		switch a.(type) {
		case map[string]interface{}, []interface{}:
			left, errLeft := json.Marshal(a)
			right, errRight := json.Marshal(b)
			return errLeft == nil && errRight == nil && string(left) == string(right)
		default:
			// a is a scalar (or nil); == cannot panic because b either has
			// the same comparable type or a different type altogether.
			return a == b
		}
	}

	plan.Handler = func(_ uint64, raw interface{}) {
		if raw == nil { // nil is a valid return value
			return
		}
		pair, ok := raw.(*api.KVPair)
		if !ok || pair == nil {
			return // ignore
		}

		newValues := make(map[string]interface{})
		if err := json.Unmarshal(pair.Value, &newValues); err != nil {
			fmt.Printf("[remote-conf] error: unmarshal config err: %v\n", err)
			return
		}

		// Deleted config entries are ignored: only keys present in the new
		// value are examined.
		for k, newValue := range newValues {
			oldValue, cached := c.lastValues.Load(k)
			if cached && sameValue(oldValue, newValue) {
				continue
			}
			c.lastValues.Store(k, newValue)
			if err := update(c.config, strings.ToUpper(k), fmt.Sprintf("%v", newValue)); err != nil {
				fmt.Printf("[remote-conf] error: update config err: %v\n", err)
				continue
			}
			fmt.Printf("[remote-conf] update config success [%v]: %v -> %v\n", k, oldValue, newValue)
		}
	}

	// Mark as started only once the plan is known to be usable, so that a
	// failed attempt can be retried.
	c.started = true
	go func() {
		if err := plan.Run(c.address); err != nil {
			fmt.Printf("[remote-conf] error: watch stopped: %v\n", err)
		}
	}()
	return nil
}
// Get returns the cached value for key rendered with fmt's default format, or
// the empty string when nothing is cached under key. Values are always handed
// back as strings; converting them is left to the caller.
func (c *ConsulConfManager) Get(key string) string {
	cached, found := c.lastValues.Load(key)
	if !found {
		return ""
	}
	return fmt.Sprint(cached)
}
// GetAllKeys returns every key currently held in the value cache, each
// rendered as a string. The result is an empty, non-nil slice when the cache
// holds nothing.
func (c *ConsulConfManager) GetAllKeys() []string {
	keys := make([]string, 0)
	collect := func(key, _ interface{}) bool {
		keys = append(keys, fmt.Sprint(key))
		return true // keep iterating over the whole cache
	}
	c.lastValues.Range(collect)
	return keys
}
// StartConsulConfig is the standard entry point for clients.
//  1. It loads the shared configuration stored under kvPath/KV_PUBLIC_NAME.
//  2. It loads the service's own configuration stored under kvPath/serverName
//     and starts watching that key.
//
// config must be a pointer. addr, kvPath and serverName must not be blank; the
// first blank one (checked in that order) is reported as an error. Any error
// from loading or from starting the watch is printed and returned.
func StartConsulConfig(config interface{}, kvPath, serverName, addr string) error {
	switch {
	case strings.TrimSpace(addr) == "":
		return fmt.Errorf("consul addr is empty!")
	case strings.TrimSpace(kvPath) == "":
		return fmt.Errorf("kvPath is empty!")
	case strings.TrimSpace(serverName) == "":
		return fmt.Errorf("serverName is empty!")
	}

	publicKeyPath := kvPath + "/" + KV_PUBLIC_NAME
	privateKeyPath := kvPath + "/" + serverName

	// load reads one key into config with a fresh manager and prints the
	// failure, if any, before handing it back.
	load := func(keyPath string) (*ConsulConfManager, error) {
		manager := new(ConsulConfManager)
		if err := manager.InitFromConsul(config, addr, keyPath); err != nil {
			fmt.Printf("read config from consul [%v] [%v] fail: %v\n", addr, keyPath, err)
			return nil, err
		}
		return manager, nil
	}

	// Precedence: service config > public config > environment variables >
	// defaults in code. The public key is therefore applied first so the
	// service key can override it.
	if _, err := load(publicKeyPath); err != nil {
		return err
	}

	private, err := load(privateKeyPath)
	if err != nil {
		return err
	}
	if err := private.StartWatch(); err != nil {
		fmt.Printf("config watch fail [%v] [%v] fail: %v\n", addr, privateKeyPath, err)
		return err
	}

	fmt.Printf("config watch success [%v] [%v] \n", addr, privateKeyPath)
	return nil
}