fix: Prevent delegator from becoming unserviceable due to shard leader change (#42689) (#43309)

issue: #42098 #42404
pr: #42689
Fix a critical issue where concurrent balance-segment and balance-channel
operations cause delegator view inconsistency. When the shard leader
switches between the load and release phases of a segment balance, the
segment is loaded on the old delegator but released on the new one,
making the new delegator unserviceable.

The root cause is that a balance-segment task modifies the delegator
view; if the load and release steps land on different delegators because
the leader changed in between, the delegator state is corrupted and query
availability is affected.

Changes include:
- Add shardLeaderID field to SegmentTask to track delegator for load
- Record shard leader ID during segment loading in move operations
- Skip release if shard leader changed from the one used for loading
- Add comprehensive unit tests for leader change scenarios

This ensures that balance-segment operations execute atomically on a
single delegator, preventing view corruption and keeping the delegator
serviceable.
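
In short, a move (balance-segment) task now remembers which delegator served
the load step and refuses to release through any other delegator. Below is a
minimal, self-contained Go sketch of that invariant; the moveTask type and its
methods are illustrative only, not Milvus code. The real change records the
leader via SegmentTask.SetShardLeaderID during load and checks ShardLeaderID
before release, as the diff further down shows.

    package main

    import "fmt"

    // moveTask is an illustrative stand-in for a balance-segment (move) task:
    // it records the delegator (shard leader) that handled the load step.
    type moveTask struct {
        segmentID     int64
        shardLeaderID int64 // -1 until the load step picks a delegator
    }

    func (t *moveTask) load(leaderID int64) {
        t.shardLeaderID = leaderID // remember which delegator loaded the segment
    }

    func (t *moveTask) release(leaderID int64) error {
        if t.shardLeaderID != leaderID {
            // Leader changed between load and release: skip the release so the
            // new delegator's view is never touched by a half-finished balance.
            return fmt.Errorf("shard leader changed from %d to %d, skip release",
                t.shardLeaderID, leaderID)
        }
        return nil // the same delegator handled both steps, release is safe
    }

    func main() {
        t := &moveTask{segmentID: 100, shardLeaderID: -1}
        t.load(1)
        fmt.Println(t.release(2)) // leader switched: release is skipped, task fails
        fmt.Println(t.release(1)) // same leader: release proceeds (prints <nil>)
    }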

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Author: wei liu · 2025-07-15 17:46:51 +08:00 · committed by GitHub
parent 180214d406
commit b08d9efe69
8 changed files with 343 additions and 10 deletions

View File

@@ -199,8 +199,7 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData, rowNum i
return nil
}
- type nullDefaultAppender[T any] struct {
- }
+ type nullDefaultAppender[T any] struct{}
func (h *nullDefaultAppender[T]) AppendDefault(fieldData storage.FieldData, defaultVal T, rowNum int) error {
values := make([]T, rowNum)

View File

@@ -18,6 +18,7 @@ package task
import (
"context"
"fmt"
"sync"
"time"
@@ -215,15 +216,25 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
log.Warn(msg, zap.Error(err))
return err
}
- view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard))
+ // prefer to load segment by latest and serviceable shard leader
+ view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard), meta.WithServiceable())
if view == nil {
- msg := "no shard leader for the segment to execute loading"
- err = merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
- log.Warn(msg, zap.Error(err))
- return err
+ // if no serviceable shard leader, try to find the latest shard leader
+ view = ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard))
+ if view == nil {
+ msg := "no shard leader for the segment to execute loading"
+ err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
+ log.Warn(msg, zap.Error(err))
+ return err
+ }
}
log = log.With(zap.Int64("shardLeader", view.ID))
+ // NOTE: for balance segment task, expected load and release execution on the same shard leader
+ if GetTaskType(task) == TaskTypeMove {
+ task.SetShardLeaderID(view.ID)
+ }
startTs := time.Now()
log.Info("load segments...")
status, err := ex.cluster.LoadSegments(task.Context(), view.ID, req)
@@ -255,6 +266,12 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
)
ctx := task.Context()
var err error
defer func() {
if err != nil {
task.Fail(err)
}
}()
dstNode := action.Node()
@@ -295,6 +312,14 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
}
}
// NOTE: for balance segment task, expected load and release execution on the same shard leader
if GetTaskType(task) == TaskTypeMove && task.ShardLeaderID() != view.ID {
msg := "shard leader changed, skip release"
err = merr.WrapErrServiceInternal(fmt.Sprintf("shard leader changed from %d to %d", task.ShardLeaderID(), view.ID))
log.Warn(msg, zap.Error(err))
return
}
dstNode = view.ID
log = log.With(zap.Int64("shardLeader", view.ID))
req.NeedTransfer = true

View File

@@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -33,6 +35,8 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@@ -306,6 +310,195 @@ func (suite *ExecutorTestSuite) TestReleaseSegmentChannelSpecificLookup() {
suite.cluster.AssertExpectations(suite.T())
}
func (suite *ExecutorTestSuite) TestBalanceTaskWithTwoDelegators() {
// Setup collection and replica
collection := utils.CreateTestCollection(1, 1)
suite.meta.CollectionManager.PutCollection(suite.ctx, collection)
suite.meta.CollectionManager.PutPartition(suite.ctx, utils.CreateTestPartition(1, 1))
replica := utils.CreateTestReplica(1, 1, []int64{1, 2})
suite.meta.ReplicaManager.Put(suite.ctx, replica)
// Setup nodes
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 1)
suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 2)
// Create balance task with load and release actions
loadAction := NewSegmentAction(2, ActionTypeGrow, "test-channel", 100) // Load on node 2
releaseAction := NewSegmentAction(1, ActionTypeReduce, "test-channel", 100) // Release from node 1
task, err := NewSegmentTask(suite.ctx, time.Second*10, WrapIDSource(0), 1, replica, loadAction, releaseAction)
suite.NoError(err)
// Setup old delegator (node 1) - serviceable
oldDelegatorView := utils.CreateTestLeaderView(1, 1, "test-channel", map[int64]int64{100: 1}, map[int64]*meta.Segment{})
oldDelegatorView.Version = 1
oldDelegatorView.UnServiceableError = nil // serviceable
suite.dist.LeaderViewManager.Update(1, oldDelegatorView)
// Mock broker responses
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{
Schema: &schemapb.CollectionSchema{
Name: "TestBalanceTask",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
},
},
}, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(100)).Return([]*datapb.SegmentInfo{
{
ID: 100,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-channel",
},
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(100)).Return(nil, nil)
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
CollectionID: 1,
},
}, nil)
// Setup target for collection to be loaded
channel := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: "test-channel",
}
segments := []*datapb.SegmentInfo{
{
ID: 100,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.ctx, 1)
// Expect load to be called on latest serviceable delegator (node 1)
suite.cluster.EXPECT().LoadSegments(mock.Anything, int64(1), mock.Anything).Return(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil).Once()
// Execute load segment (step 0)
suite.executor.loadSegment(task, 0)
// Setup new delegator (node 2) - serviceable
newDelegatorView := utils.CreateTestLeaderView(2, 1, "test-channel", map[int64]int64{100: 2}, map[int64]*meta.Segment{})
newDelegatorView.Version = 2
newDelegatorView.UnServiceableError = nil // serviceable
suite.dist.LeaderViewManager.Update(2, newDelegatorView)
// Execute release segment (step 1)
suite.executor.releaseSegment(task, 1)
// verify that the task is failed due to shard leader change
suite.Error(task.Err())
}
func (suite *ExecutorTestSuite) TestBalanceTaskFallbackToLatestWhenNoServiceableDelegator() {
// Setup collection and replica
collection := utils.CreateTestCollection(1, 1)
suite.meta.CollectionManager.PutCollection(suite.ctx, collection)
suite.meta.CollectionManager.PutPartition(suite.ctx, utils.CreateTestPartition(1, 1))
replica := utils.CreateTestReplica(1, 1, []int64{1, 2})
suite.meta.ReplicaManager.Put(suite.ctx, replica)
// Setup nodes
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 1)
suite.meta.ResourceManager.HandleNodeUp(suite.ctx, 2)
// Create load action
loadAction := NewSegmentAction(2, ActionTypeGrow, "test-channel", 100)
task, err := NewSegmentTask(suite.ctx, time.Second*10, WrapIDSource(0), 1, replica, loadAction)
suite.NoError(err)
// Setup old delegator (node 1) - not serviceable
oldDelegatorView := utils.CreateTestLeaderView(1, 1, "test-channel", map[int64]int64{100: 1}, map[int64]*meta.Segment{})
oldDelegatorView.UnServiceableError = errors.New("not serviceable")
oldDelegatorView.Version = 1
suite.dist.LeaderViewManager.Update(1, oldDelegatorView)
// Setup new delegator (node 2) - not serviceable but latest
newDelegatorView := utils.CreateTestLeaderView(2, 1, "test-channel", map[int64]int64{100: 2}, map[int64]*meta.Segment{})
newDelegatorView.UnServiceableError = errors.New("not serviceable")
newDelegatorView.Version = 2 // latest version
suite.dist.LeaderViewManager.Update(2, newDelegatorView)
// Mock broker responses
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{
Schema: &schemapb.CollectionSchema{
Name: "TestBalanceTask",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
},
},
}, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(100)).Return([]*datapb.SegmentInfo{
{
ID: 100,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-channel",
},
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(100)).Return(nil, nil)
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
CollectionID: 1,
},
}, nil)
// Setup target for collection to be loaded
channel := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: "test-channel",
}
segments := []*datapb.SegmentInfo{
{
ID: 100,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.ctx, 1)
// Expect load to be called on the latest delegator (node 2) as fallback
suite.cluster.EXPECT().LoadSegments(mock.Anything, int64(2), mock.Anything).Return(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil).Once()
// Execute load segment
suite.executor.loadSegment(task, 0)
// Verify that the latest delegator was chosen as fallback
suite.cluster.AssertExpectations(suite.T())
}
func TestExecutorSuite(t *testing.T) {
suite.Run(t, new(ExecutorTestSuite))
}

View File

@@ -325,6 +325,8 @@ type SegmentTask struct {
*baseTask
segmentID typeutil.UniqueID
// for balance segment task, expected load and release execution on the same shard leader
shardLeaderID int64
}
// NewSegmentTask creates a SegmentTask with actions,
@@ -359,8 +361,9 @@ func NewSegmentTask(ctx context.Context,
base := newBaseTask(ctx, source, collectionID, replica, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID))
base.actions = actions
return &SegmentTask{
- baseTask:  base,
- segmentID: segmentID,
+ baseTask:      base,
+ segmentID:     segmentID,
+ shardLeaderID: -1,
}, nil
}
@@ -384,6 +387,14 @@ func (task *SegmentTask) MarshalJSON() ([]byte, error) {
return marshalJSON(task)
}
func (task *SegmentTask) ShardLeaderID() int64 {
return task.shardLeaderID
}
func (task *SegmentTask) SetShardLeaderID(id int64) {
task.shardLeaderID = id
}
type ChannelTask struct {
*baseTask
}

View File

@@ -1998,3 +1998,37 @@ func newReplicaDefaultRG(replicaID int64) *meta.Replica {
typeutil.NewUniqueSet(),
)
}
func (suite *TaskSuite) TestSegmentTaskShardLeaderID() {
ctx := context.Background()
timeout := 10 * time.Second
// Create a segment task
action := NewSegmentActionWithScope(1, ActionTypeGrow, "", 100, querypb.DataScope_Historical, 100)
segmentTask, err := NewSegmentTask(
ctx,
timeout,
WrapIDSource(0),
suite.collection,
suite.replica,
action,
)
suite.NoError(err)
// Test initial shard leader ID (should be -1)
suite.Equal(int64(-1), segmentTask.ShardLeaderID())
// Test setting shard leader ID
expectedLeaderID := int64(123)
segmentTask.SetShardLeaderID(expectedLeaderID)
suite.Equal(expectedLeaderID, segmentTask.ShardLeaderID())
// Test setting another value
anotherLeaderID := int64(456)
segmentTask.SetShardLeaderID(anotherLeaderID)
suite.Equal(anotherLeaderID, segmentTask.ShardLeaderID())
// Test with zero value
segmentTask.SetShardLeaderID(0)
suite.Equal(int64(0), segmentTask.ShardLeaderID())
}

View File

@@ -77,7 +77,7 @@ func (s *ExcludedSegments) CleanInvalid(ts uint64) {
for _, segmentID := range invalidExcludedInfos {
delete(s.segments, segmentID)
log.Info("remove segment from exclude info", zap.Int64("segmentID", segmentID))
log.Ctx(context.TODO()).Debug("remove segment from exclude info", zap.Int64("segmentID", segmentID))
}
s.lastClean.Store(time.Now())
}

View File

@@ -301,6 +301,16 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
})
delegator.AddExcludedSegments(growingInfo)
flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
return id, typeutil.MaxTimestamp
})
delegator.AddExcludedSegments(flushedInfo)
droppedInfo := lo.SliceToMap(channel.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
return id, typeutil.MaxTimestamp
})
delegator.AddExcludedSegments(droppedInfo)
defer func() {
if err != nil {
// remove legacy growing

View File

@@ -21,11 +21,13 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@@ -309,6 +311,65 @@ func (s *BalanceTestSuit) TestNodeDown() {
}, 30*time.Second, 1*time.Second)
}
func (s *BalanceTestSuit) TestConcurrentBalanceChannelAndSegment() {
ctx := context.Background()
// speed up balance trigger
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "500")
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "500")
// init collection with 10 channel, each channel has 10 segment, each segment has 2000 row
// and load it with 1 replicas on 2 nodes.
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 10, 10, 2000, 500)
stopSearchCh := make(chan struct{})
failCounter := atomic.NewInt64(0)
// keep query during balance
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopSearchCh:
log.Info("stop search")
return
default:
queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{
DbName: "",
CollectionName: name,
Expr: "",
OutputFields: []string{"count(*)"},
})
if err := merr.CheckRPCCall(queryResult.GetStatus(), err); err != nil {
log.Info("query failed", zap.Error(err))
failCounter.Inc()
}
}
}
}()
// then we add 1 query node, expected segment and channel will be move to new query node concurrently
qn1 := s.Cluster.AddQueryNode()
// wait until balance channel finished
s.Eventually(func() bool {
resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", len(resp.Channels)), zap.Any("segments", len(resp.Segments)))
return len(resp.Channels) == 5
}, 30*time.Second, 1*time.Second)
// expected concurrent balance will execute successfully, shard serviceable won't be broken
close(stopSearchCh)
wg.Wait()
s.Equal(int64(0), failCounter.Load())
}
func TestBalance(t *testing.T) {
suite.Run(t, new(BalanceTestSuit))
}