diff --git a/internal/datacoord/task_queue.go b/internal/datacoord/task_queue.go index b9ce7f0eb7..15f9de67e2 100644 --- a/internal/datacoord/task_queue.go +++ b/internal/datacoord/task_queue.go @@ -33,9 +33,7 @@ type schedulePolicy interface { Remove(taskID UniqueID) } -var ( - _ schedulePolicy = &priorityQueuePolicy{} -) +var _ schedulePolicy = &priorityQueuePolicy{} // priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority) type priorityQueuePolicy struct { diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index cbe2cb2be5..a0f047afdb 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/merr" ) var errTypeNotFound = errors.New("checker type not found") @@ -63,11 +64,21 @@ func NewCheckerController( broker meta.Broker, getBalancerFunc GetBalancerFunc, ) *CheckerController { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + resp, err := broker.GetSegmentInfo(ctx, segmentID) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Info("check segment exist failed", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.Error(err)) + if errors.Is(err, merr.ErrSegmentNotFound) { + return false + } + } + return true + } // CheckerController runs checkers with the order, // the former checker has higher priority checkers := map[utils.CheckerType]Checker{ utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), - utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), + utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc, checkSegmentExist), utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc), utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr), // todo temporary work around must fix diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index cd2d242130..76a66ec67b 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -41,13 +41,16 @@ import ( const initialTargetVersion = int64(0) +type CheckSegmentExist func(ctx context.Context, collectionID int64, segmentID int64) bool + type SegmentChecker struct { *checkerActivation - meta *meta.Meta - dist *meta.DistributionManager - targetMgr meta.TargetManagerInterface - nodeMgr *session.NodeManager - getBalancerFunc GetBalancerFunc + meta *meta.Meta + dist *meta.DistributionManager + targetMgr meta.TargetManagerInterface + nodeMgr *session.NodeManager + getBalancerFunc GetBalancerFunc + checkSegmentExist CheckSegmentExist } func NewSegmentChecker( @@ -56,6 +59,7 @@ func NewSegmentChecker( targetMgr meta.TargetManagerInterface, nodeMgr *session.NodeManager, getBalancerFunc GetBalancerFunc, + checkSegmentExist CheckSegmentExist, ) *SegmentChecker { return &SegmentChecker{ checkerActivation: newCheckerActivation(), @@ -64,6 +68,7 @@ func NewSegmentChecker( targetMgr: targetMgr, nodeMgr: nodeMgr, getBalancerFunc: getBalancerFunc, + checkSegmentExist: checkSegmentExist, } } @@ -314,7 +319,10 @@ func (c *SegmentChecker) getSealedSegmentDiff( } level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool { - return segment.GetLevel() == datapb.SegmentLevel_L0 + // Patch for 2.5 branch only, l0 segments need to be loaded before other segments + // then if l0 segment has been garbage collected, load l0 will never succeed, + // other segment will never be loaded, so we need to check if l0 segment exists + return segment.GetLevel() == datapb.SegmentLevel_L0 && c.checkSegmentExist(ctx, collectionID, segment.GetID()) }) // L0 segment found, // QueryCoord loads the L0 segments first, diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 6abc30cc7e..3f4fd7cfb8 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -21,6 +21,8 @@ import ( "sort" "testing" + "github.com/bytedance/mockey" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -77,7 +79,11 @@ func (suite *SegmentCheckerTestSuite) SetupTest() { targetManager := meta.NewTargetManager(suite.broker, suite.meta) balancer := suite.createMockBalancer() - suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer }) + getBalancerFunc := func() balance.Balance { return balancer } + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return true + } + suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, getBalancerFunc, checkSegmentExist) suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe() } @@ -864,3 +870,117 @@ func (suite *SegmentCheckerTestSuite) TestFilterOutSegmentInUse() { func TestSegmentCheckerSuite(t *testing.T) { suite.Run(t, new(SegmentCheckerTestSuite)) } + +func TestGetSealedSegmentDiff_WithL0SegmentCheck(t *testing.T) { + mockey.PatchConvey("TestGetSealedSegmentDiff_WithL0SegmentCheck", t, func() { + // Test case 1: L0 segment exists + t.Run("L0_segment_exists", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return true // L0 segment exists + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: L0 segment should be included + assert.Equal(t, 1, len(level0Segments)) + assert.Equal(t, int64(1), level0Segments[0].GetID()) + }) + + // Test case 2: L0 segment does not exist + t.Run("L0_segment_not_exists", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return false // L0 segment does not exist + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: L0 segment should be filtered out + assert.Equal(t, 0, len(level0Segments)) + }) + + // Test case 3: Mixed L0 segments, only some exist + t.Run("Mixed_L0_segments", func(t *testing.T) { + checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool { + return segmentID == 1 // Only segment 1 exists + } + + // Create test segments + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 3, + CollectionID: 1, + PartitionID: 1, + Level: datapb.SegmentLevel_L1, + }, + } + + // Filter L0 segments with existence check + var level0Segments []*datapb.SegmentInfo + for _, segment := range segments { + if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) { + level0Segments = append(level0Segments, segment) + } + } + + // Verify: Only existing L0 segment should be included + assert.Equal(t, 1, len(level0Segments)) + assert.Equal(t, int64(1), level0Segments[0].GetID()) + }) + }) +}