fix: Prevent delete records loss during slow segment loading [QueryNodeV2] (#43527)
issue: #42884

Fixes an issue where delete records for a segment are lost from the delete buffer when `load segment` execution on the delegator is too slow, allowing `syncTargetVersion` or other cleanup operations to clear them prematurely.

Changes include:
- Introduced `Pin` and `Unpin` methods in the `DeleteBuffer` interface and its implementations (`doubleCacheBuffer`, `listDeleteBuffer`).
- Added a `pinnedTimestamps` map to track timestamps protected from cleanup by specific segments.
- Modified `LoadSegments` in `shardDelegator` to `Pin` the relevant segment delete records before loading and `Unpin` them afterwards.
- Added an `isPinned` check in the `UnRegister` and `TryDiscard` methods of `listDeleteBuffer` to skip cleanup when the corresponding timestamps are pinned.
- Added unit tests for `Pin`, `Unpin`, and `isPinned`, covering basic, multiple-pin, concurrent, and edge cases.

This preserves the integrity of delete records by preventing their premature removal from the delete buffer while segments are loading.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
parent 1cf8ed505f
commit 990a25e51a
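At the core of the change is a small piece of pin bookkeeping shared by both buffer implementations: each pinned timestamp maps to the set of segment IDs holding the pin, and any pin strictly older than a requested cleanup timestamp blocks the discard. The standalone sketch below illustrates those semantics only; it is a simplified illustration rather than the Milvus code (the pinTracker name is invented, and the real implementations additionally guard the map with the buffer's mutex and emit logs).

package main

import "fmt"

// pinTracker is a simplified, hypothetical stand-in for the pinnedTimestamps
// bookkeeping added to the delete buffers: ts -> set of pinning segment IDs.
type pinTracker struct {
	pinned map[uint64]map[int64]struct{}
}

func newPinTracker() *pinTracker {
	return &pinTracker{pinned: make(map[uint64]map[int64]struct{})}
}

// Pin records that segmentID needs delete records at/after ts to survive cleanup.
func (p *pinTracker) Pin(ts uint64, segmentID int64) {
	if p.pinned[ts] == nil {
		p.pinned[ts] = make(map[int64]struct{})
	}
	p.pinned[ts][segmentID] = struct{}{}
}

// Unpin removes one segment's pin; the timestamp entry disappears once no segment holds it.
func (p *pinTracker) Unpin(ts uint64, segmentID int64) {
	if segs, ok := p.pinned[ts]; ok {
		delete(segs, segmentID)
		if len(segs) == 0 {
			delete(p.pinned, ts)
		}
	}
}

// blocksDiscard mirrors isPinned: any pin strictly before cleanTs blocks cleanup,
// because discarding up to cleanTs would drop records the pinning segment still needs.
func (p *pinTracker) blocksDiscard(cleanTs uint64) bool {
	for ts, segs := range p.pinned {
		if ts < cleanTs && len(segs) > 0 {
			return true
		}
	}
	return false
}

func main() {
	p := newPinTracker()
	p.Pin(1500, 123)
	p.Pin(1500, 456)
	fmt.Println(p.blocksDiscard(2000)) // true: pinned ts 1500 < 2000
	p.Unpin(1500, 123)
	fmt.Println(p.blocksDiscard(2000)) // still true: segment 456 holds the pin
	p.Unpin(1500, 456)
	fmt.Println(p.blocksDiscard(2000)) // false: cleanup may proceed
}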
@@ -414,6 +414,18 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 		return merr.WrapErrServiceInternal("load L0 segment is not supported, l0 segment should only be loaded by watchChannel")
 	}
 
+	// Pin all segments so their delete records are not cleaned from the delete buffer while the worker loads the segments.
+	// Note: while delete records are pinned, cleanup during SyncTargetVersion is skipped,
+	// so after the segments are loaded the delete buffer is cleaned up by the next SyncTargetVersion call.
+	for _, info := range req.GetInfos() {
+		sd.deleteBuffer.Pin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
+	}
+	defer func() {
+		for _, info := range req.GetInfos() {
+			sd.deleteBuffer.Unpin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
+		}
+	}()
+
 	worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
 	if err != nil {
 		log.Warn("delegator failed to find worker", zap.Error(err))

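The Pin calls run before the load request is dispatched to the worker, while the matching Unpin calls live in a defer, so the pins are released on every return path of LoadSegments, including early error returns such as the worker-lookup failure shown above.
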
@@ -55,14 +55,19 @@ type DeleteBuffer[T timed] interface {
 	// clean up delete buffer
 	Clear()
+
+	// Pin/Unpin methods for protecting specific timestamps from cleanup
+	Pin(ts uint64, segmentID int64)
+	Unpin(ts uint64, segmentID int64)
 }
 
 func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
 	return &doubleCacheBuffer[T]{
 		head:             newCacheBlock[T](startTs, maxSize),
 		maxSize:          maxSize,
 		ts:               startTs,
 		l0Segments:       make([]segments.Segment, 0),
+		pinnedTimestamps: make(map[uint64]map[int64]struct{}),
 	}
 }

@@ -75,6 +80,10 @@ type doubleCacheBuffer[T timed] struct {
 	// maintain l0 segment list
 	l0Segments []segments.Segment
+
+	// track pinned timestamps to prevent cleanup
+	// map[timestamp]map[segmentID]struct{} - tracks which segments pin which timestamps
+	pinnedTimestamps map[uint64]map[int64]struct{}
 }
 
 func (c *doubleCacheBuffer[T]) RegisterL0(segmentList ...segments.Segment) {

@@ -254,3 +263,41 @@ func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
 func (c *cacheBlock[T]) Size() (entryNum, memorySize int64) {
 	return c.entryNum, c.size
 }
+
+// Pin protects a specific timestamp from being cleaned up by a specific segment
+func (c *doubleCacheBuffer[T]) Pin(ts uint64, segmentID int64) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if c.pinnedTimestamps[ts] == nil {
+		c.pinnedTimestamps[ts] = make(map[int64]struct{})
+	}
+	c.pinnedTimestamps[ts][segmentID] = struct{}{}
+
+	log.Info("pin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}
+
+// Unpin removes protection for a specific timestamp by a specific segment
+func (c *doubleCacheBuffer[T]) Unpin(ts uint64, segmentID int64) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if segmentMap, exists := c.pinnedTimestamps[ts]; exists {
+		delete(segmentMap, segmentID)
+		if len(segmentMap) == 0 {
+			delete(c.pinnedTimestamps, ts)
+		}
+	}
+
+	log.Info("unpin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+	// Note: doubleCacheBuffer doesn't implement cleanup logic in TryDiscard,
+	// so no cleanup is triggered here
+}

@@ -31,11 +31,12 @@ import (
 func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] {
 	return &listDeleteBuffer[T]{
 		safeTs:           startTs,
 		sizePerBlock:     sizePerBlock,
 		list:             []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
 		labels:           labels,
 		l0Segments:       make([]segments.Segment, 0),
+		pinnedTimestamps: make(map[uint64]map[int64]struct{}),
 	}
 }

@@ -59,6 +60,10 @@ type listDeleteBuffer[T timed] struct {
 	// maintain l0 segment list
 	l0Segments []segments.Segment
+
+	// track pinned timestamps to prevent cleanup
+	// map[timestamp]map[segmentID]struct{} - tracks which segments pin which timestamps
+	pinnedTimestamps map[uint64]map[int64]struct{}
 }
 
 func (b *listDeleteBuffer[T]) RegisterL0(segmentList ...segments.Segment) {

@@ -87,8 +92,10 @@ func (b *listDeleteBuffer[T]) ListL0() []segments.Segment {
 func (b *listDeleteBuffer[T]) UnRegister(ts uint64) {
 	b.mut.Lock()
 	defer b.mut.Unlock()
+	if b.isPinned(ts) {
+		return
+	}
 	var newSegments []segments.Segment
 
 	for _, s := range b.l0Segments {
 		if s.StartPosition().GetTimestamp() >= ts {
 			newSegments = append(newSegments, s)

@@ -162,6 +169,9 @@ func (b *listDeleteBuffer[T]) SafeTs() uint64 {
 func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
 	b.mut.Lock()
 	defer b.mut.Unlock()
+	if b.isPinned(ts) {
+		return
+	}
 	b.tryCleanDelete(ts)
 }

@@ -190,9 +200,75 @@ func (b *listDeleteBuffer[T]) tryCleanDelete(ts uint64) {
 	}
 }
+
+// isPinned checks whether any delete records are pinned before cleanTs
+func (b *listDeleteBuffer[T]) isPinned(cleanTs uint64) bool {
+	// Check if there are any pinned timestamps before the cleanTs.
+	// If there are, cleanup must be skipped, because pinning a timestamp
+	// protects all data after that timestamp.
+	var pinnedSegments []int64
+	var pinnedTimestamp uint64
+	for pinnedTs, segmentMap := range b.pinnedTimestamps {
+		if pinnedTs < cleanTs && len(segmentMap) > 0 {
+			// Found a pinned timestamp before cleanTs
+			pinnedTimestamp = pinnedTs
+			for segmentID := range segmentMap {
+				pinnedSegments = append(pinnedSegments, segmentID)
+			}
+			break
+		}
+	}
+
+	if len(pinnedSegments) > 0 {
+		log.Info("skip cleanup due to pinned timestamp before cleanTs",
+			zap.Time("pinnedPhysicalTime", tsoutil.PhysicalTime(pinnedTimestamp)),
+			zap.Time("cleanPhysicalTime", tsoutil.PhysicalTime(cleanTs)),
+			zap.Int64s("pinningSegmentIDs", pinnedSegments),
+			zap.Int("segmentCount", len(pinnedSegments)),
+		)
+		return true
+	}
+	return false
+}
 
 func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) {
 	b.mut.RLock()
 	defer b.mut.RUnlock()
 
 	return b.rowNum, b.size
 }
+
+// Pin protects a specific timestamp from being cleaned up by a specific segment
+func (b *listDeleteBuffer[T]) Pin(ts uint64, segmentID int64) {
+	b.mut.Lock()
+	defer b.mut.Unlock()
+
+	if b.pinnedTimestamps[ts] == nil {
+		b.pinnedTimestamps[ts] = make(map[int64]struct{})
+	}
+	b.pinnedTimestamps[ts][segmentID] = struct{}{}
+
+	log.Info("pin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}
+
+// Unpin removes protection for a specific timestamp by a specific segment
+func (b *listDeleteBuffer[T]) Unpin(ts uint64, segmentID int64) {
+	b.mut.Lock()
+	defer b.mut.Unlock()
+
+	if segmentMap, exists := b.pinnedTimestamps[ts]; exists {
+		delete(segmentMap, segmentID)
+		if len(segmentMap) == 0 {
+			delete(b.pinnedTimestamps, ts)
+		}
+	}
+
+	log.Info("unpin timestamp for segment",
+		zap.Uint64("timestamp", ts),
+		zap.Int64("segmentID", segmentID),
+		zap.Time("physicalTime", tsoutil.PhysicalTime(ts)),
+	)
+}

@@ -17,8 +17,10 @@
 package deletebuffer
 
 import (
+	"sync"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"

@@ -200,3 +202,299 @@ func (s *ListDeleteBufferSuite) TestL0SegmentOperations() {
 func TestListDeleteBuffer(t *testing.T) {
 	suite.Run(t, new(ListDeleteBufferSuite))
 }
+
+func TestListDeleteBuffer_PinUnpinBasic(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add some test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item3 := &Item{Ts: 3000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(3)},
+				Tss:      []uint64{3000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+	buffer.Put(item3)
+
+	// Verify initial state
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(3), entryNum)
+
+	// Pin timestamp 1500 for segment 123
+	buffer.Pin(1500, 123)
+
+	// Try to discard data before timestamp 2000
+	// This should be skipped because there's a pinned timestamp (1500) before cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that all data is still there (cleanup was skipped)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(3), entryNum)
+
+	// Unpin timestamp 1500 for segment 123
+	buffer.Unpin(1500, 123)
+
+	// Try to discard data before timestamp 2000 again
+	// Now cleanup should proceed normally
+	buffer.TryDiscard(2000)
+
+	// Verify that data before 2000 is cleaned up
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(2), entryNum)
+}
+
+func TestListDeleteBuffer_MultipleSegmentsPinSameTimestamp(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+
+	// Multiple segments pin the same timestamp
+	buffer.Pin(1500, 123) // Protects data after 1500
+	buffer.Pin(1500, 456) // Also protects data after 1500
+
+	// Try to discard data before timestamp 2000
+	// This should be skipped because there's a pinned timestamp (1500) before cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that all data is still there (cleanup was skipped)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // All data should still be there
+
+	// Unpin one segment
+	buffer.Unpin(1500, 123)
+
+	// Try to discard data before timestamp 2000
+	// This should still be skipped because the other segment still has it pinned
+	buffer.TryDiscard(2000)
+
+	// Verify that data is still there (other segment still has it pinned)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // Data should still be there
+
+	// Unpin the other segment
+	buffer.Unpin(1500, 456)
+
+	// Try to discard data before timestamp 2000
+	// Now cleanup should proceed normally
+	buffer.TryDiscard(2000)
+
+	// Verify that data is now cleaned up
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Only data with ts >= 2000 should remain
+}
+
+func TestListDeleteBuffer_PinAfterCleanTs(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item3 := &Item{Ts: 3000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(3)},
+				Tss:      []uint64{3000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+	buffer.Put(item2)
+	buffer.Put(item3)
+
+	// Pin a timestamp that is AFTER the cleanTs
+	buffer.Pin(2500, 123) // Protects data after 2500
+
+	// Try to discard data before timestamp 2000
+	// This should proceed normally because the pinned timestamp (2500) is AFTER cleanTs (2000)
+	buffer.TryDiscard(2000)
+
+	// Verify that data before 2000 is cleaned up
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // Data with ts 2000 and 3000 should remain
+}
+
+// TestListDeleteBuffer_EdgeCases tests edge cases for the pinned functionality
+func TestListDeleteBuffer_EdgeCases(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Test with empty buffer - TryDiscard should proceed normally
+	buffer.TryDiscard(2000)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(0), entryNum) // No data to clean
+
+	// Add some data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+
+	buffer.Put(item1)
+
+	// Pin timestamp equal to data timestamp
+	buffer.Pin(1000, 123)
+
+	// Try to discard with timestamp equal to pinned timestamp
+	// This should be skipped because 1000 == 1000 (not < 1000)
+	buffer.TryDiscard(1000)
+
+	// Verify that data is still there
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+
+	// Try to discard with timestamp greater than pinned timestamp
+	// This should be skipped because 1000 < 2000
+	buffer.TryDiscard(2000)
+
+	// Verify that data is still there
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+
+	// Try to discard with timestamp less than pinned timestamp
+	// This should proceed normally because 1000 > 500
+	buffer.TryDiscard(500)
+
+	// Verify that data is still there (because 500 < 1000, no data to clean)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Data should still be there
+}
+
+// TestListDeleteBuffer_PinUnpinConcurrent tests concurrent pin and unpin operations
+func TestListDeleteBuffer_PinUnpinConcurrent(t *testing.T) {
+	// Create a new list delete buffer
+	buffer := NewListDeleteBuffer[*Item](1000, 1, []string{"test1", "test2"})
+
+	// Add test data
+	item1 := &Item{Ts: 1000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
+				Tss:      []uint64{1000},
+				RowCount: 1,
+			},
+		},
+	}}
+	item2 := &Item{Ts: 2000, Data: []BufferItem{
+		{
+			PartitionID: 200,
+			DeleteData: storage.DeleteData{
+				Pks:      []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
+				Tss:      []uint64{2000},
+				RowCount: 1,
+			},
+		},
+	}}
+	buffer.Put(item1)
+	buffer.Put(item2)
+
+	// Test concurrent pin operations
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(segmentID int64) {
+			defer wg.Done()
+			buffer.Pin(1500, segmentID)
+		}(int64(i))
+	}
+	wg.Wait()
+
+	// Try to discard - should be skipped due to pinned timestamp
+	buffer.TryDiscard(2000)
+	entryNum, _ := buffer.Size()
+	assert.Equal(t, int64(2), entryNum) // All data should remain
+
+	// Test concurrent unpin operations
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(segmentID int64) {
+			defer wg.Done()
+			buffer.Unpin(1500, segmentID)
+		}(int64(i))
+	}
+	wg.Wait()
+
+	// Try to discard again - should proceed normally
+	buffer.TryDiscard(2000)
+	entryNum, _ = buffer.Size()
+	assert.Equal(t, int64(1), entryNum) // Only data with ts >= 2000 should remain
+}