From b08d9efe69748e457d7828d4ba037260303fc824 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Tue, 15 Jul 2025 17:46:51 +0800
Subject: [PATCH] fix: Prevent delegator unserviceable due to shard leader
 change (#42689) (#43309)

issue: #42098 #42404
pr: #42689

Fix a critical issue where concurrent balance-segment and balance-channel
operations cause delegator view inconsistency. When the shard leader switches
between the load and release phases of a segment balance, the segment is loaded
on the old delegator but released on the new one, leaving the new delegator
unserviceable.

The root cause is that balancing a segment modifies delegator views; if these
modifications land on different delegators because of a leader change, the
delegator state is corrupted and query availability suffers.

Changes include:
- Add a shardLeaderID field to SegmentTask to track the delegator used for loading
- Record the shard leader ID during segment loading in move operations
- Skip the release if the shard leader changed from the one used for loading
- Add comprehensive unit tests for leader-change scenarios

This ensures that balance-segment operations are atomic on a single delegator,
preventing view corruption and keeping the delegator serviceable.

---------

Signed-off-by: Wei Liu
---
 internal/datanode/importv2/util.go          |   3 +-
 internal/querycoordv2/task/executor.go      |  35 +++-
 internal/querycoordv2/task/executor_test.go | 193 ++++++++++++++++++
 internal/querycoordv2/task/task.go          |  15 +-
 internal/querycoordv2/task/task_test.go     |  34 +++
 .../querynodev2/delegator/exclude_info.go   |   2 +-
 internal/querynodev2/services.go            |  10 +
 tests/integration/balance/balance_test.go   |  61 ++++++
 8 files changed, 343 insertions(+), 10 deletions(-)
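Note for readers (not part of the applied diff): the core of the fix, reduced to a
minimal, self-contained sketch. The names below (segmentTask, recordLoad,
shouldRelease) are illustrative stand-ins, not the real querycoord types; see the
executor.go and task.go hunks for the actual implementation.

package main

import "fmt"

// segmentTask mirrors the idea behind SegmentTask.shardLeaderID: remember which
// delegator (shard leader) executed the load so the release step can verify it.
type segmentTask struct {
	shardLeaderID int64 // -1 until a load has been executed
}

func newSegmentTask() *segmentTask { return &segmentTask{shardLeaderID: -1} }

// recordLoad plays the role of SetShardLeaderID during the load step of a move task.
func (t *segmentTask) recordLoad(leaderID int64) { t.shardLeaderID = leaderID }

// shouldRelease plays the role of the new check in releaseSegment: if the shard
// leader changed between load and release, the release is skipped and the task
// fails, so both halves of the balance stay on a single delegator.
func (t *segmentTask) shouldRelease(currentLeaderID int64) bool {
	return t.shardLeaderID == currentLeaderID
}

func main() {
	task := newSegmentTask()
	task.recordLoad(1) // segment loaded through the delegator on node 1

	// shard leader switches to node 2 before the release step runs
	if !task.shouldRelease(2) {
		fmt.Printf("shard leader changed from %d to %d, skip release\n", task.shardLeaderID, int64(2))
	}
}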
diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go
index b65e573d76..9d1826ae7c 100644
--- a/internal/datanode/importv2/util.go
+++ b/internal/datanode/importv2/util.go
@@ -199,8 +199,7 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData, rowNum i
 	return nil
 }
 
-type nullDefaultAppender[T any] struct {
-}
+type nullDefaultAppender[T any] struct{}
 
 func (h *nullDefaultAppender[T]) AppendDefault(fieldData storage.FieldData, defaultVal T, rowNum int) error {
 	values := make([]T, rowNum)
diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go
index 1ff9fc62e1..8c65079c44 100644
--- a/internal/querycoordv2/task/executor.go
+++ b/internal/querycoordv2/task/executor.go
@@ -18,6 +18,7 @@ package task
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -215,15 +216,25 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
 		log.Warn(msg, zap.Error(err))
 		return err
 	}
-	view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard))
+	// prefer to load segment by latest and serviceable shard leader
+	view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard), meta.WithServiceable())
 	if view == nil {
-		msg := "no shard leader for the segment to execute loading"
-		err = merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
-		log.Warn(msg, zap.Error(err))
-		return err
+		// if no serviceable shard leader, try to find the latest shard leader
+		view = ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard))
+		if view == nil {
+			msg := "no shard leader for the segment to execute loading"
+			err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
+			log.Warn(msg, zap.Error(err))
+			return err
+		}
 	}
 
 	log = log.With(zap.Int64("shardLeader", view.ID))
+	// NOTE: for balance segment task, expected load and release execution on the same shard leader
+	if GetTaskType(task) == TaskTypeMove {
+		task.SetShardLeaderID(view.ID)
+	}
+
 	startTs := time.Now()
 	log.Info("load segments...")
 	status, err := ex.cluster.LoadSegments(task.Context(), view.ID, req)
@@ -255,6 +266,12 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 	)
 
 	ctx := task.Context()
+	var err error
+	defer func() {
+		if err != nil {
+			task.Fail(err)
+		}
+	}()
 
 	dstNode := action.Node()
 
@@ -295,6 +312,14 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 			}
 		}
 
+		// NOTE: for balance segment task, expected load and release execution on the same shard leader
+		if GetTaskType(task) == TaskTypeMove && task.ShardLeaderID() != view.ID {
+			msg := "shard leader changed, skip release"
+			err = merr.WrapErrServiceInternal(fmt.Sprintf("shard leader changed from %d to %d", task.ShardLeaderID(), view.ID))
+			log.Warn(msg, zap.Error(err))
+			return
+		}
+
 		dstNode = view.ID
 		log = log.With(zap.Int64("shardLeader", view.ID))
 		req.NeedTransfer = true
diff --git a/internal/querycoordv2/task/executor_test.go b/internal/querycoordv2/task/executor_test.go
index b9a6cc6d09..7981a7622b 100644
--- a/internal/querycoordv2/task/executor_test.go
+++ b/internal/querycoordv2/task/executor_test.go
@@ -26,6 +26,8 @@ import (
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -33,6 +35,8 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/v2/kv"
+	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/etcd"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
@@ -306,6 +310,195 @@ func (suite *ExecutorTestSuite) TestReleaseSegmentChannelSpecificLookup() {
 	suite.cluster.AssertExpectations(suite.T())
 }
 
+func (suite *ExecutorTestSuite) TestBalanceTaskWithTwoDelegators() {
+	// Setup collection and replica
+	collection := utils.CreateTestCollection(1, 1)
+	suite.meta.CollectionManager.PutCollection(suite.ctx, collection)
+	suite.meta.CollectionManager.PutPartition(suite.ctx, utils.CreateTestPartition(1, 1))
+
+	replica := utils.CreateTestReplica(1, 1, []int64{1, 2})
+	suite.meta.ReplicaManager.Put(suite.ctx, replica)
+
+	// Setup nodes
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 1)
+	suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 2)
+
+	// Create balance task with load and release actions
+	loadAction := NewSegmentAction(2, ActionTypeGrow, "test-channel", 100)      // Load on node 2
+	releaseAction := NewSegmentAction(1, ActionTypeReduce, "test-channel", 100) // Release from node 1
+
+	task, err := NewSegmentTask(suite.ctx, time.Second*10, WrapIDSource(0), 1, replica, loadAction, releaseAction)
+	suite.NoError(err)
+
+	// Setup old delegator (node 1) - serviceable
+	oldDelegatorView := utils.CreateTestLeaderView(1, 1, "test-channel", map[int64]int64{100: 1}, map[int64]*meta.Segment{})
+	oldDelegatorView.Version = 1
+	oldDelegatorView.UnServiceableError = nil // serviceable
+	suite.dist.LeaderViewManager.Update(1, oldDelegatorView)
+
+	// Mock broker responses
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{
+		Schema: &schemapb.CollectionSchema{
+			Name: "TestBalanceTask",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
+			},
+		},
+	}, nil)
+	suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(100)).Return([]*datapb.SegmentInfo{
+		{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   1,
+			InsertChannel: "test-channel",
+		},
+	}, nil)
+	suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(100)).Return(nil, nil)
+	suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
+		{
+			CollectionID: 1,
+		},
+	}, nil)
+
+	// Setup target for collection to be loaded
+	channel := &datapb.VchannelInfo{
+		CollectionID: 1,
+		ChannelName:  "test-channel",
+	}
+	segments := []*datapb.SegmentInfo{
+		{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   1,
+			InsertChannel: "test-channel",
+		},
+	}
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return([]*datapb.VchannelInfo{channel}, segments, nil)
+	suite.target.UpdateCollectionNextTarget(suite.ctx, 1)
+
+	// Expect load to be called on latest serviceable delegator (node 1)
+	suite.cluster.EXPECT().LoadSegments(mock.Anything, int64(1), mock.Anything).Return(&commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_Success,
+	}, nil).Once()
+
+	// Execute load segment (step 0)
+	suite.executor.loadSegment(task, 0)
+
+	// Setup new delegator (node 2) - serviceable
+	newDelegatorView := utils.CreateTestLeaderView(2, 1, "test-channel", map[int64]int64{100: 2}, map[int64]*meta.Segment{})
+	newDelegatorView.Version = 2
+	newDelegatorView.UnServiceableError = nil // serviceable
+	suite.dist.LeaderViewManager.Update(2, newDelegatorView)
+
+	// Execute release segment (step 1)
+	suite.executor.releaseSegment(task, 1)
+
+	// verify that the task is failed due to shard leader change
+	suite.Error(task.Err())
+}
+
+func (suite *ExecutorTestSuite) TestBalanceTaskFallbackToLatestWhenNoServiceableDelegator() {
+	// Setup collection and replica
+	collection := utils.CreateTestCollection(1, 1)
+	suite.meta.CollectionManager.PutCollection(suite.ctx, collection)
+	suite.meta.CollectionManager.PutPartition(suite.ctx, utils.CreateTestPartition(1, 1))
+
+	replica := utils.CreateTestReplica(1, 1, []int64{1, 2})
+	suite.meta.ReplicaManager.Put(suite.ctx, replica)
+
+	// Setup nodes
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 1)
+	suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 2)
+
+	// Create load action
+	loadAction := NewSegmentAction(2, ActionTypeGrow, "test-channel", 100)
+	task, err := NewSegmentTask(suite.ctx, time.Second*10, WrapIDSource(0), 1, replica, loadAction)
+	suite.NoError(err)
+
+	// Setup old delegator (node 1) - not serviceable
+	oldDelegatorView := utils.CreateTestLeaderView(1, 1, "test-channel", map[int64]int64{100: 1}, map[int64]*meta.Segment{})
+	oldDelegatorView.UnServiceableError = errors.New("not serviceable")
+	oldDelegatorView.Version = 1
+	suite.dist.LeaderViewManager.Update(1, oldDelegatorView)
+
+	// Setup new delegator (node 2) - not serviceable but latest
+	newDelegatorView := utils.CreateTestLeaderView(2, 1, "test-channel", map[int64]int64{100: 2}, map[int64]*meta.Segment{})
+	newDelegatorView.UnServiceableError = errors.New("not serviceable")
+	newDelegatorView.Version = 2 // latest version
+	suite.dist.LeaderViewManager.Update(2, newDelegatorView)
+
+	// Mock broker responses
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{
+		Schema: &schemapb.CollectionSchema{
+			Name: "TestBalanceTask",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
+			},
+		},
+	}, nil)
+	suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(100)).Return([]*datapb.SegmentInfo{
+		{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   1,
+			InsertChannel: "test-channel",
+		},
+	}, nil)
+	suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(100)).Return(nil, nil)
+	suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
+		{
+			CollectionID: 1,
+		},
+	}, nil)
+
+	// Setup target for collection to be loaded
+	channel := &datapb.VchannelInfo{
+		CollectionID: 1,
+		ChannelName:  "test-channel",
+	}
+	segments := []*datapb.SegmentInfo{
+		{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   1,
+			InsertChannel: "test-channel",
+		},
+	}
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return([]*datapb.VchannelInfo{channel}, segments, nil)
+	suite.target.UpdateCollectionNextTarget(suite.ctx, 1)
+
+	// Expect load to be called on the latest delegator (node 2) as fallback
+	suite.cluster.EXPECT().LoadSegments(mock.Anything, int64(2), mock.Anything).Return(&commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_Success,
+	}, nil).Once()
+
+	// Execute load segment
+	suite.executor.loadSegment(task, 0)
+
+	// Verify that the latest delegator was chosen as fallback
+	suite.cluster.AssertExpectations(suite.T())
+}
+
 func TestExecutorSuite(t *testing.T) {
 	suite.Run(t, new(ExecutorTestSuite))
 }
diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index 6fcbfb3eba..c0c6755b42 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -325,6 +325,8 @@ type SegmentTask struct {
 	*baseTask
 
 	segmentID typeutil.UniqueID
+	// for balance segment task, expected load and release execution on the same shard leader
+	shardLeaderID int64
 }
 
 // NewSegmentTask creates a SegmentTask with actions,
@@ -359,8 +361,9 @@ func NewSegmentTask(ctx context.Context,
 	base := newBaseTask(ctx, source, collectionID, replica, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID))
 	base.actions = actions
 	return &SegmentTask{
-		baseTask:  base,
-		segmentID: segmentID,
+		baseTask:      base,
+		segmentID:     segmentID,
+		shardLeaderID: -1,
 	}, nil
 }
 
@@ -384,6 +387,14 @@ func (task *SegmentTask) MarshalJSON() ([]byte, error) {
 	return marshalJSON(task)
 }
 
+func (task *SegmentTask) ShardLeaderID() int64 {
+	return task.shardLeaderID
+}
+
+func (task *SegmentTask) SetShardLeaderID(id int64) {
+	task.shardLeaderID = id
+}
+
 type ChannelTask struct {
 	*baseTask
 }
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 325289bbea..160e21414d 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1998,3 +1998,37 @@ func newReplicaDefaultRG(replicaID int64) *meta.Replica {
 		typeutil.NewUniqueSet(),
 	)
 }
+
+func (suite *TaskSuite) TestSegmentTaskShardLeaderID() {
+	ctx := context.Background()
+	timeout := 10 * time.Second
+
+	// Create a segment task
+	action := NewSegmentActionWithScope(1, ActionTypeGrow, "", 100, querypb.DataScope_Historical, 100)
+	segmentTask, err := NewSegmentTask(
+		ctx,
+		timeout,
+		WrapIDSource(0),
+		suite.collection,
+		suite.replica,
+		action,
+	)
+	suite.NoError(err)
+
+	// Test initial shard leader ID (should be -1)
+	suite.Equal(int64(-1), segmentTask.ShardLeaderID())
+
+	// Test setting shard leader ID
+	expectedLeaderID := int64(123)
+	segmentTask.SetShardLeaderID(expectedLeaderID)
+	suite.Equal(expectedLeaderID, segmentTask.ShardLeaderID())
+
+	// Test setting another value
+	anotherLeaderID := int64(456)
+	segmentTask.SetShardLeaderID(anotherLeaderID)
+	suite.Equal(anotherLeaderID, segmentTask.ShardLeaderID())
+
+	// Test with zero value
+	segmentTask.SetShardLeaderID(0)
+	suite.Equal(int64(0), segmentTask.ShardLeaderID())
+}
diff --git a/internal/querynodev2/delegator/exclude_info.go b/internal/querynodev2/delegator/exclude_info.go
index d2040ca394..be1484dd91 100644
--- a/internal/querynodev2/delegator/exclude_info.go
+++ b/internal/querynodev2/delegator/exclude_info.go
@@ -77,7 +77,7 @@ func (s *ExcludedSegments) CleanInvalid(ts uint64) {
 
 	for _, segmentID := range invalidExcludedInfos {
 		delete(s.segments, segmentID)
-		log.Info("remove segment from exclude info", zap.Int64("segmentID", segmentID))
+		log.Ctx(context.TODO()).Debug("remove segment from exclude info", zap.Int64("segmentID", segmentID))
 	}
 	s.lastClean.Store(time.Now())
 }
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 3db45de0ea..c707e9f470 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -301,6 +301,16 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	})
 	delegator.AddExcludedSegments(growingInfo)
 
+	flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
+		return id, typeutil.MaxTimestamp
+	})
+	delegator.AddExcludedSegments(flushedInfo)
+
+	droppedInfo := lo.SliceToMap(channel.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
+		return id, typeutil.MaxTimestamp
+	})
+	delegator.AddExcludedSegments(droppedInfo)
+
 	defer func() {
 		if err != nil {
 			// remove legacy growing
diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go
index 9911f5d5b5..c29c9acafb 100644
--- a/tests/integration/balance/balance_test.go
+++ b/tests/integration/balance/balance_test.go
@@ -21,11 +21,13 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/suite"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
@@ -309,6 +311,65 @@ func (s *BalanceTestSuit) TestNodeDown() {
 	}, 30*time.Second, 1*time.Second)
 }
 
+func (s *BalanceTestSuit) TestConcurrentBalanceChannelAndSegment() {
+	ctx := context.Background()
+
+	// speed up balance trigger
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "500")
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "500")
+
+	// init collection with 10 channel, each channel has 10 segment, each segment has 2000 row
+	// and load it with 1 replicas on 2 nodes.
+	name := "test_balance_" + funcutil.GenRandomStr()
+	s.initCollection(name, 1, 10, 10, 2000, 500)
+
+	stopSearchCh := make(chan struct{})
+	failCounter := atomic.NewInt64(0)
+
+	// keep query during balance
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-stopSearchCh:
+				log.Info("stop search")
+				return
+			default:
+				queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{
+					DbName:         "",
+					CollectionName: name,
+					Expr:           "",
+					OutputFields:   []string{"count(*)"},
+				})
+
+				if err := merr.CheckRPCCall(queryResult.GetStatus(), err); err != nil {
+					log.Info("query failed", zap.Error(err))
+					failCounter.Inc()
+				}
+			}
+		}
+	}()
+
+	// then we add 1 query node, expected segment and channel will be move to new query node concurrently
+	qn1 := s.Cluster.AddQueryNode()
+
+	// wait until balance channel finished
+	s.Eventually(func() bool {
+		resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", len(resp.Channels)), zap.Any("segments", len(resp.Segments)))
+		return len(resp.Channels) == 5
+	}, 30*time.Second, 1*time.Second)
+
+	// expected concurrent balance will execute successfully, shard serviceable won't be broken
+	close(stopSearchCh)
+	wg.Wait()
+	s.Equal(int64(0), failCounter.Load())
+}
+
 func TestBalance(t *testing.T) {
 	suite.Run(t, new(BalanceTestSuit))
 }