fix: Fix L0 segment retention and improve delete buffer logging (#40884)

issue: #40207
related to https://github.com/milvus-io/milvus/pull/39552

- Correct the comparison in listDeleteBuffer.UnRegister from > to >= so L0 segments whose start timestamp equals the clean timestamp are retained instead of being released prematurely (see the sketch after this list)
- Log L0 segment registration and release in the delete buffers, including the segment start position and the clean timestamp
- Extend the "clean delete buffer" log with L0 segment counts before and after cleanup
- Log bloom filter hit counts and cost when forwarding buffered deletes to workers
- Include the checkpoint and delete checkpoint times in the SyncDistribution "sync action" log
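A minimal, self-contained sketch of the boundary change: the l0Segment type and retainAfter helper below are made up for illustration and are not the actual Milvus delete buffer code; only the >= comparison mirrors the change to listDeleteBuffer.UnRegister.

```go
package main

import "fmt"

// l0Segment is a stand-in for the real segments.Segment interface; only the
// fields needed to show the retention rule are kept.
type l0Segment struct {
	id      int64
	startTs uint64
}

// retainAfter keeps every L0 segment whose start timestamp is >= cleanTs.
// Before the fix the comparison was a strict >, so a segment whose start
// timestamp exactly matched the clean timestamp was released too early.
func retainAfter(segs []l0Segment, cleanTs uint64) []l0Segment {
	kept := make([]l0Segment, 0, len(segs))
	for _, s := range segs {
		if s.startTs >= cleanTs { // was: s.startTs > cleanTs
			kept = append(kept, s)
			continue
		}
		// In the real code the segment that falls through here is released and logged.
	}
	return kept
}

func main() {
	segs := []l0Segment{{id: 1, startTs: 10}, {id: 2, startTs: 20}, {id: 3, startTs: 30}}
	// Cleaning at ts=20 now keeps segment 2 (startTs == 20) as well as segment 3.
	fmt.Println(retainAfter(segs, 20)) // prints [{2 20} {3 30}]
}
```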

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

wei liu 2025-03-27 11:24:21 +08:00 committed by GitHub
parent 87e7d6d79f
commit 06310a5994
7 changed files with 75 additions and 5 deletions


@@ -549,6 +549,8 @@ func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkora
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
bfHitRowInL0 := int64(0)
start := time.Now()
for idx := 0; idx < len(segmentPks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(segmentPks) {
@@ -559,12 +561,22 @@ func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkora
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
bfHitRowInL0 += 1
if err := fn(segmentPks[idx+i], segmentTss[idx+i]); err != nil {
return err
}
}
}
}
log.Info("forward delete to worker...",
zap.Int64("L0SegmentID", segment.ID()),
zap.Int64("segmentID", candidate.ID()),
zap.String("channel", segment.LoadInfo().GetInsertChannel()),
zap.Int("totalDeleteRowsInL0", len(segmentPks)),
zap.Int64("bfHitRowsInL0", bfHitRowInL0),
zap.Int64("bfCost", time.Since(start).Milliseconds()),
)
}
}
return nil
@@ -658,6 +670,8 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
// list buffered delete
deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
tsHitDeleteRows := int64(0)
bfHitDeleteRows := int64(0)
start := time.Now()
for _, entry := range deleteRecords {
for _, record := range entry.Data {
tsHitDeleteRows += int64(len(record.DeleteData.Pks))
@@ -676,6 +690,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
bfHitDeleteRows += 1
err := bufferedForwarder.Buffer(pks[idx+i], record.DeleteData.Tss[idx+i])
if err != nil {
return err
@@ -685,6 +700,14 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
}
}
}
log.Info("forward delete to worker...",
zap.String("channel", info.InsertChannel),
zap.Int64("segmentID", info.GetSegmentID()),
zap.Time("startPosition", tsoutil.PhysicalTime(info.GetStartPosition().GetTimestamp())),
zap.Int64("tsHitDeleteRowNum", tsHitDeleteRows),
zap.Int64("bfHitDeleteRowNum", bfHitDeleteRows),
zap.Int64("bfCost", time.Since(start).Milliseconds()),
)
err := bufferedForwarder.Flush()
if err != nil {
return err
@@ -992,16 +1015,20 @@ func (sd *shardDelegator) SyncTargetVersion(
sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs)
start := time.Now()
sizeBeforeClean, _ := sd.deleteBuffer.Size()
l0NumBeforeClean := len(sd.deleteBuffer.ListL0())
sd.deleteBuffer.UnRegister(deleteSeekPos.GetTimestamp())
sizeAfterClean, _ := sd.deleteBuffer.Size()
l0NumAfterClean := len(sd.deleteBuffer.ListL0())
if sizeAfterClean < sizeBeforeClean {
if sizeAfterClean < sizeBeforeClean || l0NumAfterClean < l0NumBeforeClean {
log.Info("clean delete buffer",
zap.String("channel", sd.vchannelName),
zap.Time("deleteSeekPos", tsoutil.PhysicalTime(deleteSeekPos.GetTimestamp())),
zap.Time("channelCP", tsoutil.PhysicalTime(checkpoint.GetTimestamp())),
zap.Int64("sizeBeforeClean", sizeBeforeClean),
zap.Int64("sizeAfterClean", sizeAfterClean),
zap.Int("l0NumBeforeClean", l0NumBeforeClean),
zap.Int("l0NumAfterClean", l0NumAfterClean),
zap.Duration("cost", time.Since(start)),
)
}


@@ -22,8 +22,11 @@ import (
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
var errBufferFull = errors.New("buffer full")
@@ -82,6 +85,10 @@ func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
for _, seg := range segmentList {
if seg != nil {
c.l0Segments = append(c.l0Segments, seg)
log.Info("register l0 from delete buffer",
zap.Int64("segmentID", seg.ID()),
zap.Time("startPosition", tsoutil.PhysicalTime(seg.StartPosition().GetTimestamp())),
)
}
}
}
@@ -100,6 +107,11 @@ func (c *doubleCacheBuffer[T]) UnRegister(ts uint64) {
for _, s := range c.l0Segments {
if s.StartPosition().GetTimestamp() < ts {
s.Release(context.TODO())
log.Info("unregister l0 from delete buffer",
zap.Int64("segmentID", s.ID()),
zap.Time("startPosition", tsoutil.PhysicalTime(s.StartPosition().GetTimestamp())),
zap.Time("cleanTs", tsoutil.PhysicalTime(ts)),
)
continue
}
newSegments = append(newSegments, s)


@@ -158,7 +158,11 @@ func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() {
})
seg3 := segments.NewMockSegment(s.T())
seg3.On("ID").Return(int64(3))
seg3.On("Release", mock.Anything).Return()
seg3.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 30,
})
// Test RegisterL0 with multiple segments
buffer.RegisterL0(seg1, seg2)
@@ -177,7 +181,10 @@ func (s *DoubleCacheBufferSuite) TestL0SegmentOperations() {
s.Equal(0, len(emptyBuffer.ListL0()))
// Test UnRegister
buffer.UnRegister(15)
buffer.UnRegister(seg1.StartPosition().GetTimestamp())
segments = buffer.ListL0()
s.Equal(2, len(segments))
buffer.UnRegister(seg1.StartPosition().GetTimestamp() + 1)
segments = buffer.ListL0()
s.Equal(1, len(segments))
s.Equal(int64(2), segments[0].ID())


@@ -21,9 +21,12 @@ import (
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] {
@@ -65,6 +68,10 @@ func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
for _, seg := range segmentList {
if seg != nil {
b.l0Segments = append(b.l0Segments, seg)
log.Info("register l0 from delete buffer",
zap.Int64("segmentID", seg.ID()),
zap.Time("startPosition", tsoutil.PhysicalTime(seg.StartPosition().GetTimestamp())),
)
}
}
@@ -83,10 +90,15 @@ func (b *listDeleteBuffer[T]) UnRegister(ts uint64) {
var newSegments []segments.Segment
for _, s := range b.l0Segments {
if s.StartPosition().GetTimestamp() > ts {
if s.StartPosition().GetTimestamp() >= ts {
newSegments = append(newSegments, s)
} else {
s.Release(context.TODO())
log.Info("unregister l0 from delete buffer",
zap.Int64("segmentID", s.ID()),
zap.Time("startPosition", tsoutil.PhysicalTime(s.StartPosition().GetTimestamp())),
zap.Time("cleanTs", tsoutil.PhysicalTime(ts)),
)
}
}
b.l0Segments = newSegments


@@ -148,7 +148,11 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() {
})
seg3 := segments.NewMockSegment(s.T())
seg3.On("ID").Return(int64(3))
seg3.On("Release", mock.Anything).Return()
seg3.On("StartPosition").Return(&msgpb.MsgPosition{
Timestamp: 30,
})
// Test RegisterL0 with multiple segments
buffer.RegisterL0(seg1, seg2)
@@ -167,7 +171,10 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() {
s.Equal(0, len(emptyBuffer.ListL0()))
// Test UnRegister
buffer.UnRegister(15)
buffer.UnRegister(seg1.StartPosition().GetTimestamp())
segments = buffer.ListL0()
s.Equal(2, len(segments))
buffer.UnRegister(seg1.StartPosition().GetTimestamp() + 1)
segments = buffer.ListL0()
s.Equal(1, len(segments))
s.Equal(int64(2), segments[0].ID())


@@ -418,6 +418,7 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() {
s.Require().NoError(err)
s.delegator.deleteBuffer.RegisterL0(l0Segment)
seg.EXPECT().ID().Return(1)
seg.EXPECT().Partition().Return(100)
seg.EXPECT().BatchPkExist(mock.Anything).Return(lo.RepeatBy(n, func(i int) bool { return true }))
seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {


@@ -1304,7 +1304,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
})
})
case querypb.SyncType_UpdateVersion:
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()), zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs()))
log.Info("sync action",
zap.Int64("TargetVersion", action.GetTargetVersion()),
zap.Time("checkPoint", tsoutil.PhysicalTime(action.GetCheckpoint().GetTimestamp())),
zap.Time("deleteCP", tsoutil.PhysicalTime(action.GetDeleteCP().GetTimestamp())),
zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs()))
droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
if action.GetCheckpoint() == nil {
return id, typeutil.MaxTimestamp