From 7d6d279e9caa1296f495023fe6ce7f0f9429f617 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 22 Dec 2025 21:07:18 +0800 Subject: [PATCH] fix: set enable.auto.commit false to prevent from creating kafka consumer group (#46508) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### **User description** issue: #46507 we use the assign/unassign api to manage the consumer manually, the commit operation will generate a new consumer group which is not what we want. so we disable the auto commit to avoid it, also see: https://github.com/confluentinc/confluent-kafka-python/issues/250#issuecomment-331377925 ___ ### **PR Type** Bug fix ___ ### **Description** - Disable auto-commit in Kafka consumer configuration - Prevents unwanted consumer group creation from manual offset management - Clarifies offset reset behavior with explanatory comments ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Kafka Consumer Config"] --> B["Set enable.auto.commit to false"] B --> C["Prevent auto consumer group creation"] A --> D["Set auto.offset.reset to earliest"] D --> E["Handle deleted offsets gracefully"] ```

File Walkthrough

Relevant files
Bug fix
builder.go
Disable auto-commit and add configuration comments             

pkg/streaming/walimpls/impls/kafka/builder.go
  • Added enable.auto.commit configuration set to false to prevent
    automatic consumer group creation
  • Added explanatory comments for both auto.offset.reset and
    enable.auto.commit settings
  • Clarifies that manual assign/unassign API is used for consumer
    management
+7/-0     
___ ## Summary by CodeRabbit ## Bug Fixes * Kafka consumer now reads from the earliest available messages and auto-commit has been disabled to support manual offset management. ✏️ Tip: You can customize this high-level summary in your review settings. Signed-off-by: chyezh --- pkg/streaming/walimpls/impls/kafka/builder.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/streaming/walimpls/impls/kafka/builder.go b/pkg/streaming/walimpls/impls/kafka/builder.go index bf1139641b..61c7673d6c 100644 --- a/pkg/streaming/walimpls/impls/kafka/builder.go +++ b/pkg/streaming/walimpls/impls/kafka/builder.go @@ -54,7 +54,14 @@ func (b *builderImpl) getConsumerConfig() kafka.ConfigMap { config := ¶mtable.Get().KafkaCfg consumerConfig := getBasicConfig(config) consumerConfig.SetKey("allow.auto.create.topics", true) + // may be the offset is already deleted by the retention policy of kafka. + // so we need to auto reset the offset to the earliest to consume reading. consumerConfig.SetKey("auto.offset.reset", "earliest") + // we use the assign/unassign api to manage the consumer manually, + // the commit operation will generate a new consumer group which is not what we want. + // so we disable the auto commit to avoid it, also see: + // https://github.com/confluentinc/confluent-kafka-python/issues/250#issuecomment-331377925 + consumerConfig.SetKey("enable.auto.commit", false) for k, v := range config.ConsumerExtraConfig.GetValue() { consumerConfig.SetKey(k, v) }