From 16d6af85c3e5f68affc4a22ead912eefdaa07eac Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 9 May 2025 11:16:54 +0800 Subject: [PATCH] fix: panic when use kafka/pulsar/wp for standalone (#41683) issue: #41682 - There's a bug in rocksmq cleanup, #41565 remove the support of nats. Signed-off-by: chyezh --- cmd/roles/roles.go | 14 ++++---- .../util/streamingutil/util/wal_selector.go | 18 +++++----- .../streamingutil/util/wal_selector_test.go | 36 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 249819a697..52e5339f6e 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -42,6 +42,7 @@ import ( kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/initcore" internalmetrics "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" @@ -69,8 +70,11 @@ func init() { metrics.RegisterStorageMetrics(Registry.GoRegistry) } -func stopRocksmq() { - rocksmqimpl.CloseRocksMQ() +// stopRocksmqIfUsed closes the RocksMQ if it is used. +func stopRocksmqIfUsed() { + if name := util.MustSelectWALName(); name == util.WALTypeRocksmq { + rocksmqimpl.CloseRocksMQ() + } } type component interface { @@ -331,11 +335,6 @@ func (mr *MilvusRoles) Run() { } params := paramtable.Get() - if paramtable.Get().RocksmqEnable() { - defer stopRocksmq() - } else { - panic("only support Rocksmq in standalone mode") - } if params.EtcdCfg.UseEmbedEtcd.GetAsBool() { // Start etcd server. etcd.InitEtcdServer( @@ -347,6 +346,7 @@ func (mr *MilvusRoles) Run() { defer etcd.StopEtcdServer() } paramtable.SetRole(typeutil.StandaloneRole) + defer stopRocksmqIfUsed() } else { if err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode); err != nil { log.Error("Failed to set deploy mode: ", zap.Error(err)) diff --git a/internal/util/streamingutil/util/wal_selector.go b/internal/util/streamingutil/util/wal_selector.go index 59df8f1f16..f37f7ccb5d 100644 --- a/internal/util/streamingutil/util/wal_selector.go +++ b/internal/util/streamingutil/util/wal_selector.go @@ -9,10 +9,10 @@ import ( const ( walTypeDefault = "default" - walTypeRocksmq = "rocksmq" - walTypeKafka = "kafka" - walTypePulsar = "pulsar" - walTypeWoodpecker = "woodpecker" + WALTypeRocksmq = "rocksmq" + WALTypeKafka = "kafka" + WALTypePulsar = "pulsar" + WALTypeWoodpecker = "woodpecker" ) type walEnable struct { @@ -44,17 +44,17 @@ func mustSelectWALName(standalone bool, mqType string, enable walEnable) string } if standalone { if enable.Rocksmq { - return walTypeRocksmq + return WALTypeRocksmq } } if enable.Pulsar { - return walTypePulsar + return WALTypePulsar } if enable.Kafka { - return walTypeKafka + return WALTypeKafka } if enable.Woodpecker { - return walTypeWoodpecker + return WALTypeWoodpecker } panic(errors.Errorf("no available wal config found, %s, enable: %+v", mqType, enable)) } @@ -64,7 +64,7 @@ func validateWALName(standalone bool, mqType string) error { // we may register more mq type by plugin. // so we should not check all mq type here. // only check standalone type. - if !standalone && mqType == walTypeRocksmq { + if !standalone && mqType == WALTypeRocksmq { return errors.Newf("mq %s is only valid in standalone mode", mqType) } return nil diff --git a/internal/util/streamingutil/util/wal_selector_test.go b/internal/util/streamingutil/util/wal_selector_test.go index a7080ab1be..928d05e4e5 100644 --- a/internal/util/streamingutil/util/wal_selector_test.go +++ b/internal/util/streamingutil/util/wal_selector_test.go @@ -7,27 +7,27 @@ import ( ) func TestValidateWALType(t *testing.T) { - assert.Error(t, validateWALName(false, walTypeRocksmq)) + assert.Error(t, validateWALName(false, WALTypeRocksmq)) } func TestSelectWALType(t *testing.T) { - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypeKafka) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeWoodpecker) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), WALTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), WALTypeKafka) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), WALTypeWoodpecker) assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) }) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypeKafka) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeWoodpecker) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), WALTypeKafka) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), WALTypeWoodpecker) assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) }) - assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), walTypeRocksmq) - assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) - assert.Equal(t, mustSelectWALName(true, walTypeWoodpecker, walEnable{true, true, true, true}), walTypeWoodpecker) - assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) }) - assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) - assert.Equal(t, mustSelectWALName(false, walTypeWoodpecker, walEnable{true, true, true, true}), walTypeWoodpecker) + assert.Equal(t, mustSelectWALName(true, WALTypeRocksmq, walEnable{true, true, true, true}), WALTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, WALTypePulsar, walEnable{true, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(true, WALTypeKafka, walEnable{true, true, true, true}), WALTypeKafka) + assert.Equal(t, mustSelectWALName(true, WALTypeWoodpecker, walEnable{true, true, true, true}), WALTypeWoodpecker) + assert.Panics(t, func() { mustSelectWALName(false, WALTypeRocksmq, walEnable{true, true, true, true}) }) + assert.Equal(t, mustSelectWALName(false, WALTypePulsar, walEnable{true, true, true, true}), WALTypePulsar) + assert.Equal(t, mustSelectWALName(false, WALTypeKafka, walEnable{true, true, true, true}), WALTypeKafka) + assert.Equal(t, mustSelectWALName(false, WALTypeWoodpecker, walEnable{true, true, true, true}), WALTypeWoodpecker) }