enhance: introduce a tolerance duration to delay the drop operation (#46251)

issue: #46214

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-12-10 19:57:13 +08:00 committed by GitHub
parent 416113d11a
commit 15f8dfc7ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 30 additions and 4 deletions

View File

@ -133,6 +133,10 @@ func (r *recoveryStorageImpl) GetSchema(ctx context.Context, vchannel string, ti
if vchannelInfo, ok := r.vchannels[vchannel]; ok {
_, schema := vchannelInfo.GetSchema(timetick)
if schema == nil {
r.Logger().DPanic("schema not found, fallback to latest schema", zap.String("vchannel", vchannel), zap.Uint64("timetick", timetick))
if _, schema = vchannelInfo.GetSchema(0); schema != nil {
return schema, nil
}
return nil, errors.Errorf("critical error: schema not found, vchannel: %s, timetick: %d", vchannel, timetick)
}
return schema, nil

View File

@ -10,6 +10,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/messageutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
// newVChannelRecoveryInfoFromCreateCollectionMessage creates a new vchannel recovery info from a create collection message.
@ -103,9 +105,13 @@ func (info *vchannelRecoveryInfo) GetSchema(timetick uint64) (int, *schemapb.Col
// UpdateFlushCheckpoint updates the flush checkpoint of the vchannel recovery info.
func (info *vchannelRecoveryInfo) UpdateFlushCheckpoint(checkpoint *WALCheckpoint) error {
// Because current L0 may be consuming the data before the flush checkpoint,
// so we introduce a tolerance duration to delay the drop operation of the schema that is not used anymore.
tolerance := paramtable.Get().StreamingCfg.WALRecoverySchemaExpirationTolerance.GetAsDurationByParse()
if info.flusherCheckpoint == nil || info.flusherCheckpoint.MessageID.LTE(checkpoint.MessageID) {
info.flusherCheckpoint = checkpoint
idx, _ := info.GetSchema(info.flusherCheckpoint.TimeTick)
timetick := tsoutil.AddPhysicalDurationOnTs(info.flusherCheckpoint.TimeTick, -tolerance)
idx, _ := info.GetSchema(timetick)
for i := 0; i < idx; i++ {
// drop the schema that is not used anymore.
// the future GetSchema operation will use the timetick greater than the flusher checkpoint.

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func TestNewVChannelRecoveryInfoFromVChannelMeta(t *testing.T) {
@ -33,6 +34,9 @@ func TestNewVChannelRecoveryInfoFromVChannelMeta(t *testing.T) {
}
func TestNewVChannelRecoveryInfoFromCreateCollectionMessage(t *testing.T) {
paramtable.Init()
paramtable.Get().StreamingCfg.WALRecoverySchemaExpirationTolerance.SwapTempValue("0")
schema1 := &schemapb.CollectionSchema{
Name: "test-collection-1",
}

View File

@ -6376,9 +6376,10 @@ type streamingConfig struct {
FlushL0MaxSize ParamItem `refreshable:"true"`
// recovery configuration.
WALRecoveryPersistInterval ParamItem `refreshable:"true"`
WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"`
WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"`
WALRecoveryPersistInterval ParamItem `refreshable:"true"`
WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"`
WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"`
WALRecoverySchemaExpirationTolerance ParamItem `refreshable:"true"`
}
func (p *streamingConfig) init(base *BaseTable) {
@ -6703,6 +6704,16 @@ If that persist operation exceeds this timeout, the wal recovery module will clo
Export: true,
}
p.WALRecoveryGracefulCloseTimeout.Init(base.mgr)
p.WALRecoverySchemaExpirationTolerance = ParamItem{
Key: "streaming.walRecovery.schemaExpirationTolerance",
Version: "2.6.8",
Doc: `The tolerance of schema expiration for wal recovery, 24h by default.
If the schema is older than (the channel checkpoint - tolerance), it will be removed forever.`,
DefaultValue: "24h",
Export: false,
}
p.WALRecoverySchemaExpirationTolerance.Init(base.mgr)
}
// runtimeConfig is just a private environment value table.

View File

@ -691,6 +691,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 128, params.StreamingCfg.WALReadAheadBufferLength.GetAsInt())
assert.Equal(t, 1*time.Second, params.StreamingCfg.LoggingAppendSlowThreshold.GetAsDurationByParse())
assert.Equal(t, 3*time.Second, params.StreamingCfg.WALRecoveryGracefulCloseTimeout.GetAsDurationByParse())
assert.Equal(t, 24*time.Hour, params.StreamingCfg.WALRecoverySchemaExpirationTolerance.GetAsDurationByParse())
assert.Equal(t, 100, params.StreamingCfg.WALRecoveryMaxDirtyMessage.GetAsInt())
assert.Equal(t, 10*time.Second, params.StreamingCfg.WALRecoveryPersistInterval.GetAsDurationByParse())
assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat())