enhance: Improve delegator serviceable check logic in PinReadableSegments (#43768)

issue: #43767
- Enhance serviceable check logic to properly handle full vs partial
result requirements
- For full result (requiredLoadRatio >= 1.0): check
queryView.Serviceable()
- For partial result (requiredLoadRatio < 1.0): check load ratio
satisfaction
- Add comprehensive unit tests covering all serviceable check scenarios

This enhancement ensures delegator correctly validates serviceability
based on the requested result completeness, improving reliability of
query operations.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-08-07 12:13:40 +08:00 committed by GitHub
parent 3647568cf3
commit 715b5153b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 190 additions and 4 deletions

View File

@ -17,7 +17,6 @@
package delegator
import (
"fmt"
"sync"
"github.com/samber/lo"
@ -164,9 +163,23 @@ func (d *distribution) PinReadableSegments(requiredLoadRatio float64, partitions
d.mut.RLock()
defer d.mut.RUnlock()
if d.queryView.GetLoadedRatio() < requiredLoadRatio {
return nil, nil, nil, -1, merr.WrapErrChannelNotAvailable(d.channelName,
fmt.Sprintf("channel distribution is not serviceable, required load ratio is %f, current load ratio is %f", requiredLoadRatio, d.queryView.GetLoadedRatio()))
requireFullResult := requiredLoadRatio >= 1.0
loadRatioSatisfy := d.queryView.GetLoadedRatio() >= requiredLoadRatio
var isServiceable bool
if requireFullResult {
isServiceable = d.queryView.Serviceable()
} else {
isServiceable = loadRatioSatisfy
}
if !isServiceable {
log.Warn("channel distribution is not serviceable",
zap.String("channel", d.channelName),
zap.Float64("requiredLoadRatio", requiredLoadRatio),
zap.Float64("currentLoadRatio", d.queryView.GetLoadedRatio()),
zap.Bool("serviceable", d.queryView.Serviceable()),
)
return nil, nil, nil, -1, merr.WrapErrChannelNotAvailable(d.channelName, "channel distribution is not serviceable")
}
current := d.current.Load()

View File

@ -1410,3 +1410,176 @@ func (s *DistributionSuite) TestSyncTargetVersion_RedundantGrowingLogic() {
})
}
}
func TestPinReadableSegments(t *testing.T) {
// Create test distribution
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
// Add some test segments
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
{NodeID: 1, SegmentID: 2, PartitionID: 1, Version: 1},
}...)
// Setup query view
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100, 2: 100},
GrowingInTarget: []int64{},
}, []int64{1})
// Test case 1: requireFullResult=true, Serviceable=false
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.8).Build()
sealed, growing, _, _, err := dist.PinReadableSegments(1.0, 1)
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
// Test case 2: requireFullResult=true, Serviceable=true
mockServiceable.UnPatch()
mockServiceable = mockey.Mock((*channelQueryView).Serviceable).Return(true).Build()
sealed, growing, _, _, err = dist.PinReadableSegments(1.0, 1)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
// Test case 3: requireFullResult=false, loadRatioSatisfy=false
mockServiceable.UnPatch()
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.5).Build()
sealed, growing, _, _, err = dist.PinReadableSegments(0.8, 1)
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
// Test case 4: requireFullResult=false, loadRatioSatisfy=true
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.9).Build()
defer mockGetLoadedRatio.UnPatch()
sealed, growing, _, _, err = dist.PinReadableSegments(0.8, 1)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
}
func TestPinReadableSegments_ServiceableLogic(t *testing.T) {
// Create test distribution
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
// Add test segments
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
}...)
// Setup query view
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100},
GrowingInTarget: []int64{},
}, []int64{1})
// Test case: requireFullResult=true, Serviceable=false, GetLoadedRatio=1.0
// This tests the case where load ratio is satisfied but serviceable is false
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(1.0).Build()
defer mockServiceable.UnPatch()
defer mockGetLoadedRatio.UnPatch()
sealed, growing, _, _, err := dist.PinReadableSegments(1.0, 1)
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
}
func TestPinReadableSegments_LoadRatioLogic(t *testing.T) {
// Create test distribution
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
// Add test segments
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
}...)
// Setup query view
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100},
GrowingInTarget: []int64{},
}, []int64{1})
// Test case: requireFullResult=false, loadRatioSatisfy=false
// This tests the case where partial result is requested but load ratio is insufficient
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.3).Build()
defer mockGetLoadedRatio.UnPatch()
sealed, growing, _, _, err := dist.PinReadableSegments(0.5, 1)
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
}
func TestPinReadableSegments_EdgeCases(t *testing.T) {
// Create test distribution
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
// Add test segments
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
}...)
// Setup query view
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100},
GrowingInTarget: []int64{},
}, []int64{1})
// Test case 1: requiredLoadRatio = 0.0 (edge case)
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.0).Build()
sealed, growing, _, _, err := dist.PinReadableSegments(0.0, 1)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
// Test case 2: requiredLoadRatio = 0.0, GetLoadedRatio = 0.0 (exact match)
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.0).Build()
sealed, growing, _, _, err = dist.PinReadableSegments(0.0, 1)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
// Test case 3: requiredLoadRatio = 0.0, GetLoadedRatio = 0.1 (satisfied)
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.1).Build()
defer mockGetLoadedRatio.UnPatch()
sealed, growing, _, _, err = dist.PinReadableSegments(0.0, 1)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
}