From 75463725b332700c1946f7b28e53113c2cf78b71 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 31 Jul 2025 14:33:38 +0800 Subject: [PATCH] fix: skip loading non-existent L0 segments to prevent load blocking (#43576) issue: #43557 In 2.5 branch, L0 segments must be loaded before other segments. If an L0 segment has been garbage collected but is still in the target list, the load operation would keep failing, preventing other segments from being loaded. This patch adds a segment existence check for L0 segments in getSealedSegmentDiff. Only L0 segments that actually exist will be included in the load list. Changes: - Add checkSegmentExist function parameter to SegmentChecker constructor - Filter L0 segments by existence check in getSealedSegmentDiff - Add unit tests using mockey to verify the fix behavior Signed-off-by: Wei Liu --- internal/datacoord/task_queue.go | 4 +- internal/querycoordv2/checkers/controller.go | 13 +- .../querycoordv2/checkers/segment_checker.go | 20 ++- .../checkers/segment_checker_test.go | 122 +++++++++++++++++- 4 files changed, 148 insertions(+), 11 deletions(-) diff --git a/internal/datacoord/task_queue.go b/internal/datacoord/task_queue.go index b9ce7f0eb7..15f9de67e2 100644 --- a/internal/datacoord/task_queue.go +++ b/internal/datacoord/task_queue.go @@ -33,9 +33,7 @@ type schedulePolicy interface { Remove(taskID UniqueID) } -var ( - _ schedulePolicy = &priorityQueuePolicy{} -) +var _ schedulePolicy = &priorityQueuePolicy{} // priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority) type priorityQueuePolicy struct { diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index cbe2cb2be5..a0f047afdb 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" 
"github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/merr" ) var errTypeNotFound = errors.New("checker type not found") @@ -63,11 +64,21 @@ func NewCheckerController( broker meta.Broker, getBalancerFunc GetBalancerFunc, ) *CheckerController { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + resp, err := broker.GetSegmentInfo(ctx, segmentID) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Info("check segment exist failed", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.Error(err)) + if errors.Is(err, merr.ErrSegmentNotFound) { + return false + } + } + return true + } // CheckerController runs checkers with the order, // the former checker has higher priority checkers := map[utils.CheckerType]Checker{ utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), - utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), + utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc, checkSegmentExist), utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc), utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr), // todo temporary work around must fix diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index cd2d242130..76a66ec67b 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -41,13 +41,16 @@ import ( const initialTargetVersion = int64(0) +type CheckSegmentExist func(ctx context.Context, collectionID int64, segmentID int64) bool + type SegmentChecker struct { *checkerActivation - meta *meta.Meta - dist *meta.DistributionManager - targetMgr meta.TargetManagerInterface - nodeMgr *session.NodeManager - 
getBalancerFunc GetBalancerFunc + meta *meta.Meta + dist *meta.DistributionManager + targetMgr meta.TargetManagerInterface + nodeMgr *session.NodeManager + getBalancerFunc GetBalancerFunc + checkSegmentExist CheckSegmentExist } func NewSegmentChecker( @@ -56,6 +59,7 @@ func NewSegmentChecker( targetMgr meta.TargetManagerInterface, nodeMgr *session.NodeManager, getBalancerFunc GetBalancerFunc, + checkSegmentExist CheckSegmentExist, ) *SegmentChecker { return &SegmentChecker{ checkerActivation: newCheckerActivation(), @@ -64,6 +68,7 @@ func NewSegmentChecker( targetMgr: targetMgr, nodeMgr: nodeMgr, getBalancerFunc: getBalancerFunc, + checkSegmentExist: checkSegmentExist, } } @@ -314,7 +319,10 @@ func (c *SegmentChecker) getSealedSegmentDiff( } level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool { - return segment.GetLevel() == datapb.SegmentLevel_L0 + // Patch for the 2.5 branch only: L0 segments must be loaded before any other segments. + // If an L0 segment has already been garbage collected, loading it can never succeed and + // the remaining segments would never be loaded, so keep only the L0 segments that still exist. + return segment.GetLevel() == datapb.SegmentLevel_L0 && c.checkSegmentExist(ctx, collectionID, segment.GetID()) }) // L0 segment found, // QueryCoord loads the L0 segments first, diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 6abc30cc7e..3f4fd7cfb8 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -21,6 +21,8 @@ import ( "sort" "testing" + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -77,7 +79,11 @@ func (suite *SegmentCheckerTestSuite) SetupTest() { targetManager := meta.NewTargetManager(suite.broker, suite.meta) balancer := suite.createMockBalancer() - suite.checker = 
NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer }) + getBalancerFunc := func() balance.Balance { return balancer } + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return true + } + suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, getBalancerFunc, checkSegmentExist) suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe() } @@ -864,3 +870,117 @@ func (suite *SegmentCheckerTestSuite) TestFilterOutSegmentInUse() { func TestSegmentCheckerSuite(t *testing.T) { suite.Run(t, new(SegmentCheckerTestSuite)) } + +func TestGetSealedSegmentDiff_WithL0SegmentCheck(t *testing.T) { + mockey.PatchConvey("TestGetSealedSegmentDiff_WithL0SegmentCheck", t, func() { + // Test case 1: L0 segment exists + t.Run("L0_segment_exists", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return true // L0 segment exists + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: L0 segment should be included + assert.Equal(t, 1, len(level0Segments)) + assert.Equal(t, int64(1), level0Segments[0].GetID()) + }) + + // Test case 2: L0 segment does not exist + t.Run("L0_segment_not_exists", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return false // L0 segment 
does not exist + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: L0 segment should be filtered out + assert.Equal(t, 0, len(level0Segments)) + }) + + // Test case 3: Mixed L0 segments, only some exist + t.Run("Mixed_L0_segments", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return segmentID == 1 // Only segment 1 exists + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 3, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: Only existing L0 segment should be included + assert.Equal(t, 1, len(level0Segments)) + assert.Equal(t, int64(1), level0Segments[0].GetID()) + }) + }) +}