fix: Fix L0 segment duplicate load task generation during channel balance (#44700)

issue: #44699
Fix the issue where L0 segment checking logic incorrectly identifies L0
segments as missing when they exist on multiple delegators during
channel balance process, which blocks sealed segment loading and target
progression.

Changes include:
- Replace GetLatestShardLeaderByFilter with GetByFilter to check all
delegators instead of only the latest leader
- Iterate through all delegator views to identify which ones lack the L0
segment

The original logic only checked the latest shard leader, causing false
positive detection of missing L0 segments when they actually exist on
other delegators in the same channel during balance operations. This led
to continuous generation of duplicate L0 segment load tasks, preventing
normal sealed segment loading flow.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-10-11 10:04:00 +08:00 committed by GitHub
parent 03223ddb8b
commit cbe2761e99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 241 additions and 112 deletions

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
const initialTargetVersion = int64(0)
@ -267,20 +268,29 @@ func (c *SegmentChecker) getSealedSegmentDiff(
}
isSegmentLack := func(segment *datapb.SegmentInfo) bool {
node, existInDist := distMap[segment.ID]
if segment.GetLevel() == datapb.SegmentLevel_L0 {
// the L0 segments have to been in the same node as the channel watched
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
// Note: In the original design, all segments are forwarded through serviceable delegators to target workers for loading.
// However, L0 segments are always an exception and need to be loaded directly on the delegator.
// during balance channel, we should check each delegator's view to see if it lacks the l0 segment
// cause same channel may have different delegators during balance channel progress, and which last for a long time if memory is not enough
views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
if len(views) == 0 {
msg := "no shard leader for the l0 segment to execute loading"
err := merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found")
log.Warn(msg, zap.Error(err))
return false
}
// if the leader node's version doesn't match load l0 segment's requirement, skip it
if leader != nil && checkLeaderVersion(leader, segment.ID) {
l0WithWrongLocation := node != leader.ID
return !existInDist || l0WithWrongLocation
// find delegator which lack of l0 segment
for _, view := range views {
if _, ok := view.Segments[segment.ID]; !ok && checkLeaderVersion(view, segment.ID) {
return true
}
}
return false
}
_, existInDist := distMap[segment.ID]
return !existInDist
}

View File

@ -872,115 +872,234 @@ func TestSegmentCheckerSuite(t *testing.T) {
}
func TestGetSealedSegmentDiff_WithL0SegmentCheck(t *testing.T) {
mockey.PatchConvey("TestGetSealedSegmentDiff_WithL0SegmentCheck", t, func() {
// Test case 1: L0 segment exists
t.Run("L0_segment_exists", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return true // L0 segment exists
// Test case 1: L0 segment exists
t.Run("L0_segment_exists", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return true // L0 segment exists
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
// Verify: L0 segment should be included
assert.Equal(t, 1, len(level0Segments))
assert.Equal(t, int64(1), level0Segments[0].GetID())
})
// Test case 2: L0 segment does not exist
t.Run("L0_segment_not_exists", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return false // L0 segment does not exist
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
// Verify: L0 segment should be filtered out
assert.Equal(t, 0, len(level0Segments))
})
// Test case 3: Mixed L0 segments, only some exist
t.Run("Mixed_L0_segments", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return segmentID == 1 // Only segment 1 exists
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 3,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
}
// Verify: L0 segment should be included
assert.Equal(t, 1, len(level0Segments))
assert.Equal(t, int64(1), level0Segments[0].GetID())
})
// Test case 2: L0 segment does not exist
t.Run("L0_segment_not_exists", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return false // L0 segment does not exist
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
}
// Verify: L0 segment should be filtered out
assert.Equal(t, 0, len(level0Segments))
})
// Test case 3: Mixed L0 segments, only some exist
t.Run("Mixed_L0_segments", func(t *testing.T) {
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return segmentID == 1 // Only segment 1 exists
}
// Create test segments
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L0,
},
{
ID: 3,
CollectionID: 1,
PartitionID: 1,
Level: datapb.SegmentLevel_L1,
},
}
// Filter L0 segments with existence check
var level0Segments []*datapb.SegmentInfo
for _, segment := range segments {
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
level0Segments = append(level0Segments, segment)
}
}
// Verify: Only existing L0 segment should be included
assert.Equal(t, 1, len(level0Segments))
assert.Equal(t, int64(1), level0Segments[0].GetID())
})
// Verify: Only existing L0 segment should be included
assert.Equal(t, 1, len(level0Segments))
assert.Equal(t, int64(1), level0Segments[0].GetID())
})
}
// createTestSegmentChecker creates a test SegmentChecker with mocked dependencies
func createTestSegmentChecker() (*SegmentChecker, *meta.Meta, *meta.DistributionManager, *session.NodeManager) {
nodeMgr := session.NewNodeManager()
metaManager := meta.NewMeta(nil, nil, nodeMgr)
distManager := meta.NewDistributionManager()
targetManager := meta.NewTargetManager(nil, metaManager)
getBalancerFunc := func() balance.Balance { return balance.NewScoreBasedBalancer(nil, nil, nil, nil, nil) }
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
return true
}
checker := NewSegmentChecker(metaManager, distManager, targetManager, nodeMgr, getBalancerFunc, checkSegmentExist)
return checker, metaManager, distManager, nodeMgr
}
func TestGetSealedSegmentDiff_L0SegmentMultipleDelegators(t *testing.T) {
defer mockey.UnPatchAll()
ctx := context.Background()
// Setup test data
collectionID := int64(1)
replicaID := int64(1)
partitionID := int64(1)
segmentID := int64(1)
channel := "test-insert-channel"
nodeID1 := int64(1)
nodeID2 := int64(2)
// Create test components
checker, _, _, _ := createTestSegmentChecker()
// Mock GetSealedSegmentsByCollection to return L0 segment
mockey.Mock((*meta.TargetManager).GetSealedSegmentsByCollection).To(func(ctx context.Context, collectionID int64, scope meta.TargetScope) map[int64]*datapb.SegmentInfo {
if scope == meta.CurrentTarget {
return map[int64]*datapb.SegmentInfo{
segmentID: {
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
}
}
return make(map[int64]*datapb.SegmentInfo)
}).Build()
// Mock IsNextTargetExist to return false
mockey.Mock((*meta.TargetManager).IsNextTargetExist).Return(false).Build()
// Mock meta manager methods to avoid direct meta manipulation
// Mock Get method to return the test replica
testReplica := utils.CreateTestReplica(replicaID, collectionID, []int64{nodeID1, nodeID2})
mockey.Mock((*meta.ReplicaManager).Get).To(func(ctx context.Context, rid int64) *meta.Replica {
if rid == replicaID {
return testReplica
}
return nil
}).Build()
// Mock GetCollection to return test collection
testCollection := utils.CreateTestCollection(collectionID, 1)
mockey.Mock((*meta.Meta).GetCollection).To(func(ctx context.Context, cid int64) *meta.Collection {
if cid == collectionID {
return testCollection
}
return nil
}).Build()
// Mock NodeManager Get method to return compatible node versions
mockey.Mock((*session.NodeManager).Get).To(func(nodeID int64) *session.NodeInfo {
if nodeID == nodeID1 || nodeID == nodeID2 {
return session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodeID,
Address: "localhost",
Hostname: "localhost",
Version: common.Version,
})
}
return nil
}).Build()
// Mock SegmentDistManager GetByFilter to return empty distribution initially
mockey.Mock((*meta.SegmentDistManager).GetByFilter).Return([]*meta.Segment{}).Build()
// Test case 1: Multiple delegators, one lacks the L0 segment
leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
map[int64]int64{segmentID: nodeID1}, map[int64]*meta.Segment{}) // Has the segment
leaderView2 := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
map[int64]int64{}, map[int64]*meta.Segment{}) // Missing the segment
// Mock LeaderViewManager GetByFilter to return leader views for L0 segment checking
mockGetByFilter := mockey.Mock((*meta.LeaderViewManager).GetByFilter).To(func(filters ...meta.LeaderViewFilter) []*meta.LeaderView {
// Return both leader views for L0 segment checking
return []*meta.LeaderView{leaderView1, leaderView2}
}).Build()
toLoad, toRelease := checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
mockGetByFilter.Release()
// Verify: L0 segment should be loaded for the delegator that lacks it
assert.Len(t, toLoad, 1, "Should load L0 segment for delegator that lacks it")
assert.Equal(t, segmentID, toLoad[0].GetID(), "Should load the correct L0 segment")
assert.Empty(t, toRelease, "Should not release any segments")
// Test case 2: All delegators have the L0 segment
leaderView2WithSegment := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
map[int64]int64{segmentID: nodeID2}, map[int64]*meta.Segment{}) // Now has the segment
// Update the mock to return leader views where both have the segment
mockey.Mock((*meta.LeaderViewManager).GetByFilter).To(func(filters ...meta.LeaderViewFilter) []*meta.LeaderView {
// Return both leader views, both now have the L0 segment
return []*meta.LeaderView{leaderView1, leaderView2WithSegment}
}).Build()
toLoad, toRelease = checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
// Verify: No segments should be loaded when all delegators have the L0 segment
assert.Empty(t, toLoad, "Should not load L0 segment when all delegators have it")
assert.Empty(t, toRelease, "Should not release any segments")
}