From 1eacdc591b1a8c5f8c765c8541cc65a9be0a644a Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 20 Dec 2023 21:22:43 +0800 Subject: [PATCH] fix: delegator may mark segment offline by mistake (#29343) See also #29332 The segment may be released before or during the request when the delegator tries to forward the delete request to it. Currently, these two situations return different error codes. In this particular case, `ErrSegmentNotLoaded` and `ErrSegmentNotFound` shall both be ignored, preventing the search service from being reported unavailable by mistake. --------- Signed-off-by: Congqi Xia --- .../querynodev2/delegator/delegator_data.go | 2 +- .../delegator/delegator_data_test.go | 26 ++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 1f749eb8fa..f22f7db9c4 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -277,7 +277,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("try to delete data on non-exist node") return retry.Unrecoverable(err) - } else if errors.Is(err, merr.ErrSegmentNotFound) { + } else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) { log.Warn("try to delete data of released segment") return nil } else if err != nil { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 41257c71bd..bd57fcf70c 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -58,7 +58,7 @@ type DelegatorDataSuite struct { loader *segments.MockLoader mq *msgstream.MockMsgStream - delegator ShardDelegator + delegator *shardDelegator } func (s *DelegatorDataSuite) SetupSuite() { @@ -131,13 +131,15 @@ func (s *DelegatorDataSuite) SetupTest() 
{ s.mq = &msgstream.MockMsgStream{} - var err error - s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, }, 10000, nil) s.Require().NoError(err) + sd, ok := delegator.(*shardDelegator) + s.Require().True(ok) + s.delegator = sd } func (s *DelegatorDataSuite) TestProcessInsert() { @@ -330,6 +332,20 @@ func (s *DelegatorDataSuite) TestProcessDelete() { RowCount: 1, }, }, 10) + s.True(s.delegator.distribution.Serviceable()) + + // test worker return segment not loaded + worker1.ExpectedCalls = nil + worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrSegmentNotLoaded) + s.delegator.ProcessDelete([]*DeleteData{ + { + PartitionID: 500, + PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)}, + Timestamps: []uint64{10}, + RowCount: 1, + }, + }, 10) + s.True(s.delegator.distribution.Serviceable(), "segment not loaded shall not trigger offline") // test worker offline worker1.ExpectedCalls = nil @@ -342,6 +358,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() { RowCount: 1, }, }, 10) + + s.False(s.delegator.distribution.Serviceable()) } func (s *DelegatorDataSuite) TestLoadSegments() { @@ -901,7 +919,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() { } func (s *DelegatorDataSuite) TestLevel0Deletions() { - delegator := s.delegator.(*shardDelegator) + delegator := s.delegator partitionID := int64(10) partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100}) allPartitionDeleteData := 
storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})