diff --git a/pkg/streaming/walimpls/impls/kafka/builder.go b/pkg/streaming/walimpls/impls/kafka/builder.go index bf1139641b..61c7673d6c 100644 --- a/pkg/streaming/walimpls/impls/kafka/builder.go +++ b/pkg/streaming/walimpls/impls/kafka/builder.go @@ -54,7 +54,14 @@ func (b *builderImpl) getConsumerConfig() kafka.ConfigMap { config := ¶mtable.Get().KafkaCfg consumerConfig := getBasicConfig(config) consumerConfig.SetKey("allow.auto.create.topics", true) + // may be the offset is already deleted by the retention policy of kafka. + // so we need to auto reset the offset to the earliest to consume reading. consumerConfig.SetKey("auto.offset.reset", "earliest") + // we use the assign/unassign api to manage the consumer manually, + // the commit operation will generate a new consumer group which is not what we want. + // so we disable the auto commit to avoid it, also see: + // https://github.com/confluentinc/confluent-kafka-python/issues/250#issuecomment-331377925 + consumerConfig.SetKey("enable.auto.commit", false) for k, v := range config.ConsumerExtraConfig.GetValue() { consumerConfig.SetKey(k, v) }