From b6fefee0cf1bed512a83dd991a034755822b106e Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 1 Apr 2024 00:23:19 -0700 Subject: [PATCH] fix: etcd not connectable when auth enabled (#31633) Fix etcd config source didn't respect auth enabled Also removed pulsar recoverable error when pulsar return ConsumerBusy. It could happen that pulsar didn't find the original consumer is dead and recover takes some time. fix #31631 Signed-off-by: xiaofanluan --- cmd/milvus/mck.go | 5 ++++- pkg/config/etcd_source.go | 5 ++++- pkg/config/source.go | 3 +++ pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go | 5 ----- pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go | 2 +- pkg/util/etcd/etcd_util.go | 1 + pkg/util/paramtable/base_table.go | 3 +++ 7 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cmd/milvus/mck.go b/cmd/milvus/mck.go index 3775764bf0..2347398bf0 100644 --- a/cmd/milvus/mck.go +++ b/cmd/milvus/mck.go @@ -216,8 +216,11 @@ func (c *mck) connectEctd() { if c.etcdIP != "" { etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP}) } else { - etcdCli, err = etcd.GetEtcdClient( + etcdCli, err = etcd.CreateEtcdClient( c.params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + c.params.EtcdCfg.EtcdEnableAuth.GetAsBool(), + c.params.EtcdCfg.EtcdAuthUserName.GetValue(), + c.params.EtcdCfg.EtcdAuthPassword.GetValue(), c.params.EtcdCfg.EtcdUseSSL.GetAsBool(), c.params.EtcdCfg.Endpoints.GetAsStrings(), c.params.EtcdCfg.EtcdTLSCert.GetValue(), diff --git a/pkg/config/etcd_source.go b/pkg/config/etcd_source.go index fdfbc2d243..9c87d0fc1c 100644 --- a/pkg/config/etcd_source.go +++ b/pkg/config/etcd_source.go @@ -48,8 +48,11 @@ type EtcdSource struct { func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) { log.Debug("init etcd source", zap.Any("etcdInfo", etcdInfo)) - etcdCli, err := etcd.GetEtcdClient( + etcdCli, err := etcd.CreateEtcdClient( etcdInfo.UseEmbed, + etcdInfo.EnableAuth, + etcdInfo.UserName, + etcdInfo.PassWord, etcdInfo.UseSSL, etcdInfo.Endpoints, etcdInfo.CertFile, diff --git a/pkg/config/source.go b/pkg/config/source.go index 8382915797..6a2cfbae04 100644 --- a/pkg/config/source.go +++ b/pkg/config/source.go @@ -36,6 +36,9 @@ type Source interface { // EtcdInfo has attribute for config center source initialization type EtcdInfo struct { UseEmbed bool + EnableAuth bool + UserName string + PassWord string UseSSL bool Endpoints []string KeyPrefix string diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index 3d71d7c76e..8e4e75ea6c 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -18,7 +18,6 @@ package pulsar import ( "fmt" - "strings" "sync" "time" @@ -31,7 +30,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -122,9 +120,6 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper. }) if err != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - if strings.Contains(err.Error(), "ConsumerBusy") { - return nil, retry.Unrecoverable(err) - } return nil, err } diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index 270c06e5e5..2bf265d832 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -668,7 +668,7 @@ func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) { _, err := pc.Subscribe(mqwrapper.ConsumerOptions{Topic: "test_topic_name"}) assert.Error(t, err) - assert.False(t, retry.IsRecoverable(err)) + assert.True(t, retry.IsRecoverable(err)) }) } diff --git a/pkg/util/etcd/etcd_util.go b/pkg/util/etcd/etcd_util.go index b1816e0d9e..be53dc0f26 100644 --- a/pkg/util/etcd/etcd_util.go +++ b/pkg/util/etcd/etcd_util.go @@ -35,6 +35,7 @@ import ( var maxTxnNum = 128 // GetEtcdClient returns etcd client +// should only used for test func GetEtcdClient( useEmbedEtcd bool, useSSL bool, diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 15afbfe378..838ae45cf6 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -191,6 +191,9 @@ func (bt *BaseTable) initConfigsFromRemote() { } info := &config.EtcdInfo{ UseEmbed: etcdConfig.UseEmbedEtcd.GetAsBool(), + EnableAuth: etcdConfig.EtcdEnableAuth.GetAsBool(), + UserName: etcdConfig.EtcdAuthUserName.GetValue(), + PassWord: etcdConfig.EtcdAuthPassword.GetValue(), UseSSL: etcdConfig.EtcdUseSSL.GetAsBool(), Endpoints: etcdConfig.Endpoints.GetAsStrings(), CertFile: etcdConfig.EtcdTLSCert.GetValue(),