mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: skip loading non-existent L0 segments to prevent load blocking (#43576)
issue: #43557. In the 2.5 branch, L0 segments must be loaded before other segments. If an L0 segment has been garbage collected but is still in the target list, the load operation keeps failing, which prevents other segments from being loaded. This patch adds a segment existence check for L0 segments in getSealedSegmentDiff. Only L0 segments that actually exist will be included in the load list. Changes: - Add a checkSegmentExist function parameter to the SegmentChecker constructor - Filter L0 segments by an existence check in getSealedSegmentDiff - Add unit tests using mockey to verify the behavior of the fix Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
4b8e8bd9fd
commit
75463725b3
@ -33,9 +33,7 @@ type schedulePolicy interface {
|
||||
Remove(taskID UniqueID)
|
||||
}
|
||||
|
||||
var (
|
||||
_ schedulePolicy = &priorityQueuePolicy{}
|
||||
)
|
||||
var _ schedulePolicy = &priorityQueuePolicy{}
|
||||
|
||||
// priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority)
|
||||
type priorityQueuePolicy struct {
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
)
|
||||
|
||||
var errTypeNotFound = errors.New("checker type not found")
|
||||
@ -63,11 +64,21 @@ func NewCheckerController(
|
||||
broker meta.Broker,
|
||||
getBalancerFunc GetBalancerFunc,
|
||||
) *CheckerController {
|
||||
// checkSegmentExist asks the broker for the segment's info and reports whether
// the segment should still be treated as existing.
// It returns false only when the lookup fails with merr.ErrSegmentNotFound;
// any other failure is logged and the segment is still reported as existing.
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
	resp, err := broker.GetSegmentInfo(ctx, segmentID)
	rpcErr := merr.CheckRPCCall(resp, err)
	if rpcErr == nil {
		// lookup succeeded
		return true
	}

	log.Info("check segment exist failed", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.Error(rpcErr))

	// only a definite "segment not found" answer marks the segment as gone
	return !errors.Is(rpcErr, merr.ErrSegmentNotFound)
}
|
||||
// CheckerController runs checkers with the order,
|
||||
// the former checker has higher priority
|
||||
checkers := map[utils.CheckerType]Checker{
|
||||
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
||||
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
|
||||
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc, checkSegmentExist),
|
||||
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc),
|
||||
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr),
|
||||
// todo temporary work around must fix
|
||||
|
||||
@ -41,13 +41,16 @@ import (
|
||||
|
||||
const initialTargetVersion = int64(0)
|
||||
|
||||
// CheckSegmentExist reports whether the segment identified by segmentID
// (belonging to collectionID) still exists: true means it exists, false means
// it does not.
type CheckSegmentExist func(ctx context.Context, collectionID int64, segmentID int64) bool
|
||||
|
||||
type SegmentChecker struct {
|
||||
*checkerActivation
|
||||
meta *meta.Meta
|
||||
dist *meta.DistributionManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
nodeMgr *session.NodeManager
|
||||
getBalancerFunc GetBalancerFunc
|
||||
meta *meta.Meta
|
||||
dist *meta.DistributionManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
nodeMgr *session.NodeManager
|
||||
getBalancerFunc GetBalancerFunc
|
||||
checkSegmentExist CheckSegmentExist
|
||||
}
|
||||
|
||||
func NewSegmentChecker(
|
||||
@ -56,6 +59,7 @@ func NewSegmentChecker(
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
nodeMgr *session.NodeManager,
|
||||
getBalancerFunc GetBalancerFunc,
|
||||
checkSegmentExist CheckSegmentExist,
|
||||
) *SegmentChecker {
|
||||
return &SegmentChecker{
|
||||
checkerActivation: newCheckerActivation(),
|
||||
@ -64,6 +68,7 @@ func NewSegmentChecker(
|
||||
targetMgr: targetMgr,
|
||||
nodeMgr: nodeMgr,
|
||||
getBalancerFunc: getBalancerFunc,
|
||||
checkSegmentExist: checkSegmentExist,
|
||||
}
|
||||
}
|
||||
|
||||
@ -314,7 +319,10 @@ func (c *SegmentChecker) getSealedSegmentDiff(
|
||||
}
|
||||
|
||||
level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool {
|
||||
return segment.GetLevel() == datapb.SegmentLevel_L0
|
||||
// Patch for the 2.5 branch only: L0 segments must be loaded before other segments.
|
||||
// If an L0 segment has been garbage collected, loading it can never succeed and
|
||||
// no other segment would ever be loaded, so only keep L0 segments that still exist.
|
||||
return segment.GetLevel() == datapb.SegmentLevel_L0 && c.checkSegmentExist(ctx, collectionID, segment.GetID())
|
||||
})
|
||||
// L0 segment found,
|
||||
// QueryCoord loads the L0 segments first,
|
||||
|
||||
@ -21,6 +21,8 @@ import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/bytedance/mockey"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
@ -77,7 +79,11 @@ func (suite *SegmentCheckerTestSuite) SetupTest() {
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
balancer := suite.createMockBalancer()
|
||||
suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer })
|
||||
getBalancerFunc := func() balance.Balance { return balancer }
|
||||
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
|
||||
return true
|
||||
}
|
||||
suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, getBalancerFunc, checkSegmentExist)
|
||||
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
|
||||
}
|
||||
@ -864,3 +870,117 @@ func (suite *SegmentCheckerTestSuite) TestFilterOutSegmentInUse() {
|
||||
func TestSegmentCheckerSuite(t *testing.T) {
|
||||
suite.Run(t, new(SegmentCheckerTestSuite))
|
||||
}
|
||||
|
||||
func TestGetSealedSegmentDiff_WithL0SegmentCheck(t *testing.T) {
|
||||
mockey.PatchConvey("TestGetSealedSegmentDiff_WithL0SegmentCheck", t, func() {
|
||||
// Test case 1: L0 segment exists
|
||||
t.Run("L0_segment_exists", func(t *testing.T) {
|
||||
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
|
||||
return true // L0 segment exists
|
||||
}
|
||||
|
||||
// Create test segments
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
},
|
||||
}
|
||||
|
||||
// Filter L0 segments with existence check
|
||||
var level0Segments []*datapb.SegmentInfo
|
||||
for _, segment := range segments {
|
||||
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
|
||||
level0Segments = append(level0Segments, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify: L0 segment should be included
|
||||
assert.Equal(t, 1, len(level0Segments))
|
||||
assert.Equal(t, int64(1), level0Segments[0].GetID())
|
||||
})
|
||||
|
||||
// Test case 2: L0 segment does not exist
|
||||
t.Run("L0_segment_not_exists", func(t *testing.T) {
|
||||
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
|
||||
return false // L0 segment does not exist
|
||||
}
|
||||
|
||||
// Create test segments
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
},
|
||||
}
|
||||
|
||||
// Filter L0 segments with existence check
|
||||
var level0Segments []*datapb.SegmentInfo
|
||||
for _, segment := range segments {
|
||||
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
|
||||
level0Segments = append(level0Segments, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify: L0 segment should be filtered out
|
||||
assert.Equal(t, 0, len(level0Segments))
|
||||
})
|
||||
|
||||
// Test case 3: Mixed L0 segments, only some exist
|
||||
t.Run("Mixed_L0_segments", func(t *testing.T) {
|
||||
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
|
||||
return segmentID == 1 // Only segment 1 exists
|
||||
}
|
||||
|
||||
// Create test segments
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
},
|
||||
{
|
||||
ID: 3,
|
||||
CollectionID: 1,
|
||||
PartitionID: 1,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
},
|
||||
}
|
||||
|
||||
// Filter L0 segments with existence check
|
||||
var level0Segments []*datapb.SegmentInfo
|
||||
for _, segment := range segments {
|
||||
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
|
||||
level0Segments = append(level0Segments, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify: Only existing L0 segment should be included
|
||||
assert.Equal(t, 1, len(level0Segments))
|
||||
assert.Equal(t, int64(1), level0Segments[0].GetID())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user