From 990a25e51a432bd07936d0fb341e1e1aa1dd33b9 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 24 Jul 2025 01:00:54 +0800
Subject: [PATCH] fix: Prevent delete records loss during slow segment loading
 [QueryNodeV2] (#43527)

issue: #42884

Fixes an issue where delete records for a segment are lost from the delete
buffer if `load segment` execution on the delegator is too slow, causing
`syncTargetVersion` or other cleanup operations to clear them prematurely.

Changes include:
- Introduced `Pin` and `Unpin` methods in the `DeleteBuffer` interface and its
  implementations (`doubleCacheBuffer`, `listDeleteBuffer`).
- Added a `pinnedTimestamps` map to track timestamps protected from cleanup by
  specific segments.
- Modified `LoadSegments` in `shardDelegator` to `Pin` the relevant segment
  delete records before loading and `Unpin` them afterwards.
- Added an `isPinned` check in the `UnRegister` and `TryDiscard` methods of
  `listDeleteBuffer` to skip cleanup while the corresponding timestamps are
  pinned.
- Added comprehensive unit tests for the `Pin`, `Unpin`, and `isPinned`
  functionality, covering basic, multiple-pin, concurrent, and edge cases.

This ensures the integrity of delete records by preventing their premature
removal from the delete buffer during segment loading.

Signed-off-by: Wei Liu
---
 .../querynodev2/delegator/delegator_data.go   |  12 +
 .../delegator/deletebuffer/delete_buffer.go   |  55 +++-
 .../deletebuffer/list_delete_buffer.go        |  88 +++++-
 .../deletebuffer/list_delete_buffer_test.go   | 298 ++++++++++++++++++
 4 files changed, 443 insertions(+), 10 deletions(-)

diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 37b36b5616..a8790c251b 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -414,6 +414,18 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 		return merr.WrapErrServiceInternal("load L0 segment is not supported, l0 segment should only be loaded by watchChannel")
 	}
 
+	// pin the start timestamp of every segment so the delete buffer cannot be cleaned up while the worker loads segments
+	// Note: while delete records are pinned, cleanup during SyncTargetVersion is skipped,
+	// so once the segments are loaded, the delete buffer is cleaned up by the next SyncTargetVersion call
+	for _, info := range req.GetInfos() {
+		sd.deleteBuffer.Pin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
+	}
+	defer func() {
+		for _, info := range req.GetInfos() {
+			sd.deleteBuffer.Unpin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
+		}
+	}()
+
 	worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
 	if err != nil {
 		log.Warn("delegator failed to find worker", zap.Error(err))
diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
index a7f1ebb3be..9410f81f01 100644
--- a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
+++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go
@@ -55,14 +55,19 @@ type DeleteBuffer[T timed] interface {
 
 	// clean up delete buffer
 	Clear()
+
+	// Pin/Unpin methods for protecting specific timestamps from cleanup
+	Pin(ts uint64, segmentID int64)
+	Unpin(ts uint64, segmentID int64)
 }
 
 func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
 	return &doubleCacheBuffer[T]{
-		head:        newCacheBlock[T](startTs, maxSize),
-		maxSize:     maxSize,
-		ts:          startTs,
-		l0Segments:  make([]segments.Segment, 0),
+		head:             newCacheBlock[T](startTs, maxSize),
+		maxSize:          maxSize,
+		ts:               startTs,
+		l0Segments:       make([]segments.Segment, 0),
+		pinnedTimestamps: make(map[uint64]map[int64]struct{}),
 	}
 }
 
@@ -75,6 +80,10 @@ type doubleCacheBuffer[T timed] struct {
 
 	// maintain l0 segment list
 	l0Segments []segments.Segment
+
+	// track pinned timestamps to prevent cleanup
+	// map[timestamp]map[segmentID]struct{} - tracks which segments pin which timestamps
+	pinnedTimestamps map[uint64]map[int64]struct{}
 }
 
 func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
@@ -254,3 +263,41 @@ func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
 func (c *cacheBlock[T]) Size() (entryNum, memorySize int64) {
 	return c.entryNum, c.size
 }
+
+// Pin lets the given segment protect a specific timestamp from being cleaned up
+func (c *doubleCacheBuffer[T]) Pin(ts uint64, segmentID int64) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if c.pinnedTimestamps[ts] == nil {
+		c.pinnedTimestamps[ts] = make(map[int64]struct{})
+	}
+	c.pinnedTimestamps[ts][segmentID] = struct{}{}
+
+	log.Info("pin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}
+
+// Unpin removes the given segment's protection of a specific timestamp
+func (c *doubleCacheBuffer[T]) Unpin(ts uint64, segmentID int64) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if segmentMap, exists := c.pinnedTimestamps[ts]; exists {
+		delete(segmentMap, segmentID)
+		if len(segmentMap) == 0 {
+			delete(c.pinnedTimestamps, ts)
+		}
+	}
+
+	log.Info("unpin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+	// Note: doubleCacheBuffer doesn't implement cleanup logic in TryDiscard,
+	// so no cleanup is triggered here
+}
diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go
index 2ddea74a22..c29ba5a9bf 100644
--- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go
+++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go
@@ -31,11 +31,12 @@ import (
 )
 
 func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] {
 	return &listDeleteBuffer[T]{
-		safeTs:       startTs,
-		sizePerBlock: sizePerBlock,
-		list:         []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
-		labels:       labels,
-		l0Segments:   make([]segments.Segment, 0),
+		safeTs:           startTs,
+		sizePerBlock:     sizePerBlock,
+		list:             []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
+		labels:           labels,
+		l0Segments:       make([]segments.Segment, 0),
+		pinnedTimestamps: make(map[uint64]map[int64]struct{}),
 	}
 }
 
@@ -59,6 +60,10 @@ type listDeleteBuffer[T timed] struct {
 
 	// maintain l0 segment list
 	l0Segments []segments.Segment
+
+	// track pinned timestamps to prevent cleanup
+	// map[timestamp]map[segmentID]struct{} - tracks which segments pin which timestamps
+	pinnedTimestamps map[uint64]map[int64]struct{}
 }
 
 func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) {
@@ -87,8 +92,10 @@ func (b *listDeleteBuffer[T]) ListL0() []segments.Segment {
 func (b *listDeleteBuffer[T]) UnRegister(ts uint64) {
 	b.mut.Lock()
 	defer b.mut.Unlock()
+	if b.isPinned(ts) {
+		return
+	}
 	var newSegments []segments.Segment
-
 	for _, s := range b.l0Segments {
 		if s.StartPosition().GetTimestamp() >= ts {
 			newSegments = append(newSegments, s)
 		}
 	}
@@ -162,6 +169,9 @@ func (b *listDeleteBuffer[T]) SafeTs() uint64 {
 func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
 	b.mut.Lock()
 	defer b.mut.Unlock()
+	if b.isPinned(ts) {
+		return
+	}
 	b.tryCleanDelete(ts)
 }
 
@@ -190,9 +200,75 @@ func (b *listDeleteBuffer[T]) tryCleanDelete(ts uint64) {
 	}
 }
 
+// isPinned reports whether any record before cleanTs is still pinned
+func (b *listDeleteBuffer[T]) isPinned(cleanTs uint64) bool {
+	// Check if there are any pinned timestamps before the cleanTs
+	// If there are pinned timestamps before cleanTs, we should skip cleanup,
+	// because pinning a timestamp protects all data after that timestamp
+	var pinnedSegments []int64
+	var pinnedTimestamp uint64
+	for pinnedTs, segmentMap := range b.pinnedTimestamps {
+		if pinnedTs < cleanTs && len(segmentMap) > 0 {
+			// Found a pinned timestamp before cleanTs
+			pinnedTimestamp = pinnedTs
+			for segmentID := range segmentMap {
+				pinnedSegments = append(pinnedSegments, segmentID)
+			}
+			break
+		}
+	}
+
+	if len(pinnedSegments) > 0 {
+		log.Info("skip cleanup due to pinned timestamp before cleanTs",
+			zap.Time("pinnedPhysicalTime", tsoutil.PhysicalTime(pinnedTimestamp)),
+			zap.Time("cleanPhysicalTime", tsoutil.PhysicalTime(cleanTs)),
+			zap.Int64s("pinningSegmentIDs", pinnedSegments),
+			zap.Int("segmentCount", len(pinnedSegments)),
+		)
+		return true
+	}
+	return false
+}
+
 func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) {
 	b.mut.RLock()
 	defer b.mut.RUnlock()
 
 	return b.rowNum, b.size
 }
+
+// Pin lets the given segment protect a specific timestamp from being cleaned up
+func (b *listDeleteBuffer[T]) Pin(ts uint64, segmentID int64) {
+	b.mut.Lock()
+	defer b.mut.Unlock()
+
+	if b.pinnedTimestamps[ts] == nil {
+		b.pinnedTimestamps[ts] = make(map[int64]struct{})
+	}
+	b.pinnedTimestamps[ts][segmentID] = struct{}{}
+
+	log.Info("pin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}
+
+// Unpin removes the given segment's protection of a specific timestamp so later TryDiscard calls may clean it up
+func (b *listDeleteBuffer[T]) Unpin(ts uint64, segmentID int64) {
+	b.mut.Lock()
+	defer b.mut.Unlock()
+
+	if segmentMap, exists := b.pinnedTimestamps[ts]; exists {
+		delete(segmentMap, segmentID)
+		if len(segmentMap) == 0 {
+			delete(b.pinnedTimestamps, ts)
+		}
+	}
+
+	log.Info("unpin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}
diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go
index 40c4ab7553..266f7f3d18 100644
--- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go
+++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go
@@ -17,8 +17,10 @@ package deletebuffer
 
 import (
+	"sync"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
@@ -200,3 +202,299 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() {
 func TestListDeleteBuffer(t *testing.T) {
 	suite.Run(t, new(ListDeleteBufferSuite))
 }
+
+func TestListDeleteBuffer_PinUnpinBasic(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add some test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item3 := &Item{Ts: 3000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(3)},
+				Tss:      []uint64{3000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+	buffer.Put(item3)
+
+	// Verify initial state
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(3), entryNum)
+
+	// Pin timestamp 1500 for segment 123
+	buffer.Pin(1500, 123)
+
+	// Try to discard data before timestamp 2000
+	// This should be skipped because there's a pinned timestamp (1500) before cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that all data is still there (cleanup was skipped)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(3), entryNum)
+
+	// Unpin timestamp 1500 for segment 123
+	buffer.Unpin(1500, 123)
+
+	// Try to discard data before timestamp 2000 again
+	// Now cleanup should proceed normally
+	buffer.TryDiscard(2000)
+
+	// Verify that data before 2000 is cleaned up
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(2), entryNum)
+}
+
+func TestListDeleteBuffer_MultipleSegmentsPinSameTimestamp(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+
+	// Multiple segments pin the same timestamp
+	buffer.Pin(1500, 123) // Protects data after 1500
+	buffer.Pin(1500, 456) // Also protects data after 1500
+
+	// Try to discard data before timestamp 2000
+	// This should be skipped because there's a pinned timestamp (1500) before cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that all data is still there (cleanup was skipped)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // All data should still be there
+
+	// Unpin one segment
+	buffer.Unpin(1500, 123)
+
+	// Try to discard data before timestamp 2000
+	// This should still be skipped because the other segment still has it pinned
+	buffer.TryDiscard(2000)
+
+	// Verify that data is still there (other segment still has it pinned)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // Data should still be there
+
+	// Unpin the other segment
+	buffer.Unpin(1500, 456)
+
+	// Try to discard data before timestamp 2000
+	// Now cleanup should proceed normally
+	buffer.TryDiscard(2000)
+
+	// Verify that data is now cleaned up
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Only data with ts >= 2000 should remain
+}
+
+func TestListDeleteBuffer_PinAfterCleanTs(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item3 := &Item{Ts: 3000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(3)},
+				Tss:      []uint64{3000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+	buffer.Put(item3)
+
+	// Pin a timestamp that is AFTER the cleanTs
+	buffer.Pin(2500, 123) // Protects data after 2500
+
+	// Try to discard data before timestamp 2000
+	// This should proceed normally because the pinned timestamp (2500) is AFTER cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that data before 2000 is cleaned up
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // Data with ts 2000 and 3000 should remain
+}
+
+// TestListDeleteBuffer_EdgeCases tests edge cases for the pinned functionality
+func TestListDeleteBuffer_EdgeCases(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Test with empty buffer - TryDiscard should proceed normally
+	buffer.TryDiscard(2000)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(0), entryNum) // No data to clean
+
+	// Add some data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+
+	// Pin timestamp equal to data timestamp
+	buffer.Pin(1000, 123)
+
+	// Try to discard with timestamp equal to pinned timestamp
+	// The pin does not block this call (1000 is not < 1000), but there is no data strictly before ts 1000 to clean
+	buffer.TryDiscard(1000)
+
+	// Verify that data is still there
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+
+	// Try to discard with timestamp greater than pinned timestamp
+	// This should be skipped because 1000 < 2000
+	buffer.TryDiscard(2000)
+
+	// Verify that data is still there
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+
+	// Try to discard with timestamp less than pinned timestamp
+	// This should proceed normally because 1000 > 500
+	buffer.TryDiscard(500)
+
+	// Verify that data is still there (because 500 < 1000, no data to clean)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+}
+
+// TestListDeleteBuffer_PinUnpinConcurrent tests concurrent pin and unpin operations
+func TestListDeleteBuffer_PinUnpinConcurrent(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	buffer.Put(item1)
+	buffer.Put(item2)
+
+	// Test concurrent pin operations
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(segmentID int64) {
+			defer wg.Done()
+			buffer.Pin(1500, segmentID)
+		}(int64(i))
+	}
+	wg.Wait()
+
+	// Try to discard - should be skipped due to pinned timestamp
+	buffer.TryDiscard(2000)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // All data should remain
+
+	// Test concurrent unpin operations
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(segmentID int64) {
+			defer wg.Done()
+			buffer.Unpin(1500, segmentID)
+		}(int64(i))
+	}
+	wg.Wait()
+
+	// Try to discard again - should proceed normally
+	buffer.TryDiscard(2000)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Only data with ts >= 2000 should remain
+}
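---

Note on the pinning protocol: the patch is easiest to follow as a reference-counted
guard around TryDiscard. The sketch below is a minimal, self-contained model of that
idea, not Milvus code: pinBuffer, newPinBuffer, and the entries slice are illustrative
stand-ins for listDeleteBuffer and its cache blocks, and the method bodies mirror, in
simplified form, the Pin/Unpin/isPinned logic added above.

package main

import (
	"fmt"
	"sync"
)

// pinBuffer is an illustrative stand-in for listDeleteBuffer: each element of
// entries is the timestamp of one buffered delete record, and pinned holds the
// same map[timestamp]map[segmentID]struct{} bookkeeping the patch adds.
type pinBuffer struct {
	mut     sync.Mutex
	pinned  map[uint64]map[int64]struct{}
	entries []uint64
}

func newPinBuffer() *pinBuffer {
	return &pinBuffer{pinned: make(map[uint64]map[int64]struct{})}
}

// Pin records that segmentID needs every delete record at or after ts.
func (b *pinBuffer) Pin(ts uint64, segmentID int64) {
	b.mut.Lock()
	defer b.mut.Unlock()
	if b.pinned[ts] == nil {
		b.pinned[ts] = make(map[int64]struct{})
	}
	b.pinned[ts][segmentID] = struct{}{}
}

// Unpin drops segmentID's pin on ts; the timestamp stops blocking cleanup
// once its last pinning segment is gone.
func (b *pinBuffer) Unpin(ts uint64, segmentID int64) {
	b.mut.Lock()
	defer b.mut.Unlock()
	if m, ok := b.pinned[ts]; ok {
		delete(m, segmentID)
		if len(m) == 0 {
			delete(b.pinned, ts)
		}
	}
}

// isPinned mirrors the patch's rule: any pin strictly before cleanTs blocks
// cleanup, since a pin at ts protects all records from ts onward.
// The caller must hold b.mut.
func (b *pinBuffer) isPinned(cleanTs uint64) bool {
	for ts, m := range b.pinned {
		if ts < cleanTs && len(m) > 0 {
			return true
		}
	}
	return false
}

// TryDiscard drops records older than ts unless a pin blocks the cleanup.
func (b *pinBuffer) TryDiscard(ts uint64) {
	b.mut.Lock()
	defer b.mut.Unlock()
	if b.isPinned(ts) {
		return
	}
	kept := b.entries[:0]
	for _, e := range b.entries {
		if e >= ts {
			kept = append(kept, e)
		}
	}
	b.entries = kept
}

func main() {
	b := newPinBuffer()
	b.entries = []uint64{1000, 2000, 3000}

	b.Pin(1500, 123)       // segment 123 starts loading: protect records from ts 1500 on
	b.TryDiscard(2000)     // blocked: a pin at 1500 sits before cleanTs 2000
	fmt.Println(b.entries) // [1000 2000 3000]

	b.Unpin(1500, 123)     // load finished
	b.TryDiscard(2000)     // proceeds now
	fmt.Println(b.entries) // [2000 3000]
}

The coarse rule in isPinned (any pin strictly before cleanTs blocks all cleanup)
trades precision for simplicity: LoadSegments pins each segment's StartPosition
timestamp, so every delete record a still-loading segment might need stays in the
buffer until the matching Unpin, at which point the next SyncTargetVersion or
TryDiscard call can reclaim it.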