From d2ff390a52bc0eb06d1965ce080f8b3293be7c9b Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 29 May 2025 18:36:35 +0800 Subject: [PATCH] fix: Segment may be released prematurely during balance channel (#42043) issue: #41143 pr: #42090 Signed-off-by: Wei Liu --- .../querycoordv2/checkers/segment_checker.go | 73 +++--- .../checkers/segment_checker_test.go | 225 ++++++++++++------ 2 files changed, 195 insertions(+), 103 deletions(-) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 50e638dd35..cd2d242130 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -135,7 +135,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica task.SetPriority(task.TaskPriorityNormal, tasks...) ret = append(ret, tasks...) - redundancies = c.filterSegmentInUse(ctx, replica, redundancies) + redundancies = c.filterOutSegmentInUse(ctx, replica, redundancies) tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("segment not exists in target", tasks...) task.SetPriority(task.TaskPriorityNormal, tasks...) @@ -143,7 +143,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica // compare inner dists to find repeated loaded segments redundancies = c.findRepeatedSealedSegments(ctx, replica.GetID()) - redundancies = c.filterExistedOnLeader(replica, redundancies) + redundancies = c.filterOutExistedOnLeader(replica, redundancies) tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("redundancies of segment", tasks...) // set deduplicate task priority to low, to avoid deduplicate task cancel balance task @@ -360,53 +360,60 @@ func (c *SegmentChecker) findRepeatedSealedSegments(ctx context.Context, replica return segments } -func (c *SegmentChecker) filterExistedOnLeader(replica *meta.Replica, segments []*meta.Segment) []*meta.Segment { - filtered := make([]*meta.Segment, 0, len(segments)) +// for duplicated segment, we should release the one which is not serving on leader +func (c *SegmentChecker) filterOutExistedOnLeader(replica *meta.Replica, segments []*meta.Segment) []*meta.Segment { + notServing := make([]*meta.Segment, 0, len(segments)) for _, s := range segments { - leaderID, ok := c.dist.ChannelDistManager.GetShardLeader(replica, s.GetInsertChannel()) - if !ok { + views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(s.GetInsertChannel())) + if len(views) == 0 { continue } - view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel()) - if view == nil { - continue + servingOnLeader := false + for _, view := range views { + segInView, ok := view.Segments[s.GetID()] + if ok && segInView.NodeID == s.Node { + servingOnLeader = true + break + } } - seg, ok := view.Segments[s.GetID()] - if ok && seg.NodeID == s.Node { - // if this segment is serving on leader, do not remove it for search available - continue + + if !servingOnLeader { + notServing = append(notServing, s) } - filtered = append(filtered, s) } - return filtered + return notServing } -func (c *SegmentChecker) filterSegmentInUse(ctx context.Context, replica *meta.Replica, segments []*meta.Segment) []*meta.Segment { - filtered := make([]*meta.Segment, 0, len(segments)) +// for sealed segment which doesn't exist in target, we should release it after delegator has updated to latest readable version +func (c *SegmentChecker) filterOutSegmentInUse(ctx context.Context, replica *meta.Replica, segments []*meta.Segment) []*meta.Segment { + notUsed := make([]*meta.Segment, 0, len(segments)) for _, s := range segments { - leaderID, ok := c.dist.ChannelDistManager.GetShardLeader(replica, s.GetInsertChannel()) - if !ok { - continue - } - - view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel()) - if view == nil { - continue - } currentTargetVersion := c.targetMgr.GetCollectionTargetVersion(ctx, s.CollectionID, meta.CurrentTarget) partition := c.meta.CollectionManager.GetPartition(ctx, s.PartitionID) - // if delegator has valid target version, and before it update to latest readable version, skip release it's sealed segment - // Notice: if syncTargetVersion stuck, segment on delegator won't be released - readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion - if partition != nil && readableVersionNotUpdate { - // leader view version hasn't been updated, segment maybe still in use + views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(s.GetInsertChannel())) + if len(views) == 0 { continue } - filtered = append(filtered, s) + + stillInUseByDelegator := false + // if delegator has valid target version, and before it update to latest readable version, skip release it's sealed segment + for _, view := range views { + // Notice: if syncTargetVersion stuck, segment on delegator won't be released + readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion + if partition != nil && readableVersionNotUpdate { + // leader view version hasn't been updated, segment maybe still in use + stillInUseByDelegator = true + break + } + } + + if !stillInUseByDelegator { + notUsed = append(notUsed, s) + } } - return filtered + return notUsed } func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, replica *meta.Replica) []task.Task { diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index a94a0e5727..6abc30cc7e 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -516,76 +516,6 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDirtySegments() { suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } -func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { - ctx := context.Background() - checker := suite.checker - - collectionID := int64(1) - partitionID := int64(1) - // set meta - checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, partitionID)) - checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, collectionID, []int64{1, 2})) - - // set target - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - SeekPosition: &msgpb.MsgPosition{Timestamp: 10}, - }, - } - segments := []*datapb.SegmentInfo{} - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID) - checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID) - checker.targetMgr.UpdateCollectionNextTarget(ctx, collectionID) - readableVersion := checker.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.CurrentTarget) - - // test less target version exist on leader,meet segment doesn't exit in target, segment should be released - nodeID := int64(2) - segmentID := int64(1) - checker.dist.ChannelDistManager.Update(nodeID, utils.CreateTestChannel(collectionID, nodeID, segmentID, "test-insert-channel")) - view := utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{segmentID: 2}, map[int64]*meta.Segment{}) - view.TargetVersion = readableVersion - 1 - checker.dist.LeaderViewManager.Update(nodeID, view) - checker.dist.SegmentDistManager.Update(nodeID, utils.CreateTestSegment(collectionID, partitionID, segmentID, nodeID, 2, "test-insert-channel")) - tasks := checker.Check(context.TODO()) - suite.Len(tasks, 0) - - // test leader's target version update to latest,meet segment doesn't exit in target, segment should be released - view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{}) - view.TargetVersion = readableVersion - checker.dist.LeaderViewManager.Update(2, view) - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok := tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeReduce, action.Type()) - suite.EqualValues(segmentID, action.GetSegmentID()) - suite.EqualValues(nodeID, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - - // test leader with initialTargetVersion, meet segment doesn't exit in target, segment should be released - view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{}) - view.TargetVersion = initialTargetVersion - checker.dist.LeaderViewManager.Update(2, view) - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok = tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeReduce, action.Type()) - suite.EqualValues(segmentID, action.GetSegmentID()) - suite.EqualValues(nodeID, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) -} - func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() { ctx := context.Background() checker := suite.checker @@ -776,6 +706,161 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() { suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } +func (suite *SegmentCheckerTestSuite) TestFilterOutExistedOnLeader() { + checker := suite.checker + + // Setup test data + collectionID := int64(1) + partitionID := int64(1) + segmentID1 := int64(1) + segmentID2 := int64(2) + segmentID3 := int64(3) + nodeID1 := int64(1) + nodeID2 := int64(2) + channel := "test-insert-channel" + + // Create test replica + replica := utils.CreateTestReplica(1, collectionID, []int64{nodeID1, nodeID2}) + + // Create test segments + segments := []*meta.Segment{ + utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel), + utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel), + utils.CreateTestSegment(collectionID, partitionID, segmentID3, nodeID1, 1, channel), + } + + // Test case 1: No leader views - should skip releasing segments + result := checker.filterOutExistedOnLeader(replica, segments) + suite.Equal(0, len(result), "Should return all segments when no leader views") + + // Test case 2: Segment serving on leader - should be filtered out + leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel, + map[int64]int64{segmentID1: nodeID1}, map[int64]*meta.Segment{}) + checker.dist.LeaderViewManager.Update(nodeID1, leaderView1) + + result = checker.filterOutExistedOnLeader(replica, segments) + suite.Len(result, 2, "Should filter out segment serving on leader") + + // Check that segmentID1 is filtered out + for _, seg := range result { + suite.NotEqual(segmentID1, seg.GetID(), "Segment serving on leader should be filtered out") + } + + // Test case 3: Multiple leader views with segments serving on different nodes + leaderView2 := utils.CreateTestLeaderView(nodeID2, collectionID, channel, + map[int64]int64{segmentID2: nodeID2}, map[int64]*meta.Segment{}) + checker.dist.LeaderViewManager.Update(nodeID2, leaderView2) + + result = checker.filterOutExistedOnLeader(replica, segments) + suite.Len(result, 1, "Should filter out segments serving on their respective leaders") + suite.Equal(segmentID3, result[0].GetID(), "Only non-serving segment should remain") + + // Test case 4: Segment exists in leader view but on different node - should not be filtered + leaderView3 := utils.CreateTestLeaderView(nodeID1, collectionID, channel, + map[int64]int64{segmentID3: nodeID2}, map[int64]*meta.Segment{}) // segmentID3 exists but on nodeID2, not nodeID1 + checker.dist.LeaderViewManager.Update(nodeID1, leaderView3) + + result = checker.filterOutExistedOnLeader(replica, []*meta.Segment{segments[2]}) // Only test segmentID3 + suite.Len(result, 1, "Segment not serving on its actual node should not be filtered") +} + +func (suite *SegmentCheckerTestSuite) TestFilterOutSegmentInUse() { + ctx := context.Background() + checker := suite.checker + + // Setup test data + collectionID := int64(1) + partitionID := int64(1) + segmentID1 := int64(1) + segmentID2 := int64(2) + segmentID3 := int64(3) + nodeID1 := int64(1) + nodeID2 := int64(2) + channel := "test-insert-channel" + + // Setup meta data + checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(collectionID, 1)) + checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, partitionID)) + + // Create test replica + replica := utils.CreateTestReplica(1, collectionID, []int64{nodeID1, nodeID2}) + + // Create test segments + segments := []*meta.Segment{ + utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel), + utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel), + utils.CreateTestSegment(collectionID, partitionID, segmentID3, nodeID1, 1, channel), + } + + // Setup target to have a current version + channels := []*datapb.VchannelInfo{ + { + CollectionID: collectionID, + ChannelName: channel, + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return( + channels, []*datapb.SegmentInfo{}, nil).Maybe() + checker.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID) + currentTargetVersion := checker.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.CurrentTarget) + + // Test case 1: No leader views - should skip releasing segments + result := checker.filterOutSegmentInUse(ctx, replica, segments) + suite.Equal(0, len(result), "Should return all segments when no leader views") + + // Test case 2: Leader view with outdated target version - segment should be filtered (still in use) + leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel, + map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView1.TargetVersion = currentTargetVersion - 1 // Outdated version + checker.dist.LeaderViewManager.Update(nodeID1, leaderView1) + + result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]}) + suite.Len(result, 0, "Segment should be filtered out when delegator hasn't updated to latest version") + + // Test case 3: Leader view with current target version - segment should not be filtered + leaderView2 := utils.CreateTestLeaderView(nodeID1, collectionID, channel, + map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView2.TargetVersion = currentTargetVersion + checker.dist.LeaderViewManager.Update(nodeID1, leaderView2) + + result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]}) + suite.Len(result, 1, "Segment should not be filtered when delegator has updated to latest version") + + // Test case 4: Leader view with initial target version - segment should not be filtered + leaderView3 := utils.CreateTestLeaderView(nodeID2, collectionID, channel, + map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView3.TargetVersion = initialTargetVersion + checker.dist.LeaderViewManager.Update(nodeID2, leaderView3) + + result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[1]}) + suite.Len(result, 1, "Segment should not be filtered when leader has initial target version") + + // Test case 5: Multiple leader views with mixed versions - segment should be filtered (still in use) + leaderView4 := utils.CreateTestLeaderView(nodeID1, collectionID, channel, + map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView4.TargetVersion = currentTargetVersion - 1 // Outdated + + leaderView5 := utils.CreateTestLeaderView(nodeID2, collectionID, channel, + map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView5.TargetVersion = currentTargetVersion // Up to date + + checker.dist.LeaderViewManager.Update(nodeID1, leaderView4) + checker.dist.LeaderViewManager.Update(nodeID2, leaderView5) + + testSegments := []*meta.Segment{ + utils.CreateTestSegment(collectionID, partitionID, segmentID1, nodeID1, 1, channel), + utils.CreateTestSegment(collectionID, partitionID, segmentID2, nodeID2, 1, channel), + } + + result = checker.filterOutSegmentInUse(ctx, replica, testSegments) + suite.Len(result, 0, "Should release all segments when any delegator hasn't updated") + + // Test case 6: Partition is nil - should release all segments (no partition info) + checker.meta.CollectionManager.RemovePartition(ctx, partitionID) + result = checker.filterOutSegmentInUse(ctx, replica, []*meta.Segment{segments[0]}) + suite.Len(result, 0, "Should release all segments when partition is nil") +} + func TestSegmentCheckerSuite(t *testing.T) { suite.Run(t, new(SegmentCheckerTestSuite)) }