fix: prevent empty segment list when partial result is enabled (#46670)
issue: #46669

When partial result is enabled (PartialResultRequiredDataRatio < 1.0), the Serviceable() method would return true even when syncedByCoord was false, because it bypassed the viewReady check. However, PinReadableSegments used GetLoadedRatio() == 1.0 to decide whether to filter segments by target version. As a result, when loadedRatio == 1.0 but syncedByCoord == false, segments were filtered by an incorrect target version, producing an empty segment list during search.

This change:
- Replace the GetLoadedRatio() == 1.0 check with Serviceable() to ensure target-version filtering only happens after coord sync completes
- Remove the partial-result bypass in Serviceable() to keep the check consistent

## Bug Fix Summary (auto-generated release notes by coderabbit.ai)

**Core Invariant**: `Serviceable()` must enforce a strict requirement that both data loading AND coordinator synchronization are complete before allowing full search operations. This prevents using stale or uninitialized target versions.

**Logic Removed/Simplified**:
- Removed the partial-result bypass from `Serviceable()` that previously allowed it to return `true` even when `syncedByCoord == false`
- Replaced `GetLoadedRatio() == 1.0` checks in `PinReadableSegments` with `Serviceable()` calls to ensure target-version filtering only occurs after coord sync completes
- Simplified the serviceability condition from parameterized partial-result logic to a direct conjunction: `loadedRatio >= 1.0 AND syncedByCoord == true`

**No Data Loss or Regression**: The change is safe because:
- When `Serviceable()` returns `true` (both `loadedRatio >= 1.0` and `syncedByCoord == true`), segments are filtered by the current valid target version; this is the full-result path
- When `Serviceable()` returns `false` but `loadedRatio >= requiredLoadRatio` (the partial-result case), segments are filtered against the query view's segment lists rather than the target version, ensuring non-empty results as validated by `TestPinReadableSegments_PartialResultNotEmpty`
- The test explicitly demonstrates that even with `loadedRatio == 1.0` and `syncedByCoord == false`, calling `PinReadableSegments(0.8, partition)` returns segments (a partial result) instead of an empty list, which was the root cause of the bug

**Root Cause Fix**: Previously, segments could be filtered with `unreadableTargetVersion` when `loadedRatio == 1.0` but querycoord had not yet synced the target, causing empty segment lists. Now the sync state is checked before deciding the filtering strategy, preventing this race condition.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
parent 5b6697c5cb
commit c2677967ad
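For reference, a minimal standalone sketch of the invariant described in the summary above; `channelView`, its fields, and `serviceable` are simplified illustrative stand-ins, not the actual querynode types:

```go
package main

import "fmt"

// channelView is a simplified stand-in for the querynode's channel query view;
// only the two properties referenced in the commit message are modeled here.
type channelView struct {
	loadedRatio   float64 // fraction of the target's rows currently loaded
	syncedByCoord bool    // whether querycoord has pushed a target version yet
}

// serviceable mirrors the simplified condition after this change:
// full-result search requires both a full data load and a coord-synced target,
// with no partial-result bypass.
func (v channelView) serviceable() bool {
	return v.loadedRatio >= 1.0 && v.syncedByCoord
}

func main() {
	// The buggy scenario: fully loaded but not yet synced by coord.
	v := channelView{loadedRatio: 1.0, syncedByCoord: false}
	fmt.Println(v.serviceable()) // false: the target version cannot be trusted yet
}
```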
@@ -27,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
-	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )

@@ -94,9 +93,7 @@ func (q *channelQueryView) Serviceable() bool {
 	// Note: after we support channel level target(data view), we can remove this flag
 	viewReady := q.syncedByCoord

-	// if partial result is enabled, we can skip the viewReady check
-	enablePartialResult := paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat() < 1.0
-	return dataReady && (viewReady || enablePartialResult)
+	return dataReady && viewReady
 }

 func (q *channelQueryView) GetLoadedRatio() float64 {
@@ -193,8 +190,8 @@ func (d *distribution) PinReadableSegments(requiredLoadRatio float64, partitions
 	sealed, growing = current.Get(partitions...)
 	version = current.version
 	sealedRowCount = d.queryView.sealedSegmentRowCount
-	if d.queryView.GetLoadedRatio() == 1.0 {
-		// if query view is fully loaded, we can use current target version to filter segments
+	if d.queryView.Serviceable() {
+		// if query view is serviceable, we can use current target version to filter segments
 		targetVersion := current.GetTargetVersion()
 		filterReadable := d.readableFilter(targetVersion)
 		sealed, growing = d.filterSegments(sealed, growing, filterReadable)

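A hedged sketch of how the three outcomes described in the commit message fit together; `pickFilterStrategy` and its parameters are illustrative names for this example only, not functions from the codebase:

```go
package main

import (
	"errors"
	"fmt"
)

// pickFilterStrategy is an illustrative reduction of the branch above:
// it only decides how segments would be filtered, not the filtering itself.
// serviceable corresponds to Serviceable(), loadedRatio to GetLoadedRatio().
func pickFilterStrategy(serviceable bool, loadedRatio, requiredLoadRatio float64) (string, error) {
	switch {
	case serviceable:
		// coord sync is done: it is safe to filter by the current target version
		return "filter by target version", nil
	case loadedRatio >= requiredLoadRatio:
		// partial result: filter against the query view's segment lists instead
		return "filter by query view segment list", nil
	default:
		return "", errors.New("channel distribution is not serviceable")
	}
}

func main() {
	// The scenario from the bug report: loadedRatio == 1.0, syncedByCoord == false,
	// and the caller accepts partial results (requiredLoadRatio = 0.8).
	strategy, err := pickFilterStrategy(false, 1.0, 0.8)
	fmt.Println(strategy, err) // "filter by query view segment list" <nil>
}
```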
@@ -944,197 +944,210 @@ func TestDistribution_MarkOfflineSegments(t *testing.T) {

// TestChannelQueryView_SyncedByCoord tests the syncedByCoord field functionality
func TestChannelQueryView_SyncedByCoord(t *testing.T) {
mockey.PatchConvey("TestChannelQueryView_SyncedByCoord", t, func() {
growings := []int64{1, 2, 3}
sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300}
partitions := []int64{7, 8, 9}
version := int64(10)

t.Run("new channelQueryView has syncedByCoord false", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

growings := []int64{1, 2, 3}
sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300}
partitions := []int64{7, 8, 9}
version := int64(10)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
assert.False(t, view.syncedByCoord, "New channelQueryView should have syncedByCoord = false")
})

t.Run("new channelQueryView has syncedByCoord false", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
assert.False(t, view.syncedByCoord, "New channelQueryView should have syncedByCoord = false")
})
t.Run("syncedByCoord can be set manually", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

t.Run("syncedByCoord can be set manually", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)

// Initially false
assert.False(t, view.syncedByCoord)
// Initially false
assert.False(t, view.syncedByCoord)

// Set to true
view.syncedByCoord = true
assert.True(t, view.syncedByCoord)
// Set to true
view.syncedByCoord = true
assert.True(t, view.syncedByCoord)

// Set back to false
view.syncedByCoord = false
assert.False(t, view.syncedByCoord)
})
// Set back to false
view.syncedByCoord = false
assert.False(t, view.syncedByCoord)
})
}

// TestDistribution_SyncTargetVersionSetsSyncedByCoord tests that SyncTargetVersion sets syncedByCoord
func TestDistribution_SyncTargetVersionSetsSyncedByCoord(t *testing.T) {
mockey.PatchConvey("TestDistribution_SyncTargetVersionSetsSyncedByCoord", t, func() {
channelName := "test_channel"
growings := []int64{1, 2, 3}
sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300}
partitions := []int64{7, 8, 9}
version := int64(10)

t.Run("SyncTargetVersion sets syncedByCoord to true", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

channelName := "test_channel"
growings := []int64{1, 2, 3}
sealedWithRowCount := map[int64]int64{4: 100, 5: 200, 6: 300}
partitions := []int64{7, 8, 9}
version := int64(10)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

t.Run("SyncTargetVersion sets syncedByCoord to true", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)
// Initially syncedByCoord should be false
assert.False(t, dist.queryView.syncedByCoord)

// Initially syncedByCoord should be false
assert.False(t, dist.queryView.syncedByCoord)
// Create sync action
action := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100},
TargetVersion: version + 1,
}

// Create sync action
action := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100},
TargetVersion: version + 1,
}
// Sync the view
dist.SyncTargetVersion(action, partitions)

// Sync the view
dist.SyncTargetVersion(action, partitions)
// Verify syncedByCoord is set to true after sync
assert.True(t, dist.queryView.syncedByCoord, "SyncTargetVersion should set syncedByCoord to true")
assert.Equal(t, action.GetTargetVersion(), dist.queryView.version)
})

// Verify syncedByCoord is set to true after sync
assert.True(t, dist.queryView.syncedByCoord, "SyncTargetVersion should set syncedByCoord to true")
assert.Equal(t, action.GetTargetVersion(), dist.queryView.version)
})
t.Run("multiple SyncTargetVersion calls maintain syncedByCoord true", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

t.Run("multiple SyncTargetVersion calls maintain syncedByCoord true", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

// First sync
action1 := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100},
TargetVersion: version + 1,
}
dist.SyncTargetVersion(action1, partitions)
assert.True(t, dist.queryView.syncedByCoord)
// First sync
action1 := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100},
TargetVersion: version + 1,
}
dist.SyncTargetVersion(action1, partitions)
assert.True(t, dist.queryView.syncedByCoord)

// Second sync
action2 := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2, 3},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 200},
TargetVersion: version + 2,
}
dist.SyncTargetVersion(action2, partitions)
assert.True(t, dist.queryView.syncedByCoord, "syncedByCoord should remain true after multiple syncs")
assert.Equal(t, action2.GetTargetVersion(), dist.queryView.version)
})
// Second sync
action2 := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2, 3},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 200},
TargetVersion: version + 2,
}
dist.SyncTargetVersion(action2, partitions)
assert.True(t, dist.queryView.syncedByCoord, "syncedByCoord should remain true after multiple syncs")
assert.Equal(t, action2.GetTargetVersion(), dist.queryView.version)
})

t.Run("SyncTargetVersion creates new queryView with syncedByCoord true", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)
t.Run("SyncTargetVersion creates new queryView with syncedByCoord true", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

// Store reference to original view
originalView := dist.queryView
assert.False(t, originalView.syncedByCoord)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

// Sync should create new queryView
action := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100},
TargetVersion: version + 1,
}
dist.SyncTargetVersion(action, partitions)
// Store reference to original view
originalView := dist.queryView
assert.False(t, originalView.syncedByCoord)

// Verify new queryView is created with syncedByCoord = true
newView := dist.queryView
assert.NotSame(t, originalView, newView, "SyncTargetVersion should create new queryView")
assert.True(t, newView.syncedByCoord, "New queryView should have syncedByCoord = true")
assert.False(t, originalView.syncedByCoord, "Original queryView should remain unchanged")
})
// Sync should create new queryView
action := &querypb.SyncAction{
GrowingInTarget: []int64{1, 2},
SealedSegmentRowCount: map[int64]int64{4: 100, 5: 100},
TargetVersion: version + 1,
}
dist.SyncTargetVersion(action, partitions)

// Verify new queryView is created with syncedByCoord = true
newView := dist.queryView
assert.NotSame(t, originalView, newView, "SyncTargetVersion should create new queryView")
assert.True(t, newView.syncedByCoord, "New queryView should have syncedByCoord = true")
assert.False(t, originalView.syncedByCoord, "Original queryView should remain unchanged")
})
}

// TestDistribution_ServiceableWithSyncedByCoord tests serviceable logic considering syncedByCoord
func TestDistribution_ServiceableWithSyncedByCoord(t *testing.T) {
mockey.PatchConvey("TestDistribution_ServiceableWithSyncedByCoord", t, func() {
channelName := "test_channel"
growings := []int64{1, 2}
sealedWithRowCount := map[int64]int64{4: 100, 5: 100}
partitions := []int64{7, 8}
version := int64(10)

t.Run("distribution becomes serviceable after sync and full load", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

channelName := "test_channel"
growings := []int64{1, 2}
sealedWithRowCount := map[int64]int64{4: 100, 5: 100}
partitions := []int64{7, 8}
version := int64(10)
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

t.Run("distribution becomes serviceable after sync and full load", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)
// Initially not serviceable
assert.False(t, dist.Serviceable())

// Initially not serviceable
assert.False(t, dist.Serviceable())

// Add all segments to make it fully loaded
for _, id := range growings {
dist.growingSegments[id] = SegmentEntry{
SegmentID: id,
}
// Add all segments to make it fully loaded
for _, id := range growings {
dist.growingSegments[id] = SegmentEntry{
SegmentID: id,
}
for id := range sealedWithRowCount {
dist.sealedSegments[id] = SegmentEntry{
SegmentID: id,
Offline: false,
}
}
for id := range sealedWithRowCount {
dist.sealedSegments[id] = SegmentEntry{
SegmentID: id,
Offline: false,
}
}

// Sync target version to set syncedByCoord = true
action := &querypb.SyncAction{
GrowingInTarget: growings,
SealedSegmentRowCount: sealedWithRowCount,
TargetVersion: version + 1,
// Sync target version to set syncedByCoord = true
action := &querypb.SyncAction{
GrowingInTarget: growings,
SealedSegmentRowCount: sealedWithRowCount,
TargetVersion: version + 1,
}
dist.SyncTargetVersion(action, partitions)

// Update serviceable to calculate loaded ratio
dist.updateServiceable("test")

// Should be serviceable now
assert.True(t, dist.Serviceable())
assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio())
assert.True(t, dist.queryView.syncedByCoord)
})

t.Run("distribution not serviceable without sync even with full load", func(t *testing.T) {
// Mock GetAsFloat to return 1.0 (disable partial result) to avoid paramtable initialization
mockGetAsFloat := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsFloat")).Return(1.0).Build()
defer mockGetAsFloat.UnPatch()

view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

// Add all segments to make it fully loaded
for _, id := range growings {
dist.growingSegments[id] = SegmentEntry{
SegmentID: id,
}
dist.SyncTargetVersion(action, partitions)

// Update serviceable to calculate loaded ratio
dist.updateServiceable("test")

// Should be serviceable now
assert.True(t, dist.Serviceable())
assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio())
assert.True(t, dist.queryView.syncedByCoord)
})

t.Run("distribution not serviceable without sync even with full load", func(t *testing.T) {
view := NewChannelQueryView(growings, sealedWithRowCount, partitions, version)
dist := NewDistribution(channelName, view)

// Add all segments to make it fully loaded
for _, id := range growings {
dist.growingSegments[id] = SegmentEntry{
SegmentID: id,
}
}
for id := range sealedWithRowCount {
dist.sealedSegments[id] = SegmentEntry{
SegmentID: id,
Offline: false,
}
}
for id := range sealedWithRowCount {
dist.sealedSegments[id] = SegmentEntry{
SegmentID: id,
Offline: false,
}
}

// Update serviceable to calculate loaded ratio but don't sync
dist.updateServiceable("test")
// Update serviceable to calculate loaded ratio but don't sync
dist.updateServiceable("test")

// Should not be serviceable without sync (assuming partial result is disabled by default)
// The exact behavior depends on paramtable configuration, but we test the basic structure
assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio(), "Load ratio should be 1.0")
assert.False(t, dist.queryView.syncedByCoord, "Should not be synced by coord")
// Should not be serviceable without sync (assuming partial result is disabled by default)
// The exact behavior depends on paramtable configuration, but we test the basic structure
assert.Equal(t, float64(1), dist.queryView.GetLoadedRatio(), "Load ratio should be 1.0")
assert.False(t, dist.queryView.syncedByCoord, "Should not be synced by coord")

// Note: The actual serviceable result depends on partial result configuration
// We focus on testing that the fields are set correctly
})
// Note: The actual serviceable result depends on partial result configuration
// We focus on testing that the fields are set correctly
})
}

@@ -1412,66 +1425,83 @@ func (s *DistributionSuite) TestSyncTargetVersion_RedundantGrowingLogic() {
}

func TestPinReadableSegments(t *testing.T) {
// Create test distribution
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
// Helper function to create test distribution
setupDistribution := func() *distribution {
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
{NodeID: 1, SegmentID: 2, PartitionID: 1, Version: 1},
}...)
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100, 2: 100},
GrowingInTarget: []int64{},
}, []int64{1})
return dist
}

// Add some test segments
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1},
{NodeID: 1, SegmentID: 2, PartitionID: 1, Version: 1},
}...)
t.Run("requireFullResult_true_serviceable_false", func(t *testing.T) {
dist := setupDistribution()

// Setup query view
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100, 2: 100},
GrowingInTarget: []int64{},
}, []int64{1})
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
defer mockServiceable.UnPatch()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.8).Build()
defer mockGetLoadedRatio.UnPatch()

// Test case 1: requireFullResult=true, Serviceable=false
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.8).Build()
sealed, growing, _, _, err := dist.PinReadableSegments(1.0, 1)

sealed, growing, _, _, err := dist.PinReadableSegments(1.0, 1)
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
})

assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
t.Run("requireFullResult_true_serviceable_true", func(t *testing.T) {
dist := setupDistribution()

// Test case 2: requireFullResult=true, Serviceable=true
mockServiceable.UnPatch()
mockServiceable = mockey.Mock((*channelQueryView).Serviceable).Return(true).Build()
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(true).Build()
defer mockServiceable.UnPatch()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.8).Build()
defer mockGetLoadedRatio.UnPatch()

sealed, growing, _, _, err = dist.PinReadableSegments(1.0, 1)
sealed, growing, _, _, err := dist.PinReadableSegments(1.0, 1)

assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
})

// Test case 3: requireFullResult=false, loadRatioSatisfy=false
mockServiceable.UnPatch()
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.5).Build()
t.Run("requireFullResult_false_loadRatioSatisfy_false", func(t *testing.T) {
dist := setupDistribution()

sealed, growing, _, _, err = dist.PinReadableSegments(0.8, 1)
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
defer mockServiceable.UnPatch()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.5).Build()
defer mockGetLoadedRatio.UnPatch()

assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
sealed, growing, _, _, err := dist.PinReadableSegments(0.8, 1)

// Test case 4: requireFullResult=false, loadRatioSatisfy=true
mockGetLoadedRatio.UnPatch()
mockGetLoadedRatio = mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.9).Build()
defer mockGetLoadedRatio.UnPatch()
assert.Error(t, err)
assert.Nil(t, sealed)
assert.Nil(t, growing)
assert.Contains(t, err.Error(), "channel distribution is not serviceable")
})

sealed, growing, _, _, err = dist.PinReadableSegments(0.8, 1)
t.Run("requireFullResult_false_loadRatioSatisfy_true", func(t *testing.T) {
dist := setupDistribution()

assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
defer mockServiceable.UnPatch()
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(0.9).Build()
defer mockGetLoadedRatio.UnPatch()

sealed, growing, _, _, err := dist.PinReadableSegments(0.8, 1)

assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
})
}

func TestPinReadableSegments_ServiceableLogic(t *testing.T) {
@@ -1583,3 +1613,52 @@ func TestPinReadableSegments_EdgeCases(t *testing.T) {
assert.NotNil(t, sealed)
assert.NotNil(t, growing)
}

// TestPinReadableSegments_PartialResultNotEmpty tests that when partial result is enabled
// and loadedRatio is 1.0 but syncedByCoord is false, PinReadableSegments should return
// partial result (filtered by query view's segment list) instead of empty segment list.
// This covers the fix in commit 15bc5d0928: prevent empty segment list when partial result is enabled.
func TestPinReadableSegments_PartialResultNotEmpty(t *testing.T) {
// Mock: loadedRatio = 1.0 (all segments loaded)
mockGetLoadedRatio := mockey.Mock((*channelQueryView).GetLoadedRatio).Return(1.0).Build()
defer mockGetLoadedRatio.UnPatch()
// Mock: Serviceable = false (syncedByCoord = false)
mockServiceable := mockey.Mock((*channelQueryView).Serviceable).Return(false).Build()
defer mockServiceable.UnPatch()

// Create distribution with query view
queryView := NewChannelQueryView(nil, nil, []int64{1}, initialTargetVersion)
dist := NewDistribution("test-channel", queryView)

// Add segments with unreadable target version (simulating not synced by coord)
dist.AddDistributions([]SegmentEntry{
{NodeID: 1, SegmentID: 1, PartitionID: 1, Version: 1, TargetVersion: unreadableTargetVersion},
{NodeID: 1, SegmentID: 2, PartitionID: 1, Version: 1, TargetVersion: unreadableTargetVersion},
}...)

// Setup query view with segments in target
dist.SyncTargetVersion(&querypb.SyncAction{
TargetVersion: 1000,
SealedSegmentRowCount: map[int64]int64{1: 100, 2: 100},
GrowingInTarget: []int64{},
}, []int64{1})

// Call PinReadableSegments with partial result enabled (requiredLoadRatio < 1.0)
sealed, growing, _, _, err := dist.PinReadableSegments(0.8, 1)

// Should succeed because loadRatioSatisfy = true (1.0 >= 0.8)
assert.NoError(t, err)
assert.NotNil(t, sealed)
assert.NotNil(t, growing)

// Count actual sealed segments returned
actualSealedCount := 0
for _, item := range sealed {
actualSealedCount += len(item.Segments)
}

// Should return non-empty segment list (partial result filtered by query view)
// Before the fix, this would return 0 because segments were filtered by target version
// After the fix, this returns 2 because segments are filtered by query view's segment list
assert.Equal(t, 2, actualSealedCount, "Should return segments filtered by query view, not empty list")
}