fix: panic when use kafka/pulsar/wp for standalone (#41683)

issue: #41682

- There's a bug in rocksmq cleanup, #41565 remove the support of nats.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-05-09 11:16:54 +08:00 committed by GitHub
parent 3dd9a1147b
commit 16d6af85c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 34 additions and 34 deletions

View File

@ -42,6 +42,7 @@ import (
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/v2/config"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
@ -69,8 +70,11 @@ func init() {
metrics.RegisterStorageMetrics(Registry.GoRegistry)
}
func stopRocksmq() {
rocksmqimpl.CloseRocksMQ()
// stopRocksmqIfUsed closes the RocksMQ if it is used.
func stopRocksmqIfUsed() {
if name := util.MustSelectWALName(); name == util.WALTypeRocksmq {
rocksmqimpl.CloseRocksMQ()
}
}
type component interface {
@ -331,11 +335,6 @@ func (mr *MilvusRoles) Run() {
}
params := paramtable.Get()
if paramtable.Get().RocksmqEnable() {
defer stopRocksmq()
} else {
panic("only support Rocksmq in standalone mode")
}
if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
// Start etcd server.
etcd.InitEtcdServer(
@ -347,6 +346,7 @@ func (mr *MilvusRoles) Run() {
defer etcd.StopEtcdServer()
}
paramtable.SetRole(typeutil.StandaloneRole)
defer stopRocksmqIfUsed()
} else {
if err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode); err != nil {
log.Error("Failed to set deploy mode: ", zap.Error(err))

View File

@ -9,10 +9,10 @@ import (
const (
walTypeDefault = "default"
walTypeRocksmq = "rocksmq"
walTypeKafka = "kafka"
walTypePulsar = "pulsar"
walTypeWoodpecker = "woodpecker"
WALTypeRocksmq = "rocksmq"
WALTypeKafka = "kafka"
WALTypePulsar = "pulsar"
WALTypeWoodpecker = "woodpecker"
)
type walEnable struct {
@ -44,17 +44,17 @@ func mustSelectWALName(standalone bool, mqType string, enable walEnable) string
}
if standalone {
if enable.Rocksmq {
return walTypeRocksmq
return WALTypeRocksmq
}
}
if enable.Pulsar {
return walTypePulsar
return WALTypePulsar
}
if enable.Kafka {
return walTypeKafka
return WALTypeKafka
}
if enable.Woodpecker {
return walTypeWoodpecker
return WALTypeWoodpecker
}
panic(errors.Errorf("no available wal config found, %s, enable: %+v", mqType, enable))
}
@ -64,7 +64,7 @@ func validateWALName(standalone bool, mqType string) error {
// we may register more mq type by plugin.
// so we should not check all mq type here.
// only check standalone type.
if !standalone && mqType == walTypeRocksmq {
if !standalone && mqType == WALTypeRocksmq {
return errors.Newf("mq %s is only valid in standalone mode", mqType)
}
return nil

View File

@ -7,27 +7,27 @@ import (
)
func TestValidateWALType(t *testing.T) {
assert.Error(t, validateWALName(false, walTypeRocksmq))
assert.Error(t, validateWALName(false, WALTypeRocksmq))
}
func TestSelectWALType(t *testing.T) {
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeWoodpecker)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), WALTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), WALTypeKafka)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), WALTypeWoodpecker)
assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeWoodpecker)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), WALTypeKafka)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), WALTypeWoodpecker)
assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(true, walTypeWoodpecker, walEnable{true, true, true, true}), walTypeWoodpecker)
assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) })
assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(false, walTypeWoodpecker, walEnable{true, true, true, true}), walTypeWoodpecker)
assert.Equal(t, mustSelectWALName(true, WALTypeRocksmq, walEnable{true, true, true, true}), WALTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, WALTypePulsar, walEnable{true, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(true, WALTypeKafka, walEnable{true, true, true, true}), WALTypeKafka)
assert.Equal(t, mustSelectWALName(true, WALTypeWoodpecker, walEnable{true, true, true, true}), WALTypeWoodpecker)
assert.Panics(t, func() { mustSelectWALName(false, WALTypeRocksmq, walEnable{true, true, true, true}) })
assert.Equal(t, mustSelectWALName(false, WALTypePulsar, walEnable{true, true, true, true}), WALTypePulsar)
assert.Equal(t, mustSelectWALName(false, WALTypeKafka, walEnable{true, true, true, true}), WALTypeKafka)
assert.Equal(t, mustSelectWALName(false, WALTypeWoodpecker, walEnable{true, true, true, true}), WALTypeWoodpecker)
}