From 15f8dfc7ad3b18167cfbf89203d4b912016bb48e Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Wed, 10 Dec 2025 19:57:13 +0800 Subject: [PATCH] enhance: introduce a tolerance duration to delay the drop operation (#46251) issue: #46214 Signed-off-by: chyezh --- .../wal/recovery/recovery_storage_impl.go | 4 ++++ .../wal/recovery/vchannel_recovery_info.go | 8 +++++++- .../wal/recovery/vchannel_recovery_info_test.go | 4 ++++ pkg/util/paramtable/component_param.go | 17 ++++++++++++++--- pkg/util/paramtable/component_param_test.go | 1 + 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go index 08d88e22e7..2577645f1e 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go @@ -133,6 +133,10 @@ func (r *recoveryStorageImpl) GetSchema(ctx context.Context, vchannel string, ti if vchannelInfo, ok := r.vchannels[vchannel]; ok { _, schema := vchannelInfo.GetSchema(timetick) if schema == nil { + r.Logger().DPanic("schema not found, fallback to latest schema", zap.String("vchannel", vchannel), zap.Uint64("timetick", timetick)) + if _, schema = vchannelInfo.GetSchema(0); schema != nil { + return schema, nil + } return nil, errors.Errorf("critical error: schema not found, vchannel: %s, timetick: %d", vchannel, timetick) } return schema, nil diff --git a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info.go b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info.go index 0a32aa2e83..1b3eab7d77 100644 --- a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info.go +++ b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info.go @@ -10,6 +10,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/messageutil" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) // newVChannelRecoveryInfoFromCreateCollectionMessage creates a new vchannel recovery info from a create collection message. @@ -103,9 +105,13 @@ func (info *vchannelRecoveryInfo) GetSchema(timetick uint64) (int, *schemapb.Col // UpdateFlushCheckpoint updates the flush checkpoint of the vchannel recovery info. func (info *vchannelRecoveryInfo) UpdateFlushCheckpoint(checkpoint *WALCheckpoint) error { + // Because current L0 may be consuming the data before the flush checkpoint, + // so we introduce a tolerance duration to delay the drop operation of the schema that is not used anymore. + tolerance := paramtable.Get().StreamingCfg.WALRecoverySchemaExpirationTolerance.GetAsDurationByParse() if info.flusherCheckpoint == nil || info.flusherCheckpoint.MessageID.LTE(checkpoint.MessageID) { info.flusherCheckpoint = checkpoint - idx, _ := info.GetSchema(info.flusherCheckpoint.TimeTick) + timetick := tsoutil.AddPhysicalDurationOnTs(info.flusherCheckpoint.TimeTick, -tolerance) + idx, _ := info.GetSchema(timetick) for i := 0; i < idx; i++ { // drop the schema that is not used anymore. // the future GetSchema operation will use the timetick greater than the flusher checkpoint. diff --git a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go index 4297aaba55..38e7e37b2f 100644 --- a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go +++ b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) func TestNewVChannelRecoveryInfoFromVChannelMeta(t *testing.T) { @@ -33,6 +34,9 @@ func TestNewVChannelRecoveryInfoFromVChannelMeta(t *testing.T) { } func TestNewVChannelRecoveryInfoFromCreateCollectionMessage(t *testing.T) { + paramtable.Init() + paramtable.Get().StreamingCfg.WALRecoverySchemaExpirationTolerance.SwapTempValue("0") + schema1 := &schemapb.CollectionSchema{ Name: "test-collection-1", } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 260128f3c0..3b07f73aec 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -6376,9 +6376,10 @@ type streamingConfig struct { FlushL0MaxSize ParamItem `refreshable:"true"` // recovery configuration. - WALRecoveryPersistInterval ParamItem `refreshable:"true"` - WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"` - WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"` + WALRecoveryPersistInterval ParamItem `refreshable:"true"` + WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"` + WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"` + WALRecoverySchemaExpirationTolerance ParamItem `refreshable:"true"` } func (p *streamingConfig) init(base *BaseTable) { @@ -6703,6 +6704,16 @@ If that persist operation exceeds this timeout, the wal recovery module will clo Export: true, } p.WALRecoveryGracefulCloseTimeout.Init(base.mgr) + + p.WALRecoverySchemaExpirationTolerance = ParamItem{ + Key: "streaming.walRecovery.schemaExpirationTolerance", + Version: "2.6.8", + Doc: `The tolerance of schema expiration for wal recovery, 24h by default. +If the schema is older than (the channel checkpoint - tolerance), it will be removed forever.`, + DefaultValue: "24h", + Export: false, + } + p.WALRecoverySchemaExpirationTolerance.Init(base.mgr) } // runtimeConfig is just a private environment value table. diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 8c117292c8..cff1ce623f 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -691,6 +691,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 128, params.StreamingCfg.WALReadAheadBufferLength.GetAsInt()) assert.Equal(t, 1*time.Second, params.StreamingCfg.LoggingAppendSlowThreshold.GetAsDurationByParse()) assert.Equal(t, 3*time.Second, params.StreamingCfg.WALRecoveryGracefulCloseTimeout.GetAsDurationByParse()) + assert.Equal(t, 24*time.Hour, params.StreamingCfg.WALRecoverySchemaExpirationTolerance.GetAsDurationByParse()) assert.Equal(t, 100, params.StreamingCfg.WALRecoveryMaxDirtyMessage.GetAsInt()) assert.Equal(t, 10*time.Second, params.StreamingCfg.WALRecoveryPersistInterval.GetAsDurationByParse()) assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat())