diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go index f1b0a4e160..8d88a9afa1 100644 --- a/internal/datanode/buffer.go +++ b/internal/datanode/buffer.go @@ -296,6 +296,7 @@ func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) { tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom} ddb.updateTimeRange(tr) + ddb.updateStartAndEndPosition(buf.startPos, buf.endPos) ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...) ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...) diff --git a/internal/datanode/buffer_test.go b/internal/datanode/buffer_test.go index 7971e30470..102277f114 100644 --- a/internal/datanode/buffer_test.go +++ b/internal/datanode/buffer_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" ) func genTestCollectionSchema(dim int64) *schemapb.CollectionSchema { @@ -162,10 +163,12 @@ func Test_CompactSegBuff(t *testing.T) { //2. set up deleteDataBuf for seg1 and seg2 delDataBuf1 := newDelDataBuf() delDataBuf1.EntriesNum++ + delDataBuf1.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50}) delBufferManager.Store(segID1, delDataBuf1) heap.Push(delBufferManager.delBufHeap, delDataBuf1.item) delDataBuf2 := newDelDataBuf() delDataBuf2.EntriesNum++ + delDataBuf2.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50}) delBufferManager.Store(segID2, delDataBuf2) heap.Push(delBufferManager.delBufHeap, delDataBuf2.item) @@ -180,4 +183,16 @@ func Test_CompactSegBuff(t *testing.T) { assert.False(t, seg1Exist) assert.False(t, seg2Exist) assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID)) + + //5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501) + delBufferManager.channel.rollDeleteBuffer(compactedToSegID) + _, segCompactedToExist := delBufferManager.Load(compactedToSegID) + assert.False(t, segCompactedToExist) + delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &internalpb.MsgPosition{ + Timestamp: 100, + }) + cp := delBufferManager.channel.getChannelCheckpoint(&internalpb.MsgPosition{ + Timestamp: 200, + }) + assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp }