From 5b693c466d8716098ab1cdc3b3e16a72eeaeee1b Mon Sep 17 00:00:00 2001
From: aoiasd <45024769+aoiasd@users.noreply.github.com>
Date: Tue, 9 Apr 2024 15:21:24 +0800
Subject: [PATCH] fix: delegator filter out all partition's delete msg when
 loading segment (#31585)

The bug may cause deleted data to remain queryable for a period of time.

Related:
https://github.com/milvus-io/milvus/issues/31484
https://github.com/milvus-io/milvus/issues/31548

---------

Signed-off-by: aoiasd
---
 .../querynodev2/delegator/delegator_data.go   |  2 +-
 .../delegator/delegator_data_test.go          | 33 +++++++++++++++++++
 internal/storage/primary_key.go               | 15 +++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index fea0e8cd8a..c63717dcdf 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -749,7 +749,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
 	for _, tsMsg := range msgPack.Msgs {
 		if tsMsg.Type() == commonpb.MsgType_Delete {
 			dmsg := tsMsg.(*msgstream.DeleteMsg)
-			if dmsg.CollectionID != sd.collectionID || dmsg.GetPartitionID() != candidate.Partition() {
+			if dmsg.CollectionID != sd.collectionID || (dmsg.GetPartitionID() != common.AllPartitionsID && dmsg.GetPartitionID() != candidate.Partition()) {
 				continue
 			}
 
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index e3b1b57d49..51b71f21f3 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -1114,6 +1114,39 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
 	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 }
 
+func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	s.mq.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil)
+	s.mq.EXPECT().Close()
+	ch := make(chan *msgstream.MsgPack, 10)
+	s.mq.EXPECT().Chan().Return(ch)
+
+	oracle := pkoracle.NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
+	oracle.UpdateBloomFilter([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)})
+
+	baseMsg := &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}
+
+	datas := []*msgstream.MsgPack{
+		{EndTs: 10, EndPositions: []*msgpb.MsgPosition{{Timestamp: 10}}, Msgs: []msgstream.TsMsg{
+			&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 1, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{1}}},
+			&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: -1, PrimaryKeys: storage.ParseInt64s2IDs(2), Timestamps: []uint64{5}}},
+			// invalid msg because partition wrong
+			&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 2, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{10}}},
+		}},
+	}
+
+	for _, data := range datas {
+		ch <- data
+	}
+
+	result, err := s.delegator.readDeleteFromMsgstream(ctx, &msgpb.MsgPosition{Timestamp: 0}, 10, oracle)
+	s.NoError(err)
+	s.Equal(2, len(result.Pks))
+}
+
 func TestDelegatorDataSuite(t *testing.T) {
 	suite.Run(t, new(DelegatorDataSuite))
 }
diff --git a/internal/storage/primary_key.go b/internal/storage/primary_key.go
index 80f33bad89..f9322f64db 100644
--- a/internal/storage/primary_key.go
+++ b/internal/storage/primary_key.go
@@ -410,3 +410,18 @@ func ParsePrimaryKeys2IDs(pks []PrimaryKey) *schemapb.IDs {
 
 	return ret
 }
+
+func ParseInt64s2IDs(pks ...int64) *schemapb.IDs {
+	ret := &schemapb.IDs{}
+	if len(pks) == 0 {
+		return ret
+	}
+
+	ret.IdField = &schemapb.IDs_IntId{
+		IntId: &schemapb.LongArray{
+			Data: pks,
+		},
+	}
+
+	return ret
+}