yihao.dai 36e9e41627
fix: Fix no candidate segments error for small import (#41771)
When autoID is enabled, the preimport task estimates row distribution by
evenly dividing the total row count (numRows) across all vchannels:
`estimatedCount = numRows / vchannelNum`.
However, the actual import task hashes the real auto-generated IDs to
determine the target vchannel. This mismatch can lead to an inaccurate
row distribution estimate in corner cases such as the following (see the
sketch after the list):
- Importing 1 row into 2 vchannels:
    • Preimport: 1 / 2 = 0 → both v0 and v1 are estimated to have 0 rows
    • Import: the real autoID (e.g., 457975852966809057) hashes to v1
      → actual result: v0 = 0, v1 = 1
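
A minimal sketch of the mismatch, using hypothetical helper names (the
real logic lives in the preimport estimation and import hashing paths):

```go
package main

import "fmt"

// estimateRows mirrors the preimport estimate: integer division spreads
// numRows evenly across vchannels, rounding down.
func estimateRows(numRows, vchannelNum int64) int64 {
	return numRows / vchannelNum
}

// hashToVchannel mirrors the import path: the real auto-generated ID is
// hashed to pick a single target vchannel.
func hashToVchannel(autoID, vchannelNum int64) int64 {
	return autoID % vchannelNum
}

func main() {
	fmt.Println(estimateRows(1, 2))                    // 0 → every vchannel estimated empty
	fmt.Println(hashToVchannel(457975852966809057, 2)) // 1 → the row actually lands on v1
}
```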

To resolve this corner case, we now allocate at least one segment for
each vchannel when autoID is enabled, ensuring all vchannels are
prepared to receive data even if no rows are estimated for them (see the
sketch below).
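
A minimal sketch of that guard, with a hypothetical helper name (the
actual change lives in the segment allocation path):

```go
// allocSegmentCounts sketches the fix: derive each vchannel's segment
// count from its estimated rows, but when autoID is enabled, guarantee
// at least one segment per vchannel so hashed autoIDs always find a
// candidate segment.
func allocSegmentCounts(estimatedRows map[string]int64, rowsPerSegment int64, autoID bool) map[string]int64 {
	counts := make(map[string]int64, len(estimatedRows))
	for vchannel, rows := range estimatedRows {
		n := (rows + rowsPerSegment - 1) / rowsPerSegment // ceiling division
		if n == 0 && autoID {
			n = 1 // 0 estimated rows would otherwise leave no candidate segment
		}
		counts[vchannel] = n
	}
	return counts
}
```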

issue: https://github.com/milvus-io/milvus/issues/41759

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2025-05-14 15:30:21 +08:00

package importv2

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/conc"
)

func TestNewHashedData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}
	got, err := newHashedData(schema, 2, 2)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for i := 0; i < 2; i++ {
		assert.Equal(t, 2, len(got[i]))
		for j := 0; j < 2; j++ {
			assert.NotNil(t, got[i][j])
		}
	}
}

func TestHashData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
			{
				FieldID:        101,
				Name:           "partition_key",
				DataType:       schemapb.DataType_Int64,
				IsPartitionKey: true,
			},
		},
	}
	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	rows, err := storage.NewInsertData(schema)
	assert.NoError(t, err)
	// Add 1000 rows of test data
	for i := 0; i < 1000; i++ {
		rows.Append(map[int64]interface{}{
			100: int64(i),       // primary key
			101: int64(i%2 + 1), // partition key, alternates between 1 and 2
		})
	}

	got, err := HashData(mockTask, rows)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))

	// Verify data distribution
	totalRows := 0
	for i := 0; i < 2; i++ {
		assert.Equal(t, 2, len(got[i]))
		for j := 0; j < 2; j++ {
			assert.NotNil(t, got[i][j])
			totalRows += got[i][j].GetRowNum()
		}
	}
	assert.Equal(t, 1000, totalRows)
}

func TestHashDeleteData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}
	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	delData := storage.NewDeleteData(nil, nil)
	delData.Append(storage.NewInt64PrimaryKey(1), 1)

	got, err := HashDeleteData(mockTask, delData)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for i := 0; i < 2; i++ {
		assert.NotNil(t, got[i])
	}
}

func TestGetRowsStats(t *testing.T) {
	t.Run("test non-autoID", func(t *testing.T) {
		schema := &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:      100,
					Name:         "pk",
					DataType:     schemapb.DataType_Int64,
					IsPrimaryKey: true,
				},
				{
					FieldID:        101,
					Name:           "partition_key",
					DataType:       schemapb.DataType_Int64,
					IsPartitionKey: true,
				},
			},
		}
		mockTask := NewMockTask(t)
		mockTask.On("GetSchema").Return(schema).Maybe()
		mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
		mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
		mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
		mockTask.On("GetJobID").Return(int64(1)).Maybe()
		mockTask.On("GetTaskID").Return(int64(1)).Maybe()
		mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

		rows, err := storage.NewInsertData(schema)
		assert.NoError(t, err)
		// Add 1000 rows of test data
		for i := 0; i < 1000; i++ {
			rows.Append(map[int64]interface{}{
				100: int64(i),       // primary key
				101: int64(i%2 + 1), // partition key, alternates between 1 and 2
			})
		}

		got, err := GetRowsStats(mockTask, rows)
		assert.NoError(t, err)
		assert.Equal(t, 2, len(got))

		// Verify statistics
		totalRows := int64(0)
		for _, stats := range got {
			assert.NotNil(t, stats)
			assert.NotNil(t, stats.PartitionRows)
			assert.NotNil(t, stats.PartitionDataSize)
			for _, count := range stats.PartitionRows {
				totalRows += count
			}
		}
		assert.Equal(t, int64(1000), totalRows)
	})

	t.Run("test autoID", func(t *testing.T) {
		schema := &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:      100,
					Name:         "pk",
					DataType:     schemapb.DataType_Int64,
					IsPrimaryKey: true,
					AutoID:       true,
				},
				{
					FieldID:        101,
					Name:           "partition_key",
					DataType:       schemapb.DataType_Int64,
					IsPartitionKey: true,
				},
			},
		}
		mockTask := NewMockTask(t)
		mockTask.On("GetSchema").Return(schema).Maybe()
		mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
		mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
		mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
		mockTask.On("GetJobID").Return(int64(1)).Maybe()
		mockTask.On("GetTaskID").Return(int64(1)).Maybe()
		mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

		rows, err := storage.NewInsertData(schema)
		assert.NoError(t, err)
		// Add 1000 rows of test data (no pk values: autoID generates them)
		for i := 0; i < 1000; i++ {
			rows.Append(map[int64]interface{}{
				101: int64(i%2 + 1), // partition key, alternates between 1 and 2
			})
		}

		got, err := GetRowsStats(mockTask, rows)
		assert.NoError(t, err)
		assert.Equal(t, 2, len(got))

		// Verify statistics and data distribution
		totalRows := int64(0)
		channelRows := make([]int64, 2)
		channelIndex := 0
		for _, stats := range got {
			assert.NotNil(t, stats)
			assert.NotNil(t, stats.PartitionRows)
			assert.NotNil(t, stats.PartitionDataSize)
			channelTotal := int64(0)
			for _, count := range stats.PartitionRows {
				channelTotal += count
			}
			channelRows[channelIndex] = channelTotal
			totalRows += channelTotal
			channelIndex++
		}
		// Verify total rows
		assert.Equal(t, int64(1000), totalRows)
		// Verify data is evenly distributed across channels,
		// allowing for small differences due to rounding
		expectedPerChannel := totalRows / 2
		for _, count := range channelRows {
			assert.InDelta(t, expectedPerChannel, count, 1)
		}
	})
}

func TestGetDeleteStats(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}
	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("GetPartitionIDs").Return([]int64{1}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	delData := storage.NewDeleteData(nil, nil)
	delData.Append(storage.NewInt64PrimaryKey(1), 1)

	got, err := GetDeleteStats(mockTask, delData)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for _, stats := range got {
		assert.NotNil(t, stats)
		assert.NotNil(t, stats.PartitionRows)
		assert.NotNil(t, stats.PartitionDataSize)
	}
}

func TestMergeHashedStats(t *testing.T) {
	src := map[string]*datapb.PartitionImportStats{
		"channel1": {
			PartitionRows: map[int64]int64{
				1: 10,
				2: 20,
			},
			PartitionDataSize: map[int64]int64{
				1: 100,
				2: 200,
			},
		},
	}
	dst := map[string]*datapb.PartitionImportStats{
		"channel1": {
			PartitionRows: map[int64]int64{
				1: 5,
				2: 15,
			},
			PartitionDataSize: map[int64]int64{
				1: 50,
				2: 150,
			},
		},
	}
	MergeHashedStats(src, dst)

	// src stats are accumulated into dst, per channel and partition
	assert.Equal(t, int64(15), dst["channel1"].PartitionRows[1])
	assert.Equal(t, int64(35), dst["channel1"].PartitionRows[2])
	assert.Equal(t, int64(150), dst["channel1"].PartitionDataSize[1])
	assert.Equal(t, int64(350), dst["channel1"].PartitionDataSize[2])
}