diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 659c940378..38c8aff2e2 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" ) const initialTargetVersion = int64(0) @@ -203,6 +204,12 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64, if channel, ok := currentTargetChannelMap[segment.InsertChannel]; ok { timestampInSegment := segment.GetStartPosition().GetTimestamp() timestampInTarget := channel.GetSeekPosition().GetTimestamp() + // release growing segment if in dropped segment list + if funcutil.SliceContain(channel.GetDroppedSegmentIds(), segment.GetID()) { + log.Info("growing segment exists in dropped segment list, release it", zap.Int64("segmentID", segment.GetID())) + toRelease = append(toRelease, segment) + continue + } // filter toRelease which seekPosition is newer than next target dmChannel if timestampInSegment < timestampInTarget { log.Info("growing segment not exist in target, so release it", diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 5944e8e9b0..228013c48d 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -646,6 +646,62 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() { suite.Equal(tasks[1].Priority(), task.TaskPriorityNormal) } +func (suite *SegmentCheckerTestSuite) TestReleaseCompactedGrowingSegments() { + checker := suite.checker + + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + + segments := []*datapb.SegmentInfo{ + { + ID: 3, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + SeekPosition: &msgpb.MsgPosition{Timestamp: 10}, + DroppedSegmentIds: []int64{4}, + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + checker.targetMgr.UpdateCollectionCurrentTarget(int64(1)) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + growingSegments := make(map[int64]*meta.Segment) + // segment start pos after chekcpoint + growingSegments[4] = utils.CreateTestSegment(1, 1, 4, 2, 1, "test-insert-channel") + growingSegments[4].SegmentInfo.StartPosition = &msgpb.MsgPosition{Timestamp: 11} + + dmChannel := utils.CreateTestChannel(1, 2, 1, "test-insert-channel") + dmChannel.UnflushedSegmentIds = []int64{2, 3} + checker.dist.ChannelDistManager.Update(2, dmChannel) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, growingSegments) + view.TargetVersion = checker.targetMgr.GetCollectionTargetVersion(int64(1), meta.CurrentTarget) + checker.dist.LeaderViewManager.Update(2, view) + checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 3, 2, 2, "test-insert-channel")) + + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 1) + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].Actions()[0].(*task.SegmentAction).GetSegmentID() < tasks[j].Actions()[0].(*task.SegmentAction).GetSegmentID() + }) + suite.Len(tasks[0].Actions(), 1) + action, ok := tasks[0].Actions()[0].(*task.SegmentAction) + suite.True(ok) + suite.EqualValues(1, tasks[0].ReplicaID()) + suite.Equal(task.ActionTypeReduce, action.Type()) + suite.EqualValues(4, action.GetSegmentID()) + suite.EqualValues(2, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) +} + func (suite *SegmentCheckerTestSuite) TestSkipReleaseGrowingSegments() { checker := suite.checker checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))