diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
index 7b434b52c9..bf87b260a7 100644
--- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
+++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
@@ -99,10 +99,7 @@ func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, grou
 func (kc *Consumer) createKafkaConsumer() error {
 	var err error
-	getPool().Submit(func() (any, error) {
-		kc.c, err = kafka.NewConsumer(kc.config)
-		return nil, err
-	}).Await()
+	kc.c, err = kafka.NewConsumer(kc.config)
 	if err != nil {
 		log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
 		return err
 	}
@@ -136,12 +133,7 @@ func (kc *Consumer) Chan() <-chan common.Message {
 					return
 				default:
 					readTimeout := paramtable.Get().KafkaCfg.ReadTimeout.GetAsDuration(time.Second)
-					var e *kafka.Message
-					var err error
-					getPool().Submit(func() (any, error) {
-						e, err = kc.c.ReadMessage(readTimeout)
-						return nil, err
-					}).Await()
+					e, err := kc.c.ReadMessage(readTimeout)
 					if err != nil {
 						// if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt
 						log.Warn("consume msg failed", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
@@ -174,40 +166,38 @@ func (kc *Consumer) Seek(id common.MessageID, inclusive bool) error {
 }
 
 func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
-	_, err := getPool().Submit(func() (any, error) {
-		log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
-			zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
-		start := time.Now()
-		err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
-		if err != nil {
-			log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
-			return nil, err
-		}
+	log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
+		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
 
-		cost := time.Since(start).Milliseconds()
-		if cost > 200 {
-			log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
-				zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
-		}
+	start := time.Now()
+	err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
+	if err != nil {
+		log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
+		return err
+	}
 
-		// If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE.
-		// if the timeout is 0 it will initiate the seek but return immediately without any error reporting
-		kc.skipMsg = !inclusive
-		if err := kc.c.Seek(kafka.TopicPartition{
-			Topic:     &kc.topic,
-			Partition: mqwrapper.DefaultPartitionIdx,
-			Offset:    offset,
-		}, timeout); err != nil {
-			return nil, err
-		}
-		cost = time.Since(start).Milliseconds()
-		log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
+	cost := time.Since(start).Milliseconds()
+	if cost > 200 {
+		log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
 			zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
+	}
 
-		kc.hasAssign = true
-		return nil, nil
-	}).Await()
-	return err
+	// If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE.
+	// if the timeout is 0 it will initiate the seek but return immediately without any error reporting
+	kc.skipMsg = !inclusive
+	if err := kc.c.Seek(kafka.TopicPartition{
+		Topic:     &kc.topic,
+		Partition: mqwrapper.DefaultPartitionIdx,
+		Offset:    offset,
+	}, timeout); err != nil {
+		return err
+	}
+	cost = time.Since(start).Milliseconds()
+	log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
+		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
+
+	kc.hasAssign = true
+	return nil
 }
 
 func (kc *Consumer) Ack(message common.Message) {
@@ -217,13 +207,7 @@ func (kc *Consumer) GetLatestMsgID() (common.MessageID, error) {
-	var low, high int64
-	var err error
-
-	getPool().Submit(func() (any, error) {
-		low, high, err = kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
-		return nil, err
-	}).Await()
+	low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
 	if err != nil {
 		return nil, err
 	}
@@ -260,11 +244,7 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
 func (kc *Consumer) closeInternal() {
 	log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
 	start := time.Now()
-	var err error
-	getPool().Submit(func() (any, error) {
-		err = kc.c.Close()
-		return nil, err
-	}).Await()
+	err := kc.c.Close()
 	if err != nil {
 		log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
 	}
diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
index 2ce921157e..ae5d1a409b 100644
--- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
+++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
@@ -44,15 +44,11 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
 		header := kafka.Header{Key: key, Value: []byte(value)}
 		headers = append(headers, header)
 	}
-	var err error
-	getPool().Submit(func() (any, error) {
-		err = kp.p.Produce(&kafka.Message{
-			TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
-			Value:          message.Payload,
-			Headers:        headers,
-		}, kp.deliveryChan)
-		return nil, err
-	})
+	err := kp.p.Produce(&kafka.Message{
+		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
+		Value:          message.Payload,
+		Headers:        headers,
+	}, kp.deliveryChan)
 	if err != nil {
 		metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
 		return nil, err
diff --git a/pkg/mq/msgstream/mqwrapper/kafka/pool.go b/pkg/mq/msgstream/mqwrapper/kafka/pool.go
deleted file mode 100644
index ad82b0b74f..0000000000
--- a/pkg/mq/msgstream/mqwrapper/kafka/pool.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafka
-
-import (
-	"runtime"
-	"sync"
-
-	"go.uber.org/atomic"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/util/conc"
-	"github.com/milvus-io/milvus/pkg/util/hardware"
-)
-
-var (
-	kafkaCPool atomic.Pointer[conc.Pool[any]]
-	initOnce   sync.Once
-)
-
-func initPool() {
-	initOnce.Do(func() {
-		pool := conc.NewPool[any](
-			hardware.GetCPUNum(),
-			conc.WithPreAlloc(false),
-			conc.WithDisablePurge(false),
-			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
-		)
-
-		kafkaCPool.Store(pool)
-		log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
-	})
-}
-
-// GetSQPool returns the singleton pool instance for search/query operations.
-func getPool() *conc.Pool[any] {
-	initPool()
-	return kafkaCPool.Load()
-}
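
For context on the pattern this diff deletes: pool.go funneled every librdkafka call through a fixed-size `conc.Pool` whose workers are pinned to their OS threads via `runtime.LockOSThread`, and callers hopped onto that pool with `Submit(...).Await()`. The sketch below contrasts the old and new call paths. It reuses the `conc` and `hardware` helpers exactly as the deleted file does; `doBlockingCgoCall` is a hypothetical stand-in for a blocking librdkafka call such as `kafka.NewConsumer` or `Consumer.ReadMessage`, not a real binding. The diff itself does not state the motivation for the removal, so this only illustrates the mechanics being taken out.

```go
package main

import (
	"fmt"
	"runtime"

	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/hardware"
)

// doBlockingCgoCall is a hypothetical stand-in for a blocking cgo call
// (e.g. kafka.NewConsumer or Consumer.ReadMessage in the diff above).
func doBlockingCgoCall() (string, error) {
	return "ok", nil
}

func main() {
	// What the deleted pool.go built: one pool sized to the CPU count, with
	// each worker goroutine locked to its OS thread so that cgo calls were
	// always issued from a small, fixed set of threads.
	pool := conc.NewPool[any](
		hardware.GetCPUNum(),
		conc.WithPreHandler(runtime.LockOSThread), // as in the deleted file
	)

	// Old call path (removed by this diff): hop onto the pool, then block
	// the caller until the closure has run and its error is captured.
	var res string
	var err error
	pool.Submit(func() (any, error) {
		res, err = doBlockingCgoCall()
		return nil, err
	}).Await()
	fmt.Println("pooled:", res, err)

	// New call path (what the diff switches to): invoke the cgo call
	// directly from the caller's goroutine.
	res, err = doBlockingCgoCall()
	fmt.Println("direct:", res, err)
}
```

One behavioral detail the change also clears up: in kafka_producer.go the old code submitted the closure without calling `Await()`, so the `if err != nil` check immediately afterwards could read `err` before the closure had run; the direct call makes that check race-free.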