fix: Segment may be released prematurely during balance channel (#42043)

issue: #41143
pr: #42090

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Author: wei liu (committed by GitHub)
Date: 2025-05-29 18:36:35 +08:00
Commit: d2ff390a52, parent e567b38a74
2 changed files with 195 additions and 103 deletions
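Root cause, in brief: while a channel is being balanced, the old and the new delegator of a shard can coexist for a short window. Both filterSegmentInUse and filterExistedOnLeader resolved a single shard leader via ChannelDistManager.GetShardLeader and inspected only that one leader view, so a sealed segment that the other delegator was still serving could be handed to a reduce task and released prematurely. The fix queries every matching leader view through LeaderViewManager.GetByFilter and releases a segment only when no view still needs it. The following is a minimal sketch of that decision rule with toy types — not the actual Milvus structs — just to illustrate why checking a single view is unsafe:

    package main

    import "fmt"

    // Toy stand-ins for delegator (leader) views; not the actual Milvus structs.
    type LeaderView struct {
        Node     int64
        Segments map[int64]int64 // segment ID -> node ID serving it
    }

    // Old rule (buggy): consult only one leader view.
    func safeToReleaseOne(view LeaderView, segID, node int64) bool {
        return view.Segments[segID] != node
    }

    // New rule: release only if NO leader view reports the segment
    // as serving on this node.
    func safeToReleaseAll(views []LeaderView, segID, node int64) bool {
        for _, v := range views {
            if serving, ok := v.Segments[segID]; ok && serving == node {
                return false
            }
        }
        return true
    }

    func main() {
        // During balance channel, old and new delegators briefly coexist on one shard.
        oldDelegator := LeaderView{Node: 1, Segments: map[int64]int64{100: 1}}
        newDelegator := LeaderView{Node: 2, Segments: map[int64]int64{}} // not fully synced yet

        // Consulting only the new delegator wrongly approves the release.
        fmt.Println(safeToReleaseOne(newDelegator, 100, 1)) // true (premature release)
        // Consulting every view keeps the segment until no delegator needs it.
        fmt.Println(safeToReleaseAll([]LeaderView{oldDelegator, newDelegator}, 100, 1)) // false
    }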

internal/querycoordv2/checkers/segment_checker.go:

@@ -135,7 +135,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
     task.SetPriority(task.TaskPriorityNormal, tasks...)
     ret = append(ret, tasks...)
-    redundancies = c.filterSegmentInUse(ctx, replica, redundancies)
+    redundancies = c.filterOutSegmentInUse(ctx, replica, redundancies)
     tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
     task.SetReason("segment not exists in target", tasks...)
     task.SetPriority(task.TaskPriorityNormal, tasks...)
@@ -143,7 +143,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
     // compare inner dists to find repeated loaded segments
     redundancies = c.findRepeatedSealedSegments(ctx, replica.GetID())
-    redundancies = c.filterExistedOnLeader(replica, redundancies)
+    redundancies = c.filterOutExistedOnLeader(replica, redundancies)
     tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
     task.SetReason("redundancies of segment", tasks...)
     // set deduplicate task priority to low, to avoid deduplicate task cancel balance task
@@ -360,53 +360,60 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica
     return segments
 }
 
-func (c *SegmentChecker) filterExistedOnLeader(replica *meta.Replica, segments []*meta.Segment) []*meta.Segment {
-    filtered := make([]*meta.Segment, 0, len(segments))
+// for duplicated segments, we should release the one that is not serving on the leader
+func (c *SegmentChecker) filterOutExistedOnLeader(replica *meta.Replica, segments []*meta.Segment) []*meta.Segment {
+    notServing := make([]*meta.Segment, 0, len(segments))
     for _, s := range segments {
-        leaderID, ok := c.dist.ChannelDistManager.GetShardLeader(replica, s.GetInsertChannel())
-        if !ok {
+        views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(s.GetInsertChannel()))
+        if len(views) == 0 {
             continue
         }
-        view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel())
-        if view == nil {
-            continue
-        }
-        seg, ok := view.Segments[s.GetID()]
-        if ok && seg.NodeID == s.Node {
-            // if this segment is serving on leader, do not remove it for search available
-            continue
+
+        servingOnLeader := false
+        for _, view := range views {
+            segInView, ok := view.Segments[s.GetID()]
+            if ok && segInView.NodeID == s.Node {
+                // this segment is serving on a leader; keep it so search stays available
+                servingOnLeader = true
+                break
+            }
+        }
+
+        if !servingOnLeader {
+            notServing = append(notServing, s)
         }
-        filtered = append(filtered, s)
     }
-    return filtered
+    return notServing
 }
 
-func (c *SegmentChecker) filterSegmentInUse(ctx context.Context, replica *meta.Replica, segments []*meta.Segment) []*meta.Segment {
-    filtered := make([]*meta.Segment, 0, len(segments))
+// for sealed segments that no longer exist in the target, we should release them only after the delegator has updated to the latest readable version
+func (c *SegmentChecker) filterOutSegmentInUse(ctx context.Context, replica *meta.Replica, segments []*meta.Segment) []*meta.Segment {
+    notUsed := make([]*meta.Segment, 0, len(segments))
     for _, s := range segments {
-        leaderID, ok := c.dist.ChannelDistManager.GetShardLeader(replica, s.GetInsertChannel())
-        if !ok {
-            continue
-        }
-        view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel())
-        if view == nil {
-            continue
-        }
         currentTargetVersion := c.targetMgr.GetCollectionTargetVersion(ctx, s.CollectionID, meta.CurrentTarget)
         partition := c.meta.CollectionManager.GetPartition(ctx, s.PartitionID)
-        // if delegator has valid target version, and before it update to latest readable version, skip release it's sealed segment
-        // Notice: if syncTargetVersion stuck, segment on delegator won't be released
-        readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion
-        if partition != nil && readableVersionNotUpdate {
-            // leader view version hasn't been updated, segment maybe still in use
+
+        views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(s.GetInsertChannel()))
+        if len(views) == 0 {
             continue
         }
-        filtered = append(filtered, s)
+
+        stillInUseByDelegator := false
+        // if a delegator has a valid target version but has not yet updated to the latest readable version, skip releasing its sealed segments
+        for _, view := range views {
+            // Notice: if syncTargetVersion gets stuck, segments on the delegator won't be released
+            readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion
+            if partition != nil && readableVersionNotUpdate {
+                // the leader view's version hasn't been updated, so the segment may still be in use
+                stillInUseByDelegator = true
+                break
+            }
+        }
+
+        if !stillInUseByDelegator {
+            notUsed = append(notUsed, s)
+        }
     }
-    return filtered
+    return notUsed
 }
 
 func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, replica *meta.Replica) []task.Task {
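The second filter gates on target versions rather than segment placement: a segment that has dropped out of the current target may be released only once every delegator on its channel has either caught up to the current target version or still sits at the initial version, and only while the partition still exists. A hedged restatement of that predicate follows, with toy inputs — it assumes initialTargetVersion is the checker's zero-valued sentinel, and it is a sketch of the loop above, not the actual code:

    package main

    import "fmt"

    const initialTargetVersion = int64(0) // assumed sentinel, mirroring the checker's constant

    // stillInUse reports whether any delegator's readable version lags the current
    // target version, in which case the segment must not be released yet.
    func stillInUse(viewVersions []int64, currentTargetVersion int64, hasPartition bool) bool {
        for _, v := range viewVersions {
            notUpdated := v != initialTargetVersion && v < currentTargetVersion
            if hasPartition && notUpdated {
                return true
            }
        }
        return false
    }

    func main() {
        current := int64(5)
        fmt.Println(stillInUse([]int64{5, 4}, current, true)) // true: one delegator lags
        fmt.Println(stillInUse([]int64{5, 5}, current, true)) // false: all caught up
        fmt.Println(stillInUse([]int64{0}, current, true))    // false: initial version is exempt
        fmt.Println(stillInUse([]int64{4}, current, false))   // false: partition gone, nothing to protect
    }

Because the release decision now requires every view to be caught up, a single lagging delegator (the case that occurs mid-balance) is enough to keep the segment alive.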

internal/querycoordv2/checkers/segment_checker_test.go:

@@ -516,76 +516,6 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDirtySegments() {
     suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
 
-func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
-    ctx := context.Background()
-    checker := suite.checker
-    collectionID := int64(1)
-    partitionID := int64(1)
-    // set meta
-    checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1))
-    checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
-    checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, partitionID))
-    checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, collectionID, []int64{1, 2}))
-    // set target
-    channels := []*datapb.VchannelInfo{
-        {
-            CollectionID: 1,
-            ChannelName:  "test-insert-channel",
-            SeekPosition: &msgpb.MsgPosition{Timestamp: 10},
-        },
-    }
-    segments := []*datapb.SegmentInfo{}
-    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-        channels, segments, nil)
-    checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
-    checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
-    checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
-    readableVersion := checker.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.CurrentTarget)
-    // test less target version exist on leader,meet segment doesn't exit in target, segment should be released
-    nodeID := int64(2)
-    segmentID := int64(1)
-    checker.dist.ChannelDistManager.Update(nodeID, utils.CreateTestChannel(collectionID, nodeID, segmentID, "test-insert-channel"))
-    view := utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{segmentID: 2}, map[int64]*meta.Segment{})
-    view.TargetVersion = readableVersion - 1
-    checker.dist.LeaderViewManager.Update(nodeID, view)
-    checker.dist.SegmentDistManager.Update(nodeID, utils.CreateTestSegment(collectionID, partitionID, segmentID, nodeID, 2, "test-insert-channel"))
-    tasks := checker.Check(context.TODO())
-    suite.Len(tasks, 0)
-    // test leader's target version update to latest,meet segment doesn't exit in target, segment should be released
-    view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{})
-    view.TargetVersion = readableVersion
-    checker.dist.LeaderViewManager.Update(2, view)
-    tasks = checker.Check(context.TODO())
-    suite.Len(tasks, 1)
-    suite.Len(tasks[0].Actions(), 1)
-    action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
-    suite.True(ok)
-    suite.EqualValues(1, tasks[0].ReplicaID())
-    suite.Equal(task.ActionTypeReduce, action.Type())
-    suite.EqualValues(segmentID, action.GetSegmentID())
-    suite.EqualValues(nodeID, action.Node())
-    suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
-    // test leader with initialTargetVersion, meet segment doesn't exit in target, segment should be released
-    view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{})
-    view.TargetVersion = initialTargetVersion
-    checker.dist.LeaderViewManager.Update(2, view)
-    tasks = checker.Check(context.TODO())
-    suite.Len(tasks, 1)
-    suite.Len(tasks[0].Actions(), 1)
-    action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
-    suite.True(ok)
-    suite.EqualValues(1, tasks[0].ReplicaID())
-    suite.Equal(task.ActionTypeReduce, action.Type())
-    suite.EqualValues(segmentID, action.GetSegmentID())
-    suite.EqualValues(nodeID, action.Node())
-    suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
-}
 
 func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
     ctx := context.Background()
     checker := suite.checker
@@ -776,6 +706,161 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() {
     suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
 
+func (suite *SegmentCheckerTestSuite) TestFilterOutExistedOnLeader() {
+    checker := suite.checker
+
+    // Setup test data
+    collectionID := int64(1)
+    partitionID := int64(1)
+    segmentID1 := int64(1)
+    segmentID2 := int64(2)
+    segmentID3 := int64(3)
+    nodeID1 := int64(1)
+    nodeID2 := int64(2)
+    channel := "test-insert-channel"
+
+    // Create test replica
+    replica := utils.CreateTestReplica(1, collectionID, []int64{nodeID1, nodeID2})
+
+    // Create test segments
+    segments := []*meta.Segment{
+        utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel),
+        utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel),
+        utils.CreateTestSegment(collectionID, partitionID, segmentID3, nodeID1, 1, channel),
+    }
+
+    // Test case 1: No leader views - releasing should be skipped for every segment
+    result := checker.filterOutExistedOnLeader(replica, segments)
+    suite.Equal(0, len(result), "Should skip all segments when no leader views exist")
+
+    // Test case 2: Segment serving on leader - should be filtered out
+    leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
+        map[int64]int64{segmentID1: nodeID1}, map[int64]*meta.Segment{})
+    checker.dist.LeaderViewManager.Update(nodeID1, leaderView1)
+    result = checker.filterOutExistedOnLeader(replica, segments)
+    suite.Len(result, 2, "Should filter out segment serving on leader")
+    // Check that segmentID1 is filtered out
+    for _, seg := range result {
+        suite.NotEqual(segmentID1, seg.GetID(), "Segment serving on leader should be filtered out")
+    }
+
+    // Test case 3: Multiple leader views with segments serving on different nodes
+    leaderView2 := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
+        map[int64]int64{segmentID2: nodeID2}, map[int64]*meta.Segment{})
+    checker.dist.LeaderViewManager.Update(nodeID2, leaderView2)
+    result = checker.filterOutExistedOnLeader(replica, segments)
+    suite.Len(result, 1, "Should filter out segments serving on their respective leaders")
+    suite.Equal(segmentID3, result[0].GetID(), "Only non-serving segment should remain")
+
+    // Test case 4: Segment exists in leader view but on a different node - should not be filtered
+    leaderView3 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
+        map[int64]int64{segmentID3: nodeID2}, map[int64]*meta.Segment{}) // segmentID3 exists but on nodeID2, not nodeID1
+    checker.dist.LeaderViewManager.Update(nodeID1, leaderView3)
+    result = checker.filterOutExistedOnLeader(replica, []*meta.Segment{segments[2]}) // Only test segmentID3
+    suite.Len(result, 1, "Segment not serving on its actual node should not be filtered")
+}
+
+func (suite *SegmentCheckerTestSuite) TestFilterOutSegmentInUse() {
+    ctx := context.Background()
+    checker := suite.checker
+
+    // Setup test data
+    collectionID := int64(1)
+    partitionID := int64(1)
+    segmentID1 := int64(1)
+    segmentID2 := int64(2)
+    segmentID3 := int64(3)
+    nodeID1 := int64(1)
+    nodeID2 := int64(2)
+    channel := "test-insert-channel"
+
+    // Setup meta data
+    checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1))
+    checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, partitionID))
+
+    // Create test replica
+    replica := utils.CreateTestReplica(1, collectionID, []int64{nodeID1, nodeID2})
+
+    // Create test segments
+    segments := []*meta.Segment{
+        utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel),
+        utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel),
+        utils.CreateTestSegment(collectionID, partitionID, segmentID3, nodeID1, 1, channel),
+    }
+
+    // Setup target to have a current version
+    channels := []*datapb.VchannelInfo{
+        {
+            CollectionID: collectionID,
+            ChannelName:  channel,
+        },
+    }
+    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
+        channels, []*datapb.SegmentInfo{}, nil).Maybe()
+    checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
+    currentTargetVersion := checker.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.CurrentTarget)
+
+    // Test case 1: No leader views - releasing should be skipped for every segment
+    result := checker.filterOutSegmentInUse(ctx, replica, segments)
+    suite.Equal(0, len(result), "Should skip all segments when no leader views exist")
+
+    // Test case 2: Leader view with outdated target version - segment should be filtered out (still in use)
+    leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
+        map[int64]int64{}, map[int64]*meta.Segment{})
+    leaderView1.TargetVersion = currentTargetVersion - 1 // Outdated version
+    checker.dist.LeaderViewManager.Update(nodeID1, leaderView1)
+    result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]})
+    suite.Len(result, 0, "Segment should be filtered out when delegator hasn't updated to latest version")
+
+    // Test case 3: Leader view with current target version - segment should not be filtered
+    leaderView2 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
+        map[int64]int64{}, map[int64]*meta.Segment{})
+    leaderView2.TargetVersion = currentTargetVersion
+    checker.dist.LeaderViewManager.Update(nodeID1, leaderView2)
+    result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]})
+    suite.Len(result, 1, "Segment should not be filtered when delegator has updated to latest version")
+
+    // Test case 4: Leader view with initial target version - segment should not be filtered
+    leaderView3 := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
+        map[int64]int64{}, map[int64]*meta.Segment{})
+    leaderView3.TargetVersion = initialTargetVersion
+    checker.dist.LeaderViewManager.Update(nodeID2, leaderView3)
+    result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[1]})
+    suite.Len(result, 1, "Segment should not be filtered when leader has initial target version")
+
+    // Test case 5: Multiple leader views with mixed versions - segment should be filtered out (still in use)
+    leaderView4 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
+        map[int64]int64{}, map[int64]*meta.Segment{})
+    leaderView4.TargetVersion = currentTargetVersion - 1 // Outdated
+    leaderView5 := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
+        map[int64]int64{}, map[int64]*meta.Segment{})
+    leaderView5.TargetVersion = currentTargetVersion // Up to date
+    checker.dist.LeaderViewManager.Update(nodeID1, leaderView4)
+    checker.dist.LeaderViewManager.Update(nodeID2, leaderView5)
+    testSegments := []*meta.Segment{
+        utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel),
+        utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel),
+    }
+    result = checker.filterOutSegmentInUse(ctx, replica, testSegments)
+    suite.Len(result, 0, "Should skip releasing segments while any delegator hasn't updated")
+
+    // Test case 6: Partition is nil - should release all segments (no partition info)
+    checker.meta.CollectionManager.RemovePartition(ctx, partitionID)
+    result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]})
+    suite.Len(result, 0, "Should release all segments when partition is nil")
+}
+
 func TestSegmentCheckerSuite(t *testing.T) {
     suite.Run(t, new(SegmentCheckerTestSuite))
 }
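To run the new unit tests locally, something like the following should work from the repository root (the package path is assumed from where SegmentChecker lives in the Milvus tree, not stated in this diff):

    go test ./internal/querycoordv2/checkers/... -run TestSegmentCheckerSuite -v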