diff --git a/pkg/config/env_source.go b/pkg/config/env_source.go index 204b64925c..9c2138b5c3 100644 --- a/pkg/config/env_source.go +++ b/pkg/config/env_source.go @@ -80,6 +80,8 @@ func (es EnvSource) GetSourceName() string { func (es EnvSource) SetEventHandler(eh EventHandler) { +} +func (es EnvSource) UpdateOptions(opts Options) { } func (es EnvSource) Close() { diff --git a/pkg/config/etcd_source.go b/pkg/config/etcd_source.go index 6bb6c1d9bb..05769dd01a 100644 --- a/pkg/config/etcd_source.go +++ b/pkg/config/etcd_source.go @@ -112,8 +112,25 @@ func (es *EtcdSource) SetEventHandler(eh EventHandler) { es.configRefresher.eh = eh } +func (es *EtcdSource) UpdateOptions(opts Options) { + if opts.EtcdInfo == nil { + return + } + es.Lock() + defer es.Unlock() + es.keyPrefix = opts.EtcdInfo.KeyPrefix + if es.configRefresher.refreshInterval != opts.EtcdInfo.RefreshInterval { + es.configRefresher.stop() + es.configRefresher = newRefresher(opts.EtcdInfo.RefreshInterval, es.refreshConfigurations) + es.configRefresher.start(es.GetSourceName()) + } +} + func (es *EtcdSource) refreshConfigurations() error { + es.RLock() prefix := es.keyPrefix + "/config" + es.RUnlock() + ctx, cancel := context.WithTimeout(es.ctx, ReadConfigTimeout) defer cancel() response, err := es.etcdCli.Get(ctx, prefix, clientv3.WithPrefix()) diff --git a/pkg/config/file_source.go b/pkg/config/file_source.go index 4834e29db9..962026e1ad 100644 --- a/pkg/config/file_source.go +++ b/pkg/config/file_source.go @@ -94,10 +94,26 @@ func (fs *FileSource) SetEventHandler(eh EventHandler) { fs.configRefresher.eh = eh } +func (fs *FileSource) UpdateOptions(opts Options) { + if opts.FileInfo == nil { + return + } + + fs.Lock() + defer fs.Unlock() + fs.files = opts.FileInfo.Files +} + func (fs *FileSource) loadFromFile() error { yamlReader := viper.New() newConfig := make(map[string]string) - for _, configFile := range fs.files { + var configFiles []string + + fs.RLock() + configFiles = fs.files + fs.RUnlock() + + for _, configFile := range configFiles { if _, err := os.Stat(configFile); err != nil { continue } @@ -135,6 +151,7 @@ func (fs *FileSource) loadFromFile() error { newConfig[formatKey(key)] = str } } + fs.Lock() defer fs.Unlock() err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig) diff --git a/pkg/config/manager.go b/pkg/config/manager.go index b0592ba5dc..f106fe629a 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -219,6 +219,20 @@ func (m *Manager) ForbidUpdate(key string) { m.forbiddenKeys.Insert(formatKey(key)) } +func (m *Manager) UpdateSourceOptions(opts ...Option) { + m.Lock() + defer m.Unlock() + + var options Options + for _, opt := range opts { + opt(&options) + } + + for _, source := range m.sources { + source.UpdateOptions(options) + } +} + // Do not use it directly, only used when add source and unittests. func (m *Manager) pullSourceConfigs(source string) error { configSource, ok := m.sources[source] diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index 648a4766b6..353faa8edd 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -75,3 +75,6 @@ func (ErrSource) GetSourceName() string { func (e ErrSource) SetEventHandler(eh EventHandler) { } + +func (e ErrSource) UpdateOptions(opt Options) { +} diff --git a/pkg/config/source.go b/pkg/config/source.go index 1395684951..4095971e18 100644 --- a/pkg/config/source.go +++ b/pkg/config/source.go @@ -29,6 +29,7 @@ type Source interface { GetPriority() int GetSourceName() string SetEventHandler(eh EventHandler) + UpdateOptions(opt Options) Close() } diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 1c53758681..467bf647d7 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -86,6 +86,10 @@ func (gp *BaseTable) GlobalInitWithYaml(yaml string) { }) } +func (gp *BaseTable) UpdateSourceOpiotns(opts ...config.Option) { + gp.mgr.UpdateSourceOptions(opts...) +} + // init initializes the param table. // if refreshInterval greater than 0 will auto refresh config from source func (gp *BaseTable) init(refreshInterval int) { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c4395e2259..690e36de13 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -183,7 +183,7 @@ type commonConfig struct { DataNodeSubName ParamItem `refreshable:"false"` DefaultPartitionName ParamItem `refreshable:"false"` - DefaultIndexName ParamItem `refreshable:"false"` + DefaultIndexName ParamItem `refreshable:"true"` RetentionDuration ParamItem `refreshable:"true"` EntityExpirationTTL ParamItem `refreshable:"true"` @@ -413,7 +413,6 @@ func (p *commonConfig) init(base *BaseTable) { Key: "common.defaultIndexName", Version: "2.0.0", DefaultValue: "_default_idx", - Forbidden: true, Doc: "default index name", Export: true, } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 6648a07d3b..65cc120306 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -55,7 +55,7 @@ type ServiceParam struct { } func (p *ServiceParam) init() { - p.BaseTable.init(10) + p.BaseTable.init(2) p.LocalStorageCfg.Init(&p.BaseTable) p.MetaStoreCfg.Init(&p.BaseTable) @@ -736,10 +736,10 @@ aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/lates Key: "minio.cloudProvider", DefaultValue: DefaultMinioCloudProvider, Version: "2.2.0", - Doc: `Cloud Provider of S3. Supports: "aws", "gcp", "aliyun". + Doc: `Cloud Provider of S3. Supports: "aws", "gcp", "aliyun". You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio You can use "gcp" for other cloud provider supports S3 API with signature v2 -You can use "aliyun" for other cloud provider uses virtual host style bucket +You can use "aliyun" for other cloud provider uses virtual host style bucket When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now`, Export: true, } diff --git a/tests/integration/minicluster.go b/tests/integration/minicluster.go index 1bd6f0b549..0b86906097 100644 --- a/tests/integration/minicluster.go +++ b/tests/integration/minicluster.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -106,7 +107,6 @@ type MiniCluster struct { proxy types.ProxyComponent dataCoord types.DataCoordComponent rootCoord types.RootCoordComponent - //indexCoord types.IndexCoordComponent queryCoord types.QueryCoordComponent queryNodes []types.QueryNodeComponent @@ -116,7 +116,7 @@ type MiniCluster struct { metaWatcher MetaWatcher } -var Params *paramtable.ComponentParam = paramtable.Get() +var params *paramtable.ComponentParam = paramtable.Get() type Option func(cluster *MiniCluster) @@ -124,16 +124,19 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster cluster = &MiniCluster{ ctx: ctx, } - //Params.InitOnce() - Params.Init() + params.Init() cluster.params = DefaultParams() cluster.clusterConfig = DefaultClusterConfig() for _, opt := range opts { opt(cluster) } for k, v := range cluster.params { - Params.Save(k, v) + params.Save(k, v) } + params.UpdateSourceOpiotns(config.WithEtcdSource(&config.EtcdInfo{ + KeyPrefix: cluster.params[EtcdRootPath], + RefreshInterval: 2 * time.Second, + })) if cluster.factory == nil { cluster.factory = dependency.NewDefaultFactory(true) @@ -147,13 +150,13 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster if cluster.etcdCli == nil { var etcdCli *clientv3.Client etcdCli, err = etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + params.EtcdCfg.EtcdUseSSL.GetAsBool(), + params.EtcdCfg.Endpoints.GetAsStrings(), + params.EtcdCfg.EtcdTLSCert.GetValue(), + params.EtcdCfg.EtcdTLSKey.GetValue(), + params.EtcdCfg.EtcdTLSCACert.GetValue(), + params.EtcdCfg.EtcdTLSMinVersion.GetValue()) if err != nil { return nil, err } @@ -448,7 +451,7 @@ func (cluster *MiniCluster) Stop() error { } log.Info("mini cluster indexnodes stopped") - cluster.etcdCli.KV.Delete(cluster.ctx, Params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) + cluster.etcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) defer cluster.etcdCli.Close() if cluster.chunkManager == nil { @@ -469,9 +472,9 @@ func DefaultParams() map[string]string { EtcdRootPath: testPath, MinioRootPath: testPath, //"runtime.role": typeutil.StandaloneRole, - Params.IntegrationTestCfg.IntegrationMode.Key: "true", - Params.CommonCfg.StorageType.Key: "local", - Params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs + params.IntegrationTestCfg.IntegrationMode.Key: "true", + params.CommonCfg.StorageType.Key: "local", + params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs } } diff --git a/tests/integration/refresh_config_test.go b/tests/integration/refresh_config_test.go new file mode 100644 index 0000000000..cfd5ce71b7 --- /dev/null +++ b/tests/integration/refresh_config_test.go @@ -0,0 +1,135 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/distance" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestRefreshPasswordLength(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*180) + defer cancel() + c, err := StartMiniCluster(ctx) + assert.NoError(t, err) + + err = c.Start() + assert.NoError(t, err) + defer func() { + err = c.Stop() + assert.NoError(t, err) + cancel() + }() + + s, err := c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ + Username: "test", + Password: "1234", + }) + log.Debug("first create result", zap.Any("state", s)) + assert.Equal(t, commonpb.ErrorCode_IllegalArgument, s.GetErrorCode()) + + params := paramtable.Get() + c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/proxy/minpasswordlength", params.EtcdCfg.RootPath.GetValue()), "3") + + assert.Eventually(t, func() bool { + s, err = c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ + Username: "test", + Password: "1234", + }) + log.Debug("second create result", zap.Any("state", s)) + return commonpb.ErrorCode_Success == s.GetErrorCode() + }, time.Second*20, time.Millisecond*500) +} + +func TestRefreshDefaultIndexName(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*180) + defer cancel() + c, err := StartMiniCluster(ctx) + assert.NoError(t, err) + + err = c.Start() + assert.NoError(t, err) + defer func() { + err = c.Stop() + assert.NoError(t, err) + cancel() + }() + + params := paramtable.Get() + c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index") + + assert.Eventually(t, func() bool { + return params.CommonCfg.DefaultIndexName.GetValue() == "a_index" + }, time.Second*10, time.Millisecond*500) + + dim := 128 + dbName := "default" + collectionName := "test" + rowNum := 100 + + schema := constructSchema("test", 128, true) + marshaledSchema, err := proto.Marshal(schema) + + createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: "default", + CollectionName: "test", + Schema: marshaledSchema, + ShardsNum: 1, + }) + assert.NoError(t, err) + if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason())) + } + assert.Equal(t, createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) + + fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) + hashKeys := generateHashKeys(rowNum) + _, err = c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + assert.NoError(t, err) + + _, err = c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: floatVecField, + ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2), + }) + + s, err := c.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + assert.Equal(t, commonpb.ErrorCode_Success, s.Status.GetErrorCode()) + assert.Equal(t, 1, len(s.IndexDescriptions)) + assert.Equal(t, "a_index_101", s.IndexDescriptions[0].GetIndexName()) +}