From 322e9f39a34a45f17956085f0d4377e67f403d31 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 11 Jan 2024 15:01:01 +0800 Subject: [PATCH] fix: Remove Unnecessary lock in config manager (#29855) issue: #29709 #291712 pr: #29836 to avoid concurrent recursive RLock and Lock cause deadlock, This PR remove the unnecessary lock in config manager --------- Signed-off-by: Wei Liu --- pkg/config/manager.go | 151 ++++++++---------- pkg/config/manager_test.go | 132 +++++++++++++++ .../mqwrapper/kafka/kafka_client_test.go | 4 +- pkg/util/paramtable/service_param_test.go | 4 +- 4 files changed, 201 insertions(+), 90 deletions(-) diff --git a/pkg/config/manager.go b/pkg/config/manager.go index 77fbb4b9a1..8545234ac7 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -18,7 +18,6 @@ package config import ( "fmt" "strings" - "sync" "github.com/cockroachdb/errors" "go.uber.org/zap" @@ -80,36 +79,33 @@ func filterate(key string, filters ...Filter) (string, bool) { } type Manager struct { - sync.RWMutex Dispatcher *EventDispatcher - sources map[string]Source - keySourceMap map[string]string // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file - overlays map[string]string // store the highest priority configs which modified at runtime - forbiddenKeys typeutil.Set[string] + sources *typeutil.ConcurrentMap[string, Source] + keySourceMap *typeutil.ConcurrentMap[string, string] // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file + overlays *typeutil.ConcurrentMap[string, string] // store the highest priority configs which modified at runtime + forbiddenKeys *typeutil.ConcurrentSet[string] } func NewManager() *Manager { return &Manager{ Dispatcher: NewEventDispatcher(), - sources: make(map[string]Source), - keySourceMap: make(map[string]string), - overlays: make(map[string]string), - forbiddenKeys: typeutil.NewSet[string](), + sources: typeutil.NewConcurrentMap[string, Source](), + keySourceMap: typeutil.NewConcurrentMap[string, string](), + overlays: typeutil.NewConcurrentMap[string, string](), + forbiddenKeys: typeutil.NewConcurrentSet[string](), } } func (m *Manager) GetConfig(key string) (string, error) { - m.RLock() - defer m.RUnlock() realKey := formatKey(key) - v, ok := m.overlays[realKey] + v, ok := m.overlays.Get(realKey) if ok { if v == TombValue { return "", fmt.Errorf("key not found %s", key) } return v, nil } - sourceName, ok := m.keySourceMap[realKey] + sourceName, ok := m.keySourceMap.Get(realKey) if !ok { return "", fmt.Errorf("key not found: %s", key) } @@ -118,27 +114,27 @@ func (m *Manager) GetConfig(key string) (string, error) { // GetConfigs returns all the key values func (m *Manager) GetConfigs() map[string]string { - m.RLock() - defer m.RUnlock() config := make(map[string]string) - for key := range m.keySourceMap { + m.keySourceMap.Range(func(key, value string) bool { sValue, err := m.GetConfig(key) if err != nil { - continue + return true } + config[key] = sValue - } - for key, value := range m.overlays { + return true + }) + + m.overlays.Range(func(key, value string) bool { config[key] = value - } + return true + }) return config } func (m *Manager) GetBy(filters ...Filter) map[string]string { - m.RLock() - defer m.RUnlock() matchedConfig := make(map[string]string) for key, value := range m.GetConfigs() { @@ -152,37 +148,33 @@ func (m *Manager) GetBy(filters ...Filter) map[string]string { } func (m *Manager) FileConfigs() map[string]string { - m.RLock() - defer m.RUnlock() config := make(map[string]string) - for _, source := range m.sources { - if s, ok := source.(*FileSource); ok { + m.sources.Range(func(key string, value Source) bool { + if s, ok := value.(*FileSource); ok { config, _ = s.GetConfigurations() - break + return false } - } + return true + }) return config } func (m *Manager) Close() { - m.Lock() - defer m.Unlock() - for _, s := range m.sources { - s.Close() - } + m.sources.Range(func(key string, value Source) bool { + value.Close() + return true + }) } func (m *Manager) AddSource(source Source) error { - m.Lock() - defer m.Unlock() sourceName := source.GetSourceName() - _, ok := m.sources[sourceName] + _, ok := m.sources.Get(sourceName) if ok { err := errors.New("duplicate source supplied") return err } - m.sources[sourceName] = source + m.sources.Insert(sourceName, source) err := m.pullSourceConfigs(sourceName) if err != nil { @@ -198,55 +190,43 @@ func (m *Manager) AddSource(source Source) error { // Update config at runtime, which can be called by others // The most used scenario is UT func (m *Manager) SetConfig(key, value string) { - m.Lock() - defer m.Unlock() - m.overlays[formatKey(key)] = value + m.overlays.Insert(formatKey(key), value) } func (m *Manager) SetMapConfig(key, value string) { - m.Lock() - defer m.Unlock() - m.overlays[strings.ToLower(key)] = value + m.overlays.Insert(strings.ToLower(key), value) } // Delete config at runtime, which has the highest priority to override all other sources func (m *Manager) DeleteConfig(key string) { - m.Lock() - defer m.Unlock() - m.overlays[formatKey(key)] = TombValue + m.overlays.Insert(formatKey(key), TombValue) } // Remove the config which set at runtime, use config from sources func (m *Manager) ResetConfig(key string) { - m.Lock() - defer m.Unlock() - delete(m.overlays, formatKey(key)) + m.overlays.Remove(formatKey(key)) } // Ignore any of update events, which means the config cannot auto refresh anymore func (m *Manager) ForbidUpdate(key string) { - m.Lock() - defer m.Unlock() m.forbiddenKeys.Insert(formatKey(key)) } func (m *Manager) UpdateSourceOptions(opts ...Option) { - m.Lock() - defer m.Unlock() - var options Options for _, opt := range opts { opt(&options) } - for _, source := range m.sources { - source.UpdateOptions(options) - } + m.sources.Range(func(key string, value Source) bool { + value.UpdateOptions(options) + return true + }) } // Do not use it directly, only used when add source and unittests. func (m *Manager) pullSourceConfigs(source string) error { - configSource, ok := m.sources[source] + configSource, ok := m.sources.Get(source) if !ok { return errors.New("invalid source or source not added") } @@ -259,21 +239,21 @@ func (m *Manager) pullSourceConfigs(source string) error { sourcePriority := configSource.GetPriority() for key := range configs { - sourceName, ok := m.keySourceMap[key] + sourceName, ok := m.keySourceMap.Get(key) if !ok { // if key do not exist then add source - m.keySourceMap[key] = source + m.keySourceMap.Insert(key, source) continue } - currentSource, ok := m.sources[sourceName] + currentSource, ok := m.sources.Get(sourceName) if !ok { - m.keySourceMap[key] = source + m.keySourceMap.Insert(key, source) continue } currentSrcPriority := currentSource.GetPriority() if currentSrcPriority > sourcePriority { // lesser value has high priority - m.keySourceMap[key] = source + m.keySourceMap.Insert(key, source) } } @@ -281,7 +261,7 @@ func (m *Manager) pullSourceConfigs(source string) error { } func (m *Manager) getConfigValueBySource(configKey, sourceName string) (string, error) { - source, ok := m.sources[sourceName] + source, ok := m.sources.Get(sourceName) if !ok { return "", ErrKeyNotFound } @@ -296,9 +276,9 @@ func (m *Manager) updateEvent(e *Event) error { } switch e.EventType { case CreateType, UpdateType: - sourceName, ok := m.keySourceMap[e.Key] + sourceName, ok := m.keySourceMap.Get(e.Key) if !ok { - m.keySourceMap[e.Key] = e.EventSource + m.keySourceMap.Insert(e.Key, e.EventSource) e.EventType = CreateType } else if sourceName == e.EventSource { e.EventType = UpdateType @@ -310,12 +290,12 @@ func (m *Manager) updateEvent(e *Event) error { e.EventSource, sourceName)) return ErrIgnoreChange } - m.keySourceMap[e.Key] = e.EventSource + m.keySourceMap.Insert(e.Key, e.EventSource) e.EventType = UpdateType } case DeleteType: - sourceName, ok := m.keySourceMap[e.Key] + sourceName, ok := m.keySourceMap.Get(e.Key) if !ok || sourceName != e.EventSource { // if delete event generated from source not maintained ignore it log.Info(fmt.Sprintf("the event source %s (expect %s) is not maintained, ignore", @@ -325,9 +305,9 @@ func (m *Manager) updateEvent(e *Event) error { // find less priority source or delete key source := m.findNextBestSource(e.Key, sourceName) if source == nil { - delete(m.keySourceMap, e.Key) + m.keySourceMap.Remove(e.Key) } else { - m.keySourceMap[e.Key] = source.GetSourceName() + m.keySourceMap.Insert(e.Key, source.GetSourceName()) } } } @@ -339,8 +319,6 @@ func (m *Manager) updateEvent(e *Event) error { // OnEvent Triggers actions when an event is generated func (m *Manager) OnEvent(event *Event) { - m.Lock() - defer m.Unlock() if m.forbiddenKeys.Contain(formatKey(event.Key)) { log.Info("ignore event for forbidden key", zap.String("key", event.Key)) return @@ -358,31 +336,32 @@ func (m *Manager) GetIdentifier() string { return "Manager" } -func (m *Manager) findNextBestSource(key string, sourceName string) Source { +func (m *Manager) findNextBestSource(configKey string, sourceName string) Source { var rSource Source - for _, source := range m.sources { - if source.GetSourceName() == sourceName { - continue + m.sources.Range(func(key string, value Source) bool { + if value.GetSourceName() == sourceName { + return true } - _, err := source.GetConfigurationByKey(key) + _, err := value.GetConfigurationByKey(configKey) if err != nil { - continue + return true } if rSource == nil { - rSource = source - continue + rSource = value + return true } - if source.GetPriority() < rSource.GetPriority() { // less value has high priority - rSource = source + if value.GetPriority() < rSource.GetPriority() { // less value has high priority + rSource = value } - } + return true + }) return rSource } func (m *Manager) getHighPrioritySource(srcNameA, srcNameB string) Source { - sourceA, okA := m.sources[srcNameA] - sourceB, okB := m.sources[srcNameB] + sourceA, okA := m.sources.Get(srcNameA) + sourceB, okB := m.sources.Get(srcNameB) if !okA && !okB { return nil diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index 2635f07979..bef70835ae 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -17,6 +17,7 @@ package config import ( + "context" "os" "path" "testing" @@ -24,6 +25,9 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" + "golang.org/x/sync/errgroup" ) func TestAllConfigFromManager(t *testing.T) { @@ -69,6 +73,134 @@ func TestAllDupliateSource(t *testing.T) { assert.Error(t, err, "invalid source or source not added") } +func TestBasic(t *testing.T) { + mgr, _ := Init() + + // test set config + mgr.SetConfig("a.b", "aaa") + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "aaa") + _, err = mgr.GetConfig("a.a") + assert.Error(t, err) + + // test delete config + mgr.SetConfig("a.b", "aaa") + mgr.DeleteConfig("a.b") + assert.Error(t, err) + + // test reset config + mgr.ResetConfig("a.b") + assert.Error(t, err) + + // test forbid config + envSource := NewEnvSource(formatKey) + err = mgr.AddSource(envSource) + assert.NoError(t, err) + + envSource.configs.Insert("ab", "aaa") + mgr.OnEvent(&Event{ + EventSource: envSource.GetSourceName(), + EventType: CreateType, + Key: "ab", + Value: "aaa", + }) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "aaa") + + mgr.ForbidUpdate("a.b") + mgr.OnEvent(&Event{ + EventSource: envSource.GetSourceName(), + EventType: UpdateType, + Key: "a.b", + Value: "bbb", + }) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "aaa") + + configs := mgr.FileConfigs() + assert.Len(t, configs, 0) +} + +func TestOnEvent(t *testing.T) { + cfg, _ := embed.ConfigFromFile("../../configs/advanced/etcd.yaml") + cfg.Dir = "/tmp/milvus/test" + e, err := embed.StartEtcd(cfg) + assert.NoError(t, err) + defer e.Close() + defer os.RemoveAll(cfg.Dir) + + client := v3client.New(e.Server) + + dir, _ := os.MkdirTemp("", "milvus") + yamlFile := path.Join(dir, "milvus.yaml") + mgr, _ := Init(WithEnvSource(formatKey), + WithFilesSource(&FileInfo{ + Files: []string{yamlFile}, + RefreshInterval: 10 * time.Millisecond, + }), + WithEtcdSource(&EtcdInfo{ + Endpoints: []string{cfg.ACUrls[0].Host}, + KeyPrefix: "test", + RefreshInterval: 10 * time.Millisecond, + })) + + os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) + time.Sleep(time.Second) + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "aaa") + + ctx := context.Background() + client.KV.Put(ctx, "test/config/a/b", "bbb") + time.Sleep(time.Second) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "bbb") + + client.KV.Put(ctx, "test/config/a/b", "ccc") + time.Sleep(time.Second) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "ccc") + + os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) + time.Sleep(time.Second) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "ccc") + + client.KV.Delete(ctx, "test/config/a/b") + time.Sleep(time.Second) + value, err = mgr.GetConfig("a.b") + assert.NoError(t, err) + assert.Equal(t, value, "ddd") +} + +func TestDeadlock(t *testing.T) { + mgr, _ := Init() + + // test concurrent lock and recursive rlock + wg, _ := errgroup.WithContext(context.Background()) + wg.Go(func() error { + for i := 0; i < 100; i++ { + mgr.GetBy(WithPrefix("rootcoord.")) + } + return nil + }) + + wg.Go(func() error { + for i := 0; i < 100; i++ { + mgr.SetConfig("rootcoord.xxx", "111") + } + return nil + }) + + wg.Wait() +} + type ErrSource struct{} func (e ErrSource) Close() { diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index b33d425a2e..db5c04b0ea 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -313,13 +313,13 @@ func createParamItem(v string) paramtable.ParamItem { item := paramtable.ParamItem{ Formatter: func(originValue string) string { return v }, } - item.Init(&config.Manager{}) + item.Init(config.NewManager()) return item }*/ func initParamItem(item *paramtable.ParamItem, v string) { item.Formatter = func(originValue string) string { return v } - item.Init(&config.Manager{}) + item.Init(config.NewManager()) } type kafkaCfgOption func(cfg *paramtable.KafkaConfig) diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 847301ce93..e0fd6ab52a 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -84,7 +84,7 @@ func TestServiceParam(t *testing.T) { // test default value { pc := &PulsarConfig{} - base := &BaseTable{mgr: &config.Manager{}} + base := &BaseTable{mgr: config.NewManager()} pc.Init(base) assert.Empty(t, pc.Address.GetValue()) } @@ -163,7 +163,7 @@ func TestServiceParam(t *testing.T) { // test default value { kc := &KafkaConfig{} - base := &BaseTable{mgr: &config.Manager{}} + base := &BaseTable{mgr: config.NewManager()} kc.Init(base) assert.Empty(t, kc.Address.GetValue()) assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN")