mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Fix kafka producer panic by nil (#25691)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
This commit is contained in:
parent
8d343bf75a
commit
9fe62cb5f3
@ -150,7 +150,7 @@ type dmlChannels struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefixDefault string, chanNumDefault int64) *dmlChannels {
|
func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefixDefault string, chanNumDefault int64) *dmlChannels {
|
||||||
params := paramtable.Get().CommonCfg
|
params := ¶mtable.Get().CommonCfg
|
||||||
var (
|
var (
|
||||||
chanNamePrefix string
|
chanNamePrefix string
|
||||||
chanNum int64
|
chanNum int64
|
||||||
@ -347,7 +347,7 @@ func (d *dmlChannels) removeChannels(names ...string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getChannelName(prefix string, idx int64) string {
|
func getChannelName(prefix string, idx int64) string {
|
||||||
params := paramtable.Get().CommonCfg
|
params := ¶mtable.Get().CommonCfg
|
||||||
if params.PreCreatedTopicEnabled.GetAsBool() {
|
if params.PreCreatedTopicEnabled.GetAsBool() {
|
||||||
return params.TopicNames.GetAsStrings()[idx]
|
return params.TopicNames.GetAsStrings()[idx]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Producer *kafka.Producer
|
var producer *kafka.Producer
|
||||||
|
|
||||||
var once sync.Once
|
var once sync.Once
|
||||||
|
|
||||||
@ -85,13 +85,15 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
||||||
var err error
|
config := kc.newProducerConfig()
|
||||||
|
producer, err := kafka.NewProducer(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("create sync kafka producer failed", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
config := kc.newProducerConfig()
|
|
||||||
Producer, err = kafka.NewProducer(config)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for e := range Producer.Events() {
|
for e := range producer.Events() {
|
||||||
switch ev := e.(type) {
|
switch ev := e.(type) {
|
||||||
case kafka.Error:
|
case kafka.Error:
|
||||||
// Generic client instance-level errors, such as broker connection failures,
|
// Generic client instance-level errors, such as broker connection failures,
|
||||||
@ -109,12 +111,7 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
|||||||
}()
|
}()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
return producer, nil
|
||||||
log.Error("create sync kafka producer failed", zap.Error(err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return Producer, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
|
func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user