From 283f5731d23a4c4312ce6bac6101644f7a150fbb Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Mon, 1 Aug 2022 10:04:33 +0800 Subject: [PATCH] config from etcd (#18421) Signed-off-by: Enwei Jiao --- internal/config/config.go | 60 +++ internal/config/config_test.go | 142 +++++++ internal/config/env_source.go | 81 ++++ internal/config/etcd_source.go | 166 ++++++++ internal/config/event.go | 65 +++ internal/config/file_source.go | 119 ++++++ internal/config/manager.go | 273 +++++++++++++ internal/config/manager_test.go | 73 ++++ internal/config/source.go | 79 ++++ internal/config/source_test.go | 36 ++ internal/core/build.sh | 186 --------- internal/core/run_clang_format.sh | 18 - internal/core/ubuntu_build_deps.sh | 3 - internal/kv/etcd/embed_etcd_config_test.go | 11 +- internal/kv/etcd/embed_etcd_kv.go | 2 +- internal/kv/etcd/embed_etcd_kv_test.go | 9 +- internal/kv/mem/mem_kv.go | 28 -- internal/util/paramtable/base_table.go | 386 ++++++++---------- internal/util/paramtable/base_table_test.go | 119 ++---- internal/util/paramtable/component_param.go | 7 +- .../util/paramtable/component_param_test.go | 4 - internal/util/paramtable/param_item.go | 20 + internal/util/paramtable/service_param.go | 47 ++- scripts/run_cpp_unittest.sh | 1 - scripts/run_go_unittest.sh | 1 + 25 files changed, 1363 insertions(+), 573 deletions(-) create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go create mode 100644 internal/config/env_source.go create mode 100644 internal/config/etcd_source.go create mode 100644 internal/config/event.go create mode 100644 internal/config/file_source.go create mode 100644 internal/config/manager.go create mode 100644 internal/config/manager_test.go create mode 100644 internal/config/source.go create mode 100644 internal/config/source_test.go delete mode 100755 internal/core/build.sh delete mode 100755 internal/core/run_clang_format.sh delete mode 100755 internal/core/ubuntu_build_deps.sh create mode 100644 internal/util/paramtable/param_item.go diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000000..21475bbcc1 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,60 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "strings" +) + +var ( + ErrNotInitial = errors.New("config is not initialized") + ErrIgnoreChange = errors.New("ignore change") + ErrKeyNotFound = errors.New("key not found") +) + +func Init(opts ...Option) (*Manager, error) { + o := &Options{} + for _, opt := range opts { + opt(o) + } + sourceManager := NewManager() + if o.File != nil { + sourceManager.AddSource(NewFileSource(*o.File)) + } + if o.EnvKeyFormatter != nil { + sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter)) + } + if o.EtcdInfo != nil { + s, err := NewEtcdSource(o.EtcdInfo) + if err != nil { + return nil, err + } + s.eh = sourceManager + sourceManager.AddSource(s) + } + return sourceManager, nil + +} + +func formatKey(key string) string { + ret := strings.ToLower(key) + ret = strings.ReplaceAll(ret, "/", "") + ret = strings.ReplaceAll(ret, "_", "") + ret = strings.ReplaceAll(ret, ".", "") + return ret +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000000..7d8e43cd11 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,142 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" +) + +func TestConfigFromEnv(t *testing.T) { + mgr, _ := Init() + _, err := mgr.GetConfig("test.env") + assert.EqualError(t, err, "key not found: test.env") + + os.Setenv("TEST_ENV", "value") + mgr, _ = Init(WithEnvSource(formatKey)) + + v, err := mgr.GetConfig("test.env") + assert.Nil(t, err) + assert.Equal(t, "value", v) + + v, err = mgr.GetConfig("TEST_ENV") + assert.Nil(t, err) + assert.Equal(t, "value", v) +} + +func TestConfigFromRemote(t *testing.T) { + cfg, _ := embed.ConfigFromFile("../../configs/advanced/etcd.yaml") + cfg.Dir = "/tmp/milvus/test" + e, err := embed.StartEtcd(cfg) + assert.Nil(t, err) + defer e.Close() + defer os.RemoveAll(cfg.Dir) + + client := v3client.New(e.Server) + + os.Setenv("TMP_KEY", "1") + os.Setenv("log.level", "info") + mgr, _ := Init(WithEnvSource(formatKey), + WithFilesSource("../../configs/milvus.yaml"), + WithEtcdSource(&EtcdInfo{ + Endpoints: []string{cfg.ACUrls[0].Host}, + KeyPrefix: "test", + RefreshMode: ModeInterval, + RefreshInterval: 10 * time.Millisecond, + })) + ctx := context.Background() + + t.Run("origin is empty", func(t *testing.T) { + _, err = mgr.GetConfig("test.etcd") + assert.EqualError(t, err, "key not found: test.etcd") + + client.KV.Put(ctx, "test/config/test/etcd", "value") + + time.Sleep(100 * time.Millisecond) + + v, err := mgr.GetConfig("test.etcd") + assert.Nil(t, err) + assert.Equal(t, "value", v) + v, err = mgr.GetConfig("TEST_ETCD") + assert.Nil(t, err) + assert.Equal(t, "value", v) + + client.KV.Delete(ctx, "test/config/test/etcd") + time.Sleep(100 * time.Millisecond) + + _, err = mgr.GetConfig("TEST_ETCD") + assert.EqualError(t, err, "key not found: TEST_ETCD") + }) + + t.Run("override origin value", func(t *testing.T) { + v, _ := mgr.GetConfig("tmp.key") + assert.Equal(t, "1", v) + client.KV.Put(ctx, "test/config/tmp/key", "2") + + time.Sleep(100 * time.Millisecond) + + v, _ = mgr.GetConfig("tmp.key") + assert.Equal(t, "2", v) + + client.KV.Put(ctx, "test/config/tmp/key", "3") + + time.Sleep(100 * time.Millisecond) + + v, _ = mgr.GetConfig("tmp.key") + assert.Equal(t, "3", v) + + client.KV.Delete(ctx, "test/config/tmp/key") + time.Sleep(100 * time.Millisecond) + + v, _ = mgr.GetConfig("tmp.key") + assert.Equal(t, "1", v) + }) + + t.Run("multi priority", func(t *testing.T) { + v, _ := mgr.GetConfig("log.level") + assert.Equal(t, "info", v) + client.KV.Put(ctx, "test/config/log/level", "error") + + time.Sleep(100 * time.Millisecond) + + v, _ = mgr.GetConfig("log.level") + assert.Equal(t, "error", v) + + client.KV.Delete(ctx, "test/config/log/level") + time.Sleep(100 * time.Millisecond) + + v, _ = mgr.GetConfig("log.level") + assert.Equal(t, "info", v) + }) + + t.Run("close manager", func(t *testing.T) { + mgr.Close() + + client.KV.Put(ctx, "test/config/test/etcd", "value2") + time.Sleep(100) + + _, err = mgr.GetConfig("test.etcd") + assert.EqualError(t, err, "key not found: test.etcd") + }) + +} diff --git a/internal/config/env_source.go b/internal/config/env_source.go new file mode 100644 index 0000000000..ea7eb24d6a --- /dev/null +++ b/internal/config/env_source.go @@ -0,0 +1,81 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "fmt" + "os" + "strings" + "sync" +) + +type EnvSource struct { + configs sync.Map + KeyFormatter func(string) string +} + +func NewEnvSource(KeyFormatter func(string) string) EnvSource { + es := EnvSource{ + configs: sync.Map{}, + KeyFormatter: KeyFormatter, + } + for _, value := range os.Environ() { + rs := []rune(value) + in := strings.Index(value, "=") + key := string(rs[0:in]) + value := string(rs[in+1:]) + envKey := KeyFormatter(key) + es.configs.Store(key, value) + es.configs.Store(envKey, value) + + } + return es +} + +// GetConfigurationByKey implements ConfigSource +func (es EnvSource) GetConfigurationByKey(key string) (string, error) { + value, ok := es.configs.Load(key) + if !ok { + return "", fmt.Errorf("key not found: %s", key) + } + + return value.(string), nil +} + +// GetConfigurations implements ConfigSource +func (es EnvSource) GetConfigurations() (map[string]string, error) { + configMap := make(map[string]string) + es.configs.Range(func(k, v interface{}) bool { + configMap[k.(string)] = v.(string) + return true + }) + + return configMap, nil +} + +// GetPriority implements ConfigSource +func (es EnvSource) GetPriority() int { + return NormalPriority +} + +// GetSourceName implements ConfigSource +func (es EnvSource) GetSourceName() string { + return "EnvironmentSource" +} + +func (es EnvSource) Close() { + +} diff --git a/internal/config/etcd_source.go b/internal/config/etcd_source.go new file mode 100644 index 0000000000..c9b8331845 --- /dev/null +++ b/internal/config/etcd_source.go @@ -0,0 +1,166 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + ModeWatch = iota + ModeInterval +) + +type EtcdSource struct { + sync.RWMutex + etcdCli *clientv3.Client + ctx context.Context + currentConfig map[string]string + keyPrefix string + refreshMode int + refreshInterval time.Duration + intervalDone chan bool + intervalInitOnce sync.Once + eh EventHandler +} + +func NewEtcdSource(remoteInfo *EtcdInfo) (*EtcdSource, error) { + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: remoteInfo.Endpoints, + DialTimeout: 5 * time.Second, + }) + if err != nil { + return nil, err + } + return &EtcdSource{ + etcdCli: etcdCli, + ctx: context.Background(), + currentConfig: make(map[string]string), + keyPrefix: remoteInfo.KeyPrefix, + refreshMode: remoteInfo.RefreshMode, + refreshInterval: remoteInfo.RefreshInterval, + intervalDone: make(chan bool, 1), + }, nil +} + +// GetConfigurationByKey implements ConfigSource +func (es *EtcdSource) GetConfigurationByKey(key string) (string, error) { + es.RLock() + v, ok := es.currentConfig[key] + es.RUnlock() + if !ok { + return "", fmt.Errorf("key not found: %s", key) + } + return v, nil +} + +// GetConfigurations implements ConfigSource +func (es *EtcdSource) GetConfigurations() (map[string]string, error) { + configMap := make(map[string]string) + err := es.refreshConfigurations() + if err != nil { + return nil, err + } + if es.refreshMode == ModeInterval { + es.intervalInitOnce.Do(func() { + go es.refreshConfigurationsPeriodically() + }) + } + + es.RLock() + for key, value := range es.currentConfig { + configMap[key] = value + } + es.RUnlock() + + return configMap, nil +} + +// GetPriority implements ConfigSource +func (es *EtcdSource) GetPriority() int { + return HighPriority +} + +// GetSourceName implements ConfigSource +func (es *EtcdSource) GetSourceName() string { + return "EtcdSource" +} + +func (es *EtcdSource) Close() { + es.intervalDone <- true +} + +func (es *EtcdSource) refreshConfigurations() error { + prefix := es.keyPrefix + "/config" + response, err := es.etcdCli.Get(es.ctx, prefix, clientv3.WithPrefix()) + if err != nil { + return err + } + newConfig := make(map[string]string, len(response.Kvs)) + for _, kv := range response.Kvs { + key := string(kv.Key) + key = strings.TrimPrefix(key, prefix+"/") + newConfig[key] = string(kv.Value) + newConfig[formatKey(key)] = string(kv.Value) + } + return es.updateConfigurationAndFireEvent(newConfig) +} + +func (es *EtcdSource) refreshConfigurationsPeriodically() { + ticker := time.NewTicker(es.refreshInterval) + log.Info("start refreshing configurations") + for { + select { + case <-ticker.C: + err := es.refreshConfigurations() + if err != nil { + log.Error("can not pull configs", zap.Error(err)) + es.intervalDone <- true + } + case <-es.intervalDone: + log.Info("stop refreshing configurations") + return + } + } +} + +func (es *EtcdSource) updateConfigurationAndFireEvent(config map[string]string) error { + es.Lock() + defer es.Unlock() + //Populate the events based on the changed value between current config and newly received Config + events, err := PopulateEvents(es.GetSourceName(), es.currentConfig, config) + if err != nil { + log.Warn("generating event error", zap.Error(err)) + return err + } + es.currentConfig = config + //Generate OnEvent Callback based on the events created + if es.eh != nil { + for _, e := range events { + es.eh.OnEvent(e) + } + } + return nil +} diff --git a/internal/config/event.go b/internal/config/event.go new file mode 100644 index 0000000000..8b7262fa09 --- /dev/null +++ b/internal/config/event.go @@ -0,0 +1,65 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// Event Constant +const ( + UpdateType = "UPDATE" + DeleteType = "DELETE" + CreateType = "CREATE" +) + +type Event struct { + EventSource string + EventType string + Key string + Value string + HasUpdated bool +} + +func newEvent(eventSource, eventType string, key string, value string) *Event { + return &Event{ + EventSource: eventSource, + EventType: eventType, + Key: key, + Value: value, + HasUpdated: false, + } + +} + +func PopulateEvents(source string, currentConfig, updatedConfig map[string]string) ([]*Event, error) { + events := make([]*Event, 0) + + // generate create and update event + for key, value := range updatedConfig { + currentValue, ok := currentConfig[key] + if !ok { // if new configuration introduced + events = append(events, newEvent(source, CreateType, key, value)) + } else if currentValue != value { + events = append(events, newEvent(source, UpdateType, key, value)) + } + } + + // generate delete event + for key, value := range currentConfig { + _, ok := updatedConfig[key] + if !ok { // when old config not present in new config + events = append(events, newEvent(source, DeleteType, key, value)) + } + } + return events, nil +} diff --git a/internal/config/file_source.go b/internal/config/file_source.go new file mode 100644 index 0000000000..141afce957 --- /dev/null +++ b/internal/config/file_source.go @@ -0,0 +1,119 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "fmt" + "os" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/spf13/cast" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +type FileSource struct { + sync.RWMutex + file string + configs map[string]string +} + +func NewFileSource(file string) *FileSource { + fs := &FileSource{file: file, configs: make(map[string]string)} + fs.loadFromFile() + return fs +} + +// GetConfigurationByKey implements ConfigSource +func (fs *FileSource) GetConfigurationByKey(key string) (string, error) { + v, ok := fs.configs[key] + if !ok { + return "", fmt.Errorf("key not found: %s", key) + } + return v, nil +} + +// GetConfigurations implements ConfigSource +func (fs *FileSource) GetConfigurations() (map[string]string, error) { + configMap := make(map[string]string) + fs.Lock() + defer fs.Unlock() + for k, v := range fs.configs { + configMap[k] = v + } + return configMap, nil +} + +// GetPriority implements ConfigSource +func (fs *FileSource) GetPriority() int { + return LowPriority +} + +// GetSourceName implements ConfigSource +func (fs *FileSource) GetSourceName() string { + return "FileSource" +} + +func (fs *FileSource) Close() { +} + +func (fs *FileSource) loadFromFile() error { + yamlReader := viper.New() + configFile := fs.file + if _, err := os.Stat(configFile); err != nil { + return errors.New("cannot access config file: " + configFile) + } + + yamlReader.SetConfigFile(configFile) + if err := yamlReader.ReadInConfig(); err != nil { + return err + } + + for _, key := range yamlReader.AllKeys() { + val := yamlReader.Get(key) + str, err := cast.ToStringE(val) + if err != nil { + switch val := val.(type) { + case []interface{}: + str = str[:0] + for _, v := range val { + ss, err := cast.ToStringE(v) + if err != nil { + log.Warn("cast to string failed", zap.Any("value", v)) + } + if str == "" { + str = ss + } else { + str = str + "," + ss + } + } + + default: + log.Warn("val is not a slice", zap.Any("value", val)) + continue + } + } + fs.Lock() + fs.configs[key] = str + fs.configs[formatKey(key)] = str + fs.Unlock() + } + + return nil +} diff --git a/internal/config/manager.go b/internal/config/manager.go new file mode 100644 index 0000000000..6660a537bb --- /dev/null +++ b/internal/config/manager.go @@ -0,0 +1,273 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "errors" + "fmt" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +const ( + TombValue = "TOMB_VALUE" + CustomSourceName = "CustomSource" +) + +type Manager struct { + sync.RWMutex + sources map[string]Source + keySourceMap map[string]string + overlayConfigs map[string]string // store the configs setted or deleted by user +} + +func NewManager() *Manager { + return &Manager{ + sources: make(map[string]Source), + keySourceMap: make(map[string]string), + overlayConfigs: make(map[string]string), + } +} + +func (m *Manager) GetConfig(key string) (string, error) { + m.RLock() + defer m.RUnlock() + realKey := formatKey(key) + v, ok := m.overlayConfigs[realKey] + if ok { + if v == TombValue { + return "", fmt.Errorf("key not found: %s", key) + } + return v, nil + } + sourceName, ok := m.keySourceMap[realKey] + if !ok { + return "", fmt.Errorf("key not found: %s", key) + } + return m.getConfigValueBySource(realKey, sourceName) +} + +// Configs returns all the key values +func (m *Manager) Configs() map[string]string { + m.RLock() + defer m.RLock() + config := make(map[string]string) + + for key, value := range m.keySourceMap { + sValue, err := m.getConfigValueBySource(key, value) + if err != nil { + continue + } + config[key] = sValue + } + + return config +} + +// For compatible reason, only visiable for Test +func (m *Manager) SetConfig(key, value string) { + m.Lock() + defer m.Unlock() + realKey := formatKey(key) + m.overlayConfigs[realKey] = value + m.updateEvent(newEvent(CustomSourceName, CreateType, realKey, value)) +} + +// For compatible reason, only visiable for Test +func (m *Manager) DeleteConfig(key string) { + m.Lock() + defer m.Unlock() + realKey := formatKey(key) + m.overlayConfigs[realKey] = TombValue + m.updateEvent(newEvent(realKey, DeleteType, realKey, "")) +} + +func (m *Manager) Close() { + for _, s := range m.sources { + s.Close() + } +} + +func (m *Manager) AddSource(source Source) error { + m.Lock() + defer m.Unlock() + sourceName := source.GetSourceName() + _, ok := m.sources[sourceName] + if ok { + err := errors.New("duplicate source supplied") + return err + } + + m.sources[sourceName] = source + + err := m.pullSourceConfigs(sourceName) + if err != nil { + err = fmt.Errorf("failed to load %s cause: %x", sourceName, err) + return err + } + + return nil +} + +// Do not use it directly, only used when add source and unittests. +func (m *Manager) pullSourceConfigs(source string) error { + configSource, ok := m.sources[source] + if !ok { + return errors.New("invalid source or source not added") + } + + configs, err := configSource.GetConfigurations() + if err != nil { + log.Error("Get configuration by items failed", zap.Error(err)) + return err + } + + sourcePriority := configSource.GetPriority() + for key := range configs { + sourceName, ok := m.keySourceMap[key] + if !ok { // if key do not exist then add source + m.keySourceMap[key] = source + continue + } + + currentSource, ok := m.sources[sourceName] + if !ok { + m.keySourceMap[key] = source + continue + } + + currentSrcPriority := currentSource.GetPriority() + if currentSrcPriority > sourcePriority { // lesser value has high priority + m.keySourceMap[key] = source + } + } + + return nil +} + +func (m *Manager) getConfigValueBySource(configKey, sourceName string) (string, error) { + source, ok := m.sources[sourceName] + if !ok { + return "", ErrKeyNotFound + } + + return source.GetConfigurationByKey(configKey) +} + +func (m *Manager) updateEvent(e *Event) error { + // refresh all configuration one by one + log.Debug("receive update event", zap.Any("event", e)) + if e.HasUpdated { + return nil + } + switch e.EventType { + case CreateType, UpdateType: + sourceName, ok := m.keySourceMap[e.Key] + if !ok { + m.keySourceMap[e.Key] = e.EventSource + e.EventType = CreateType + } else if sourceName == e.EventSource { + e.EventType = UpdateType + } else if sourceName != e.EventSource { + prioritySrc := m.getHighPrioritySource(sourceName, e.EventSource) + if prioritySrc != nil && prioritySrc.GetSourceName() == sourceName { + // if event generated from less priority source then ignore + log.Info(fmt.Sprintf("the event source %s's priority is less then %s's, ignore", + e.EventSource, sourceName)) + return ErrIgnoreChange + } + m.keySourceMap[e.Key] = e.EventSource + e.EventType = UpdateType + } + + case DeleteType: + sourceName, ok := m.keySourceMap[e.Key] + if !ok || sourceName != e.EventSource { + // if delete event generated from source not maintained ignore it + log.Info(fmt.Sprintf("the event source %s (expect %s) is not maintained, ignore", + e.EventSource, sourceName)) + return ErrIgnoreChange + } else if sourceName == e.EventSource { + // find less priority source or delete key + source := m.findNextBestSource(e.Key, sourceName) + if source == nil { + delete(m.keySourceMap, e.Key) + } else { + m.keySourceMap[e.Key] = source.GetSourceName() + } + } + + } + + e.HasUpdated = true + return nil +} + +// OnEvent Triggers actions when an event is generated +func (m *Manager) OnEvent(event *Event) { + m.Lock() + defer m.Unlock() + err := m.updateEvent(event) + if err != nil { + log.Warn("failed in updating event with error", zap.Error(err), zap.Any("event", event)) + return + } + + // m.dispatcher.DispatchEvent(event) +} + +func (m *Manager) findNextBestSource(key string, sourceName string) Source { + var rSource Source + for _, source := range m.sources { + if source.GetSourceName() == sourceName { + continue + } + _, err := source.GetConfigurationByKey(key) + if err != nil { + continue + } + if rSource == nil { + rSource = source + continue + } + if source.GetPriority() < rSource.GetPriority() { // less value has high priority + rSource = source + } + } + + return rSource +} + +func (m *Manager) getHighPrioritySource(srcNameA, srcNameB string) Source { + sourceA, okA := m.sources[srcNameA] + sourceB, okB := m.sources[srcNameB] + + if !okA && !okB { + return nil + } else if !okA { + return sourceB + } else if !okB { + return sourceA + } + + if sourceA.GetPriority() < sourceB.GetPriority() { //less value has high priority + return sourceA + } + + return sourceB +} diff --git a/internal/config/manager_test.go b/internal/config/manager_test.go new file mode 100644 index 0000000000..c8cfa6ecf9 --- /dev/null +++ b/internal/config/manager_test.go @@ -0,0 +1,73 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllConfigFromManager(t *testing.T) { + mgr, _ := Init() + all := mgr.Configs() + assert.Equal(t, 0, len(all)) + + mgr, _ = Init(WithEnvSource(formatKey)) + all = mgr.Configs() + assert.Less(t, 0, len(all)) +} + +func TestAllDupliateSource(t *testing.T) { + mgr, _ := Init() + err := mgr.AddSource(NewEnvSource(formatKey)) + assert.Nil(t, err) + err = mgr.AddSource(NewEnvSource(formatKey)) + assert.Error(t, err) + + err = mgr.AddSource(ErrSource{}) + assert.Error(t, err, "error") + + err = mgr.pullSourceConfigs("ErrSource") + assert.Error(t, err, "invalid source or source not added") +} + +type ErrSource struct { +} + +func (e ErrSource) Close() { +} + +func (e ErrSource) GetConfigurationByKey(string) (string, error) { + return "", errors.New("error") +} + +// GetConfigurations implements Source +func (ErrSource) GetConfigurations() (map[string]string, error) { + return nil, errors.New("error") +} + +// GetPriority implements Source +func (ErrSource) GetPriority() int { + return 2 +} + +// GetSourceName implements Source +func (ErrSource) GetSourceName() string { + return "ErrSource" +} diff --git a/internal/config/source.go b/internal/config/source.go new file mode 100644 index 0000000000..f7c491346e --- /dev/null +++ b/internal/config/source.go @@ -0,0 +1,79 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import "time" + +const ( + HighPriority = 1 + NormalPriority = HighPriority + 10 + LowPriority = NormalPriority + 10 +) + +type Source interface { + GetConfigurations() (map[string]string, error) + GetConfigurationByKey(string) (string, error) + GetPriority() int + GetSourceName() string + Close() +} + +// EventHandler handles config change event +type EventHandler interface { + OnEvent(event *Event) +} + +// EtcdInfo has attribute for config center source initialization +type EtcdInfo struct { + Endpoints []string + KeyPrefix string + + RefreshMode int + //Pull Configuration interval, unit is second + RefreshInterval time.Duration +} + +//Options hold options +type Options struct { + File *string + EtcdInfo *EtcdInfo + EnvKeyFormatter func(string) string +} + +//Option is a func +type Option func(options *Options) + +//WithRequiredFiles tell archaius to manage files, if not exist will return error +func WithFilesSource(f string) Option { + return func(options *Options) { + options.File = &f + } +} + +//WithEtcdSource accept the information for initiating a remote source +func WithEtcdSource(ri *EtcdInfo) Option { + return func(options *Options) { + options.EtcdInfo = ri + } +} + +//WithEnvSource enable env source +//archaius will read ENV as key value +func WithEnvSource(keyFormatter func(string) string) Option { + return func(options *Options) { + options.EnvKeyFormatter = keyFormatter + } +} diff --git a/internal/config/source_test.go b/internal/config/source_test.go new file mode 100644 index 0000000000..68ec514ab5 --- /dev/null +++ b/internal/config/source_test.go @@ -0,0 +1,36 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLoadFromFileSource(t *testing.T) { + t.Run("file not exist", func(t *testing.T) { + fs := NewFileSource("file_not_exist.yaml") + err := fs.loadFromFile() + assert.Error(t, err, "cannot access config file: file_not_exist.yaml") + }) + t.Run("file type not support", func(t *testing.T) { + fs := NewFileSource("../../go.mod") + err := fs.loadFromFile() + assert.Error(t, err) + }) +} diff --git a/internal/core/build.sh b/internal/core/build.sh deleted file mode 100755 index 4973233e2f..0000000000 --- a/internal/core/build.sh +++ /dev/null @@ -1,186 +0,0 @@ -#!/bin/bash - -# Compile jobs variable; Usage: $ jobs=12 ./build.sh ... -if [[ ! ${jobs+1} ]]; then - if command -v nproc &> /dev/null - # For linux - then - jobs=$(nproc) - elif command -v sysctl &> /dev/null - # For macOS - then - jobs=$(sysctl -n hw.logicalcpu) - else - jobs=4 - fi -fi - -SOURCE="${BASH_SOURCE[0]}" -while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink - DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" - SOURCE="$(readlink "$SOURCE")" - [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located -done -SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" - -BUILD_OUTPUT_DIR="./cmake_build" -BUILD_TYPE="Release" -BUILD_UNITTEST="OFF" -INSTALL_PREFIX="${SCRIPTS_DIR}/output" -MAKE_CLEAN="OFF" -BUILD_COVERAGE="OFF" -PROFILING="OFF" -RUN_CPPLINT="OFF" -CUDA_COMPILER=/usr/local/cuda/bin/nvcc -GPU_VERSION="OFF" #defaults to CPU version -WITH_PROMETHEUS="ON" -CUDA_ARCH="DEFAULT" -CUSTOM_THIRDPARTY_PATH="" - -while getopts "p:t:s:f:o:ulrcghzme" arg; do - case $arg in - f) - CUSTOM_THIRDPARTY_PATH=$OPTARG - ;; - p) - INSTALL_PREFIX=$OPTARG - ;; - o) - BUILD_OUTPUT_DIR=$OPTARG - ;; - t) - BUILD_TYPE=$OPTARG # BUILD_TYPE - ;; - u) - echo "Build and run unittest cases" - BUILD_UNITTEST="ON" - ;; - l) - RUN_CPPLINT="ON" - ;; - r) - if [[ -d ${BUILD_OUTPUT_DIR} ]]; then - MAKE_CLEAN="ON" - fi - ;; - c) - BUILD_COVERAGE="ON" - ;; - z) - PROFILING="ON" - ;; - g) - GPU_VERSION="ON" - ;; - e) - WITH_PROMETHEUS="OFF" - ;; - s) - CUDA_ARCH=$OPTARG - ;; - h) # help - echo " - -parameter: --f: custom paths of thirdparty downloaded files(default: NULL) --p: install prefix(default: $(pwd)/milvus) --d: db data path(default: /tmp/milvus) --t: build type(default: Debug) --u: building unit test options(default: OFF) --l: run cpplint, clang-format and clang-tidy(default: OFF) --r: remove previous build directory(default: OFF) --c: code coverage(default: OFF) --z: profiling(default: OFF) --g: build GPU version(default: OFF) --e: build without prometheus(default: OFF) --s: build with CUDA arch(default:DEFAULT), for example '-gencode=compute_61,code=sm_61;-gencode=compute_75,code=sm_75' --h: help - -usage: -./build.sh -p \${INSTALL_PREFIX} -t \${BUILD_TYPE} -s \${CUDA_ARCH} -f\${CUSTOM_THIRDPARTY_PATH} [-u] [-l] [-r] [-c] [-z] [-g] [-m] [-e] [-h] - " - exit 0 - ;; - ?) - echo "ERROR! unknown argument" - exit 1 - ;; - esac -done - -if [[ ! -d ${BUILD_OUTPUT_DIR} ]]; then - mkdir ${BUILD_OUTPUT_DIR} -fi - -cd ${BUILD_OUTPUT_DIR} - -# remove make cache since build.sh -l use default variables -# force update the variables each time -make rebuild_cache >/dev/null 2>&1 - - -if [[ ${MAKE_CLEAN} == "ON" ]]; then - echo "Runing make clean in ${BUILD_OUTPUT_DIR} ..." - make clean - exit 0 -fi - -unameOut="$(uname -s)" -case "${unameOut}" in - Darwin*) - llvm_prefix="$(brew --prefix llvm)" - export CLANG_TOOLS_PATH="${llvm_prefix}/bin" - export CC="${llvm_prefix}/bin/clang" - export CXX="${llvm_prefix}/bin/clang++" - export LDFLAGS="-L${llvm_prefix}/lib -L/usr/local/opt/libomp/lib" - export CXXFLAGS="-I${llvm_prefix}/include -I/usr/local/include -I/usr/local/opt/libomp/include" - ;; - *) echo "==System:${unameOut}"; -esac - -CMAKE_CMD="cmake \ --DBUILD_UNIT_TEST=${BUILD_UNITTEST} \ --DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} --DCMAKE_BUILD_TYPE=${BUILD_TYPE} \ --DOpenBLAS_SOURCE=AUTO \ --DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \ --DBUILD_COVERAGE=${BUILD_COVERAGE} \ --DENABLE_CPU_PROFILING=${PROFILING} \ --DMILVUS_GPU_VERSION=${GPU_VERSION} \ --DMILVUS_WITH_PROMETHEUS=${WITH_PROMETHEUS} \ --DMILVUS_CUDA_ARCH=${CUDA_ARCH} \ --DCUSTOM_THIRDPARTY_DOWNLOAD_PATH=${CUSTOM_THIRDPARTY_PATH} \ --DKNOWHERE_GPU_VERSION=${SUPPORT_GPU} \ -${SCRIPTS_DIR}" -echo ${CMAKE_CMD} -${CMAKE_CMD} - - -if [[ ${RUN_CPPLINT} == "ON" ]]; then - # cpplint check - make lint - if [ $? -ne 0 ]; then - echo "ERROR! cpplint check failed" - exit 1 - fi - echo "cpplint check passed!" - - # clang-format check - make check-clang-format - if [ $? -ne 0 ]; then - echo "ERROR! clang-format check failed" - exit 1 - fi - echo "clang-format check passed!" - - # clang-tidy check - make check-clang-tidy - if [ $? -ne 0 ]; then - echo "ERROR! clang-tidy check failed" - exit 1 - fi - echo "clang-tidy check passed!" -else - # compile and build - make -j ${jobs} install || exit 1 -fi diff --git a/internal/core/run_clang_format.sh b/internal/core/run_clang_format.sh deleted file mode 100755 index cc2d21eaaa..0000000000 --- a/internal/core/run_clang_format.sh +++ /dev/null @@ -1,18 +0,0 @@ -if [ -z $1 ]; then - echo "usage: $0 " - exit -1 -else - echo start formating -fi -CorePath=$1 - -formatThis() { - find "$1" | grep -E "(*\.cpp|*\.h|*\.cc)$" | grep -v "gen_tools/templates" | grep -v "/thirdparty" | grep -v "\.pb\." | xargs clang-format-10 -i -} - -formatThis "${CorePath}/src" -formatThis "${CorePath}/unittest" -formatThis "${CorePath}/unittest/bench" - -${CorePath}/build-support/add_cpp_license.sh ${CorePath}/build-support/cpp_license.txt ${CorePath} -${CorePath}/build-support/add_cmake_license.sh ${CorePath}/build-support/cmake_license.txt ${CorePath} diff --git a/internal/core/ubuntu_build_deps.sh b/internal/core/ubuntu_build_deps.sh deleted file mode 100755 index 4dc95e6069..0000000000 --- a/internal/core/ubuntu_build_deps.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -sudo apt-get install libtbb-dev libopenblas-dev libopenblas-base diff --git a/internal/kv/etcd/embed_etcd_config_test.go b/internal/kv/etcd/embed_etcd_config_test.go index 3bfd8797ed..156790ba2a 100644 --- a/internal/kv/etcd/embed_etcd_config_test.go +++ b/internal/kv/etcd/embed_etcd_config_test.go @@ -31,12 +31,13 @@ import ( func TestEtcdConfigLoad(te *testing.T) { os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) param := new(paramtable.ServiceParam) - param.Init() - param.BaseTable.Save("etcd.use.embed", "true") + + os.Setenv("etcd.use.embed", "true") // TODO, not sure if the relative path works for ci environment - param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") - param.BaseTable.Save("etcd.data.dir", "etcd.test.data.dir") - param.EtcdCfg.LoadCfgToMemory() + os.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml") + os.Setenv("etcd.data.dir", "etcd.test.data.dir") + + param.Init() //clean up data defer func() { os.RemoveAll("etcd.test.data.dir") diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 613564caa2..9b06907060 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -11,8 +11,8 @@ // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and // limitations under the License. +// See the License for the specific language governing permissions and package etcdkv diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 9fe09e569e..54eaa88c50 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -33,14 +33,13 @@ import ( func TestEmbedEtcd(te *testing.T) { os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) param := new(paramtable.ServiceParam) - param.Init() - param.BaseTable.Save("etcd.use.embed", "true") - param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") + os.Setenv("etcd.use.embed", "true") + os.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml") dir := te.TempDir() - param.BaseTable.Save("etcd.data.dir", dir) + os.Setenv("etcd.data.dir", dir) - param.EtcdCfg.LoadCfgToMemory() + param.Init() te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { rootPath := "/etcd/test/root/saveandload" diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 7802f45c46..d122eb5512 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -108,17 +108,6 @@ func (kv *MemoryKV) Get(key string) string { return item.(memoryKVItem).value.String() } -// LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. -func (kv *MemoryKV) LoadWithDefault(key, defaultValue string) string { - kv.RLock() - defer kv.RUnlock() - item := kv.tree.Get(memoryKVItem{key: key}) - if item == nil { - return defaultValue - } - return item.(memoryKVItem).value.String() -} - // LoadBytesWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. func (kv *MemoryKV) LoadBytesWithDefault(key string, defaultValue []byte) []byte { kv.RLock() @@ -130,23 +119,6 @@ func (kv *MemoryKV) LoadBytesWithDefault(key string, defaultValue []byte) []byte return item.(memoryKVItem).value.ByteSlice() } -// LoadRange loads objects with range @startKey to @endKey with @limit number of objects. -func (kv *MemoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) { - kv.RLock() - defer kv.RUnlock() - keys := make([]string, 0, limit) - values := make([]string, 0, limit) - kv.tree.AscendRange(memoryKVItem{key: key}, memoryKVItem{key: endKey}, func(item btree.Item) bool { - keys = append(keys, item.(memoryKVItem).key) - values = append(values, item.(memoryKVItem).value.String()) - if limit > 0 { - return len(keys) < limit - } - return true - }) - return keys, values, nil -} - // LoadBytesRange loads objects with range @startKey to @endKey with @limit number of objects. func (kv *MemoryKV) LoadBytesRange(key, endKey string, limit int) ([]string, [][]byte, error) { kv.RLock() diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index a82034b1db..2ceb8e243f 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -13,21 +13,16 @@ package paramtable import ( "fmt" - "net/url" "os" "path" "runtime" "strconv" "strings" "sync" - "syscall" + "time" - "github.com/spf13/cast" - "github.com/spf13/viper" - - memkv "github.com/milvus-io/milvus/internal/kv/mem" + config "github.com/milvus-io/milvus/internal/config" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/util/logutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" @@ -67,8 +62,10 @@ type Base interface { // BaseTable the basics of paramtable type BaseTable struct { - once sync.Once - params *memkv.MemoryKV + once sync.Once + mgr *config.Manager + // params *memkv.MemoryKV + configDir string RoleName string @@ -89,11 +86,51 @@ func (gp *BaseTable) GlobalInitWithYaml(yaml string) { // Init initializes the param table. func (gp *BaseTable) Init() { - gp.params = memkv.NewMemoryKV() + var err error + formatter := func(key string) string { + ret := strings.ToLower(key) + ret = strings.TrimPrefix(ret, "milvus.") + ret = strings.ReplaceAll(ret, "/", "") + ret = strings.ReplaceAll(ret, "_", "") + ret = strings.ReplaceAll(ret, ".", "") + return ret + } + gp.mgr, err = config.Init(config.WithEnvSource(formatter)) + if err != nil { + return + } + gp.configDir = gp.initConfPath() - gp.loadFromYaml(defaultYaml) - gp.tryLoadFromEnv() - gp.InitLogCfg() + configFilePath := gp.configDir + "/" + defaultYaml + gp.mgr, err = config.Init(config.WithEnvSource(formatter), config.WithFilesSource(configFilePath)) + if err != nil { + log.Warn("init baseTable with file failed", zap.String("configFile", configFilePath), zap.Error(err)) + return + } + defer gp.InitLogCfg() + + endpoints, err := gp.mgr.GetConfig("etcd.endpoints") + if err != nil { + log.Info("cannot find etcd.endpoints") + return + } + rootPath, err := gp.mgr.GetConfig("etcd.rootPath") + if err != nil { + log.Info("cannot find etcd.rootPath") + return + } + gp.mgr, err = config.Init(config.WithEnvSource(formatter), + config.WithFilesSource(configFilePath), + config.WithEtcdSource(&config.EtcdInfo{ + Endpoints: strings.Split(endpoints, ","), + KeyPrefix: rootPath, + RefreshMode: config.ModeInterval, + RefreshInterval: 10 * time.Second, + })) + if err != nil { + log.Info("init with etcd failed", zap.Error(err)) + return + } } // GetConfigDir returns the config directory @@ -101,21 +138,10 @@ func (gp *BaseTable) GetConfigDir() string { return gp.configDir } -// LoadFromKVPair saves given kv pair to paramtable -func (gp *BaseTable) LoadFromKVPair(kvPairs []*commonpb.KeyValuePair) error { - for _, pair := range kvPairs { - err := gp.Save(pair.Key, pair.Value) - if err != nil { - return err - } - } - return nil -} - func (gp *BaseTable) initConfPath() string { // check if user set conf dir through env - configDir, find := syscall.Getenv("MILVUSCONF") - if !find { + configDir, err := gp.mgr.GetConfig("MILVUSCONF") + if err != nil { runPath, err := os.Getwd() if err != nil { panic(err) @@ -123,30 +149,15 @@ func (gp *BaseTable) initConfPath() string { configDir = runPath + "/configs/" if _, err := os.Stat(configDir); err != nil { _, fpath, _, _ := runtime.Caller(0) - // TODO, this is a hack, need to find better solution for relative path configDir = path.Dir(fpath) + "/../../../configs/" } } return configDir } -func (gp *BaseTable) loadFromYaml(file string) { - if err := gp.LoadYaml(file); err != nil { - panic(err) - } -} - -func (gp *BaseTable) tryLoadFromEnv() { - gp.loadEtcdConfig() - gp.loadMinioConfig() - gp.loadMQConfig() - gp.loadDataNodeConfig() - gp.loadOtherEnvs() -} - // Load loads an object with @key. func (gp *BaseTable) Load(key string) (string, error) { - return gp.params.Load(strings.ToLower(key)) + return gp.mgr.GetConfig(key) } // LoadWithPriority loads an object with multiple @keys, return the first successful value. @@ -154,7 +165,7 @@ func (gp *BaseTable) Load(key string) (string, error) { // This is to be compatible with old configuration file. func (gp *BaseTable) LoadWithPriority(keys []string) (string, error) { for _, key := range keys { - if str, err := gp.params.Load(strings.ToLower(key)); err == nil { + if str, err := gp.mgr.GetConfig(key); err == nil { return str, nil } } @@ -163,7 +174,11 @@ func (gp *BaseTable) LoadWithPriority(keys []string) (string, error) { // LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string { - return gp.params.LoadWithDefault(strings.ToLower(key), defaultValue) + str, err := gp.mgr.GetConfig(key) + if err != nil { + return defaultValue + } + return str } // LoadWithDefault2 loads an object with multiple @keys, return the first successful value. @@ -171,73 +186,32 @@ func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string { // This is to be compatible with old configuration file. func (gp *BaseTable) LoadWithDefault2(keys []string, defaultValue string) string { for _, key := range keys { - if str, err := gp.params.Load(strings.ToLower(key)); err == nil { + str, err := gp.mgr.GetConfig(key) + if err == nil { return str } } return defaultValue } -// LoadRange loads objects with range @startKey to @endKey with @limit number of objects. -func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) { - return gp.params.LoadRange(strings.ToLower(key), strings.ToLower(endKey), limit) +func (gp *BaseTable) Get(key string) string { + value, err := gp.mgr.GetConfig(key) + if err != nil { + return "" + } + return value } -func (gp *BaseTable) LoadYaml(fileName string) error { - config := viper.New() - configFile := gp.configDir + fileName - if _, err := os.Stat(configFile); err != nil { - panic("cannot access config file: " + configFile) - } - - config.SetConfigFile(configFile) - if err := config.ReadInConfig(); err != nil { - panic(err) - } - - for _, key := range config.AllKeys() { - val := config.Get(key) - str, err := cast.ToStringE(val) - if err != nil { - switch val := val.(type) { - case []interface{}: - str = str[:0] - for _, v := range val { - ss, err := cast.ToStringE(v) - if err != nil { - panic(err) - } - if str == "" { - str = ss - } else { - str = str + "," + ss - } - } - - default: - panic("undefined config type, key=" + key) - } - } - err = gp.params.Save(strings.ToLower(key), str) - if err != nil { - panic(err) - } - - } - +// For compatible reason, only visiable for Test +func (gp *BaseTable) Remove(key string) error { + gp.mgr.DeleteConfig(key) return nil } -func (gp *BaseTable) Get(key string) string { - return gp.params.Get(strings.ToLower(key)) -} - -func (gp *BaseTable) Remove(key string) error { - return gp.params.Remove(strings.ToLower(key)) -} - +// For compatible reason, only visiable for Test func (gp *BaseTable) Save(key, value string) error { - return gp.params.Save(strings.ToLower(key), value) + gp.mgr.SetConfig(key, value) + return nil } func (gp *BaseTable) ParseBool(key string, defaultValue bool) bool { @@ -427,124 +401,124 @@ func (gp *BaseTable) SetLogger(id UniqueID) { } } -func (gp *BaseTable) loadKafkaConfig() { - brokerList := os.Getenv("KAFKA_BROKER_LIST") - if brokerList == "" { - brokerList = gp.Get("kafka.brokerList") - } - gp.Save("_KafkaBrokerList", brokerList) -} +// func (gp *BaseTable) loadKafkaConfig() { +// brokerList := os.Getenv("KAFKA_BROKER_LIST") +// if brokerList == "" { +// brokerList = gp.Get("kafka.brokerList") +// } +// gp.Save("_KafkaBrokerList", brokerList) +// } -func (gp *BaseTable) loadPulsarConfig() { - pulsarAddress := os.Getenv("PULSAR_ADDRESS") - if pulsarAddress == "" { - pulsarHost := gp.Get("pulsar.address") - port := gp.Get("pulsar.port") - if len(pulsarHost) != 0 && len(port) != 0 { - pulsarAddress = "pulsar://" + pulsarHost + ":" + port - } - } - gp.Save("_PulsarAddress", pulsarAddress) +// func (gp *BaseTable) loadPulsarConfig() { +// pulsarAddress := os.Getenv("PULSAR_ADDRESS") +// if pulsarAddress == "" { +// pulsarHost := gp.Get("pulsar.address") +// port := gp.Get("pulsar.port") +// if len(pulsarHost) != 0 && len(port) != 0 { +// pulsarAddress = "pulsar://" + pulsarHost + ":" + port +// } +// } +// gp.Save("_PulsarAddress", pulsarAddress) - // parse pulsar address to find the host - pulsarURL, err := url.ParseRequestURI(pulsarAddress) - if err != nil { - gp.Save("_PulsarWebAddress", "") - log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err)) - return - } - webport := gp.LoadWithDefault("pulsar.webport", "80") - pulsarWebAddress := "http://" + pulsarURL.Hostname() + ":" + webport - gp.Save("_PulsarWebAddress", pulsarWebAddress) - log.Info("Pulsar config", zap.String("pulsar url", pulsarAddress), zap.String("pulsar web url", pulsarWebAddress)) -} +// // parse pulsar address to find the host +// pulsarURL, err := url.ParseRequestURI(pulsarAddress) +// if err != nil { +// gp.Save("_PulsarWebAddress", "") +// log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err)) +// return +// } +// webport := gp.LoadWithDefault("pulsar.webport", "80") +// pulsarWebAddress := "http://" + pulsarURL.Hostname() + ":" + webport +// gp.Save("_PulsarWebAddress", pulsarWebAddress) +// log.Info("Pulsar config", zap.String("pulsar url", pulsarAddress), zap.String("pulsar web url", pulsarWebAddress)) +// } -func (gp *BaseTable) loadRocksMQConfig() { - rocksmqPath := os.Getenv("ROCKSMQ_PATH") - if rocksmqPath == "" { - rocksmqPath = gp.Get("rocksmq.path") - } - gp.Save("_RocksmqPath", rocksmqPath) -} +// func (gp *BaseTable) loadRocksMQConfig() { +// rocksmqPath := os.Getenv("ROCKSMQ_PATH") +// if rocksmqPath == "" { +// rocksmqPath = gp.Get("rocksmq.path") +// } +// gp.Save("_RocksmqPath", rocksmqPath) +// } -func (gp *BaseTable) loadMQConfig() { - gp.loadPulsarConfig() - gp.loadKafkaConfig() - gp.loadRocksMQConfig() -} +// func (gp *BaseTable) loadMQConfig() { +// gp.loadPulsarConfig() +// gp.loadKafkaConfig() +// gp.loadRocksMQConfig() +// } -func (gp *BaseTable) loadEtcdConfig() { - etcdEndpoints := os.Getenv("ETCD_ENDPOINTS") - if etcdEndpoints == "" { - etcdEndpoints = gp.LoadWithDefault("etcd.endpoints", DefaultEtcdEndpoints) - } - gp.Save("_EtcdEndpoints", etcdEndpoints) -} +// func (gp *BaseTable) loadEtcdConfig() { +// etcdEndpoints := os.Getenv("ETCD_ENDPOINTS") +// if etcdEndpoints == "" { +// etcdEndpoints = gp.LoadWithDefault("etcd.endpoints", DefaultEtcdEndpoints) +// } +// gp.Save("_EtcdEndpoints", etcdEndpoints) +// } -func (gp *BaseTable) loadMinioConfig() { - minioAddress := os.Getenv("MINIO_ADDRESS") - if minioAddress == "" { - minioHost := gp.LoadWithDefault("minio.address", DefaultMinioHost) - port := gp.LoadWithDefault("minio.port", DefaultMinioPort) - minioAddress = minioHost + ":" + port - } - gp.Save("_MinioAddress", minioAddress) +// func (gp *BaseTable) loadMinioConfig() { +// minioAddress := os.Getenv("MINIO_ADDRESS") +// if minioAddress == "" { +// minioHost := gp.LoadWithDefault("minio.address", DefaultMinioHost) +// port := gp.LoadWithDefault("minio.port", DefaultMinioPort) +// minioAddress = minioHost + ":" + port +// } +// gp.Save("_MinioAddress", minioAddress) - minioAccessKey := os.Getenv("MINIO_ACCESS_KEY") - if minioAccessKey == "" { - minioAccessKey = gp.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey) - } - gp.Save("_MinioAccessKeyID", minioAccessKey) +// minioAccessKey := os.Getenv("MINIO_ACCESS_KEY") +// if minioAccessKey == "" { +// minioAccessKey = gp.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey) +// } +// gp.Save("_MinioAccessKeyID", minioAccessKey) - minioSecretKey := os.Getenv("MINIO_SECRET_KEY") - if minioSecretKey == "" { - minioSecretKey = gp.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey) - } - gp.Save("_MinioSecretAccessKey", minioSecretKey) +// minioSecretKey := os.Getenv("MINIO_SECRET_KEY") +// if minioSecretKey == "" { +// minioSecretKey = gp.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey) +// } +// gp.Save("_MinioSecretAccessKey", minioSecretKey) - minioUseSSL := os.Getenv("MINIO_USE_SSL") - if minioUseSSL == "" { - minioUseSSL = gp.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL) - } - gp.Save("_MinioUseSSL", minioUseSSL) +// minioUseSSL := os.Getenv("MINIO_USE_SSL") +// if minioUseSSL == "" { +// minioUseSSL = gp.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL) +// } +// gp.Save("_MinioUseSSL", minioUseSSL) - minioBucketName := os.Getenv("MINIO_BUCKET_NAME") - if minioBucketName == "" { - minioBucketName = gp.LoadWithDefault("minio.bucketName", DefaultMinioBucketName) - } - gp.Save("_MinioBucketName", minioBucketName) +// minioBucketName := os.Getenv("MINIO_BUCKET_NAME") +// if minioBucketName == "" { +// minioBucketName = gp.LoadWithDefault("minio.bucketName", DefaultMinioBucketName) +// } +// gp.Save("_MinioBucketName", minioBucketName) - minioUseIAM := os.Getenv("MINIO_USE_IAM") - if minioUseIAM == "" { - minioUseIAM = gp.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM) - } - gp.Save("_MinioUseIAM", minioUseIAM) +// minioUseIAM := os.Getenv("MINIO_USE_IAM") +// if minioUseIAM == "" { +// minioUseIAM = gp.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM) +// } +// gp.Save("_MinioUseIAM", minioUseIAM) - minioIAMEndpoint := os.Getenv("MINIO_IAM_ENDPOINT") - if minioIAMEndpoint == "" { - minioIAMEndpoint = gp.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) - } - gp.Save("_MinioIAMEndpoint", minioIAMEndpoint) -} +// minioIAMEndpoint := os.Getenv("MINIO_IAM_ENDPOINT") +// if minioIAMEndpoint == "" { +// minioIAMEndpoint = gp.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) +// } +// gp.Save("_MinioIAMEndpoint", minioIAMEndpoint) +// } -func (gp *BaseTable) loadDataNodeConfig() { - insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE") - if insertBufferFlushSize == "" { - insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", DefaultInsertBufferSize) - } - gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize) -} +// func (gp *BaseTable) loadDataNodeConfig() { +// insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE") +// if insertBufferFlushSize == "" { +// insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", DefaultInsertBufferSize) +// } +// gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize) +// } -func (gp *BaseTable) loadOtherEnvs() { - // try to load environment start with ENV_PREFIX - for _, e := range os.Environ() { - parts := strings.SplitN(e, "=", 2) - if strings.Contains(parts[0], DefaultEnvPrefix) { - parts := strings.SplitN(e, "=", 2) - // remove the ENV PREFIX and use the rest as key - keyParts := strings.SplitAfterN(parts[0], ".", 2) - // mem kv throw no errors - gp.Save(keyParts[1], parts[1]) - } - } -} +// func (gp *BaseTable) loadOtherEnvs() { +// // try to load environment start with ENV_PREFIX +// for _, e := range os.Environ() { +// parts := strings.SplitN(e, "=", 2) +// if strings.Contains(parts[0], DefaultEnvPrefix) { +// parts := strings.SplitN(e, "=", 2) +// // remove the ENV PREFIX and use the rest as key +// keyParts := strings.SplitAfterN(parts[0], ".", 2) +// // mem kv throw no errors +// gp.Save(keyParts[1], parts[1]) +// } +// } +// } diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index 8f92e3e59f..b634d2625c 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -12,13 +12,9 @@ package paramtable import ( - "fmt" "os" "testing" - "path/filepath" - - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/stretchr/testify/assert" "google.golang.org/grpc/grpclog" ) @@ -60,61 +56,6 @@ func TestBaseTable_SaveAndLoad(t *testing.T) { assert.Nil(t, err6) } -func TestBaseTable_LoadFromKVPair(t *testing.T) { - var kvPairs []*commonpb.KeyValuePair - kvPairs = append(kvPairs, &commonpb.KeyValuePair{Key: "k1", Value: "v1"}, &commonpb.KeyValuePair{Key: "k2", Value: "v2"}) - - err := baseParams.LoadFromKVPair(kvPairs) - assert.Nil(t, err) - - v, err := baseParams.Load("k1") - assert.Nil(t, err) - assert.Equal(t, "v1", v) - - v, err = baseParams.Load("k2") - assert.Nil(t, err) - assert.Equal(t, "v2", v) - - v, err = baseParams.LoadWithPriority([]string{"k2_new"}) - assert.NotNil(t, err) - assert.Equal(t, "", v) - - v, err = baseParams.LoadWithPriority([]string{"k2_new", "k2"}) - assert.Nil(t, err) - assert.Equal(t, "v2", v) - - v = baseParams.LoadWithDefault("k2_new", "v2_new") - assert.Equal(t, "v2_new", v) - - v = baseParams.LoadWithDefault2([]string{"k2_new"}, "v2_new") - assert.Equal(t, "v2_new", v) - - v = baseParams.LoadWithDefault2([]string{"k2_new", "k2"}, "v2_new") - assert.Equal(t, "v2", v) -} - -func TestBaseTable_LoadRange(t *testing.T) { - _ = baseParams.Save("xxxaab", "10") - _ = baseParams.Save("xxxfghz", "20") - _ = baseParams.Save("xxxbcde", "1.1") - _ = baseParams.Save("xxxabcd", "testSaveAndLoad") - _ = baseParams.Save("xxxzhi", "12") - - keys, values, err := baseParams.LoadRange("xxxa", "xxxg", 10) - assert.Nil(t, err) - assert.Equal(t, 4, len(keys)) - assert.Equal(t, "10", values[0]) - assert.Equal(t, "testSaveAndLoad", values[1]) - assert.Equal(t, "1.1", values[2]) - assert.Equal(t, "20", values[3]) - - _ = baseParams.Remove("abc") - _ = baseParams.Remove("fghz") - _ = baseParams.Remove("bcde") - _ = baseParams.Remove("abcd") - _ = baseParams.Remove("zhi") -} - func TestBaseTable_Remove(t *testing.T) { err1 := baseParams.Save("RemoveInt", "10") assert.Nil(t, err1) @@ -146,54 +87,43 @@ func TestBaseTable_Get(t *testing.T) { assert.Equal(t, "", v2) } -func TestBaseTable_LoadYaml(t *testing.T) { - err := baseParams.LoadYaml("milvus.yaml") - assert.Nil(t, err) - assert.Panics(t, func() { baseParams.LoadYaml("advanced/not_exist.yaml") }) - - _, err = baseParams.Load("etcd.endpoints") - assert.Nil(t, err) - _, err = baseParams.Load("pulsar.port") - assert.Nil(t, err) -} - func TestBaseTable_Pulsar(t *testing.T) { //test PULSAR ADDRESS os.Setenv("PULSAR_ADDRESS", "pulsar://localhost:6650") - baseParams.loadPulsarConfig() + baseParams.Init() address := baseParams.Get("_PulsarAddress") assert.Equal(t, "pulsar://localhost:6650", address) } -func TestBaseTable_ConfDir(t *testing.T) { - rightConfig := baseParams.configDir - // fake dir - baseParams.configDir = "./" +// func TestBaseTable_ConfDir(t *testing.T) { +// rightConfig := baseParams.configDir +// // fake dir +// baseParams.configDir = "./" - assert.Panics(t, func() { baseParams.loadFromYaml(defaultYaml) }) +// assert.Panics(t, func() { baseParams.loadFromYaml(defaultYaml) }) - baseParams.configDir = rightConfig - baseParams.loadFromYaml(defaultYaml) - baseParams.GlobalInitWithYaml(defaultYaml) -} +// baseParams.configDir = rightConfig +// baseParams.loadFromYaml(defaultYaml) +// baseParams.GlobalInitWithYaml(defaultYaml) +// } -func TestBateTable_ConfPath(t *testing.T) { - os.Setenv("MILVUSCONF", "test") - config := baseParams.initConfPath() - assert.Equal(t, config, "test") +// func TestBateTable_ConfPath(t *testing.T) { +// os.Setenv("MILVUSCONF", "test") +// config := baseParams.initConfPath() +// assert.Equal(t, config, "test") - os.Unsetenv("MILVUSCONF") - dir, _ := os.Getwd() - config = baseParams.initConfPath() - assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) +// os.Unsetenv("MILVUSCONF") +// dir, _ := os.Getwd() +// config = baseParams.initConfPath() +// assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) - // test use get dir - os.Chdir(dir + "/../../../") - defer os.Chdir(dir) - config = baseParams.initConfPath() - assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) -} +// // test use get dir +// os.Chdir(dir + "/../../../") +// defer os.Chdir(dir) +// config = baseParams.initConfPath() +// assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) +// } func TestBaseTable_Env(t *testing.T) { os.Setenv("milvus.test", "test") @@ -313,7 +243,6 @@ func Test_SetLogger(t *testing.T) { baseParams.RoleName = "rootcoord" baseParams.Save("log.file.rootPath", ".") baseParams.SetLogger(UniqueID(-1)) - fmt.Println(baseParams.Log.File.Filename) assert.Equal(t, "rootcoord.log", baseParams.Log.File.Filename) baseParams.RoleName = "datanode" diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index fcc1f648e0..4cec32bc2e 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1137,7 +1137,12 @@ func (p *dataNodeConfig) initFlowGraphMaxParallelism() { } func (p *dataNodeConfig) initFlushInsertBufferSize() { - p.FlushInsertBufferSize = p.Base.ParseInt64("_DATANODE_INSERTBUFSIZE") + bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.flush.insertBufSize"}, "0") + bs, err := strconv.ParseInt(bufferSize, 10, 64) + if err != nil { + panic(err) + } + p.FlushInsertBufferSize = bs } func (p *dataNodeConfig) initInsertBinlogRootPath() { diff --git a/internal/util/paramtable/component_param_test.go b/internal/util/paramtable/component_param_test.go index aa5e3a6aa1..f771fbaa9f 100644 --- a/internal/util/paramtable/component_param_test.go +++ b/internal/util/paramtable/component_param_test.go @@ -202,10 +202,6 @@ func TestComponentParam(t *testing.T) { }) }) - t.Run("test queryCoordConfig", func(t *testing.T) { - //Params := CParams.QueryCoordCfg - }) - t.Run("test queryNodeConfig", func(t *testing.T) { Params := CParams.QueryNodeCfg diff --git a/internal/util/paramtable/param_item.go b/internal/util/paramtable/param_item.go new file mode 100644 index 0000000000..19964b65fb --- /dev/null +++ b/internal/util/paramtable/param_item.go @@ -0,0 +1,20 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. +package paramtable + +type ParamItem struct { + Key string + EnvKey string + Version string + Doc string + DefaultValue string + Refreshable bool +} diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 0a47f59400..7bf369cec7 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -17,14 +17,17 @@ package paramtable import ( + "net/url" "os" "path" "strconv" "strings" "sync" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/streamnative/pulsarctl/pkg/cmdutils" + "go.uber.org/zap" ) var pulsarOnce sync.Once @@ -124,7 +127,7 @@ func (p *EtcdConfig) initDataDir() { } func (p *EtcdConfig) initEndpoints() { - endpoints, err := p.Base.Load("_EtcdEndpoints") + endpoints, err := p.Base.Load("etcd.endpoints") if err != nil { panic(err) } @@ -217,19 +220,25 @@ func (p *PulsarConfig) init(base *BaseTable) { } func (p *PulsarConfig) initAddress() { - addr, err := p.Base.Load("_PulsarAddress") - if err != nil { - panic(err) + addr := p.Base.LoadWithDefault("pulsar.address", "localhost") + // for compatible + if strings.Contains(addr, ":") { + p.Address = addr + } else { + port := p.Base.LoadWithDefault("pulsar.port", "6650") + p.Address = "pulsar://" + addr + ":" + port } - p.Address = addr } func (p *PulsarConfig) initWebAddress() { - addr, err := p.Base.Load("_PulsarWebAddress") + pulsarURL, err := url.ParseRequestURI(p.Address) if err != nil { - panic(err) + p.WebAddress = "" + log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err)) + } else { + webport := p.Base.LoadWithDefault("pulsar.webport", "80") + p.WebAddress = "http://" + pulsarURL.Hostname() + ":" + webport } - p.WebAddress = addr pulsarOnce.Do(func() { cmdutils.PulsarCtlConfig.WebServiceURL = p.WebAddress }) @@ -269,11 +278,7 @@ func (k *KafkaConfig) init(base *BaseTable) { } func (k *KafkaConfig) initAddress() { - addr, err := k.Base.Load("_KafkaBrokerList") - if err != nil { - panic(err) - } - k.Address = addr + k.Address = k.Base.LoadWithDefault("kafka.brokerList", "") } func (k *KafkaConfig) initSaslUsername() { @@ -307,11 +312,7 @@ func (p *RocksmqConfig) init(base *BaseTable) { } func (p *RocksmqConfig) initPath() { - path, err := p.Base.Load("_RocksmqPath") - if err != nil { - panic(err) - } - p.Path = path + p.Path = p.Base.LoadWithDefault("rocksmq.path", "") } /////////////////////////////////////////////////////////////////////////////// @@ -343,11 +344,17 @@ func (p *MinioConfig) init(base *BaseTable) { } func (p *MinioConfig) initAddress() { - endpoint, err := p.Base.Load("_MinioAddress") + host, err := p.Base.Load("minio.Address") if err != nil { panic(err) } - p.Address = endpoint + // for compatible + if strings.Contains(host, ":") { + p.Address = host + } else { + port := p.Base.LoadWithDefault("minio.port", "9000") + p.Address = host + ":" + port + } } func (p *MinioConfig) initAccessKeyID() { diff --git a/scripts/run_cpp_unittest.sh b/scripts/run_cpp_unittest.sh index 52ec7f3452..d0c577cb9f 100755 --- a/scripts/run_cpp_unittest.sh +++ b/scripts/run_cpp_unittest.sh @@ -29,7 +29,6 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" MILVUS_CORE_DIR="${SCRIPTS_DIR}/../internal/core" CORE_INSTALL_PREFIX="${MILVUS_CORE_DIR}/output" UNITTEST_DIRS=("${CORE_INSTALL_PREFIX}/unittest") -CWRAPPER_UNITTEST="${SCRIPTS_DIR}/../internal/storage/cwrapper/output/wrapper_test" # currently core will install target lib to "internal/core/output/lib" if [ -d "${CORE_INSTALL_PREFIX}/lib" ]; then diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 23d5bfdf24..bb026612ce 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -39,6 +39,7 @@ go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/kv/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/storage" -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/tso/..." -failfast +go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/config/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/funcutil/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/paramtable/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/retry/..." -failfast