When autoID is enabled, the preimport task estimates row distribution by evenly dividing the total row count (numRows) across all vchannels: `estimatedCount = numRows / vchannelNum`. However, the actual import task hashes the real auto-generated IDs to determine the target vchannel. This mismatch can lead to inaccurate row distribution estimation in corner cases such as:

- Importing 1 row into 2 vchannels:
  • Preimport: 1 / 2 = 0 → both v0 and v1 are estimated to have 0 rows
  • Import: the real autoID (e.g., 457975852966809057) hashes to v1 → actual result: v0 = 0, v1 = 1

To resolve this corner case, we now allocate at least one segment for each vchannel when autoID is enabled, ensuring all vchannels are prepared to receive data even if no rows are estimated for them.

issue: https://github.com/milvus-io/milvus/issues/41759

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
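For illustration, here is a minimal, self-contained sketch of the mismatch and of the allocation rule described above (the hash function and channel names are stand-ins, not the actual Milvus implementation):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// estimateRows mirrors the preimport heuristic: the total row count is
// divided evenly across vchannels with integer division.
func estimateRows(numRows, vchannelNum int64) []int64 {
	est := make([]int64, vchannelNum)
	for i := range est {
		est[i] = numRows / vchannelNum // 1 / 2 == 0 for both channels
	}
	return est
}

// hashToVchannel mimics how a real auto-generated ID picks its target
// vchannel at import time (FNV here is illustrative only).
func hashToVchannel(autoID, vchannelNum int64) int64 {
	h := fnv.New64()
	fmt.Fprintf(h, "%d", autoID)
	return int64(h.Sum64() % uint64(vchannelNum))
}

func main() {
	// Preimport estimate for 1 row over 2 vchannels: [0 0].
	fmt.Println(estimateRows(1, 2))

	// Import time: the real autoID lands on exactly one vchannel,
	// so one channel receives a row that was never estimated for it.
	fmt.Printf("row hashed to v%d\n", hashToVchannel(457975852966809057, 2))

	// The fix: when autoID is enabled, allocate at least one segment
	// per vchannel so every channel can accept unestimated rows.
	for i, rows := range estimateRows(1, 2) {
		segments := rows // hypothetical: segments proportional to rows
		if segments < 1 {
			segments = 1
		}
		fmt.Printf("v%d: %d segment(s)\n", i, segments)
	}
}
```

Running this prints an estimate of `[0 0]` while the real row still lands on one concrete vchannel, which is why that vchannel must already have a segment allocated.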
package importv2

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/conc"
)

// TestNewHashedData verifies that newHashedData returns a fully
// initialized 2x2 grid of non-nil buffers.
func TestNewHashedData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}

	got, err := newHashedData(schema, 2, 2)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for i := 0; i < 2; i++ {
		assert.Equal(t, 2, len(got[i]))
		for j := 0; j < 2; j++ {
			assert.NotNil(t, got[i][j])
		}
	}
}

// TestHashData verifies that insert rows are hashed across 2 vchannels
// and 2 partitions without losing any rows.
func TestHashData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
			{
				FieldID:        101,
				Name:           "partition_key",
				DataType:       schemapb.DataType_Int64,
				IsPartitionKey: true,
			},
		},
	}

	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	rows, err := storage.NewInsertData(schema)
	assert.NoError(t, err)

	// Add 1000 rows of test data
	for i := 0; i < 1000; i++ {
		rows.Append(map[int64]interface{}{
			100: int64(i),       // primary key
			101: int64(i%2 + 1), // partition key, alternates between 1 and 2
		})
	}

	got, err := HashData(mockTask, rows)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))

	// Verify data distribution
	totalRows := 0
	for i := 0; i < 2; i++ {
		assert.Equal(t, 2, len(got[i]))
		for j := 0; j < 2; j++ {
			assert.NotNil(t, got[i][j])
			totalRows += got[i][j].GetRowNum()
		}
	}
	assert.Equal(t, 1000, totalRows)
}

// TestHashDeleteData verifies that delete records are hashed by primary
// key, producing one non-nil bucket per vchannel.
func TestHashDeleteData(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}

	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	delData := storage.NewDeleteData(nil, nil)
	delData.Append(storage.NewInt64PrimaryKey(1), 1)

	got, err := HashDeleteData(mockTask, delData)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for i := 0; i < 2; i++ {
		assert.NotNil(t, got[i])
	}
}

// TestGetRowsStats covers row statistics for both explicit primary keys
// and autoID, where rows are expected to spread evenly across vchannels.
func TestGetRowsStats(t *testing.T) {
	t.Run("test non-autoID", func(t *testing.T) {
		schema := &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:      100,
					Name:         "pk",
					DataType:     schemapb.DataType_Int64,
					IsPrimaryKey: true,
				},
				{
					FieldID:        101,
					Name:           "partition_key",
					DataType:       schemapb.DataType_Int64,
					IsPartitionKey: true,
				},
			},
		}

		mockTask := NewMockTask(t)
		mockTask.On("GetSchema").Return(schema).Maybe()
		mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
		mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
		mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
		mockTask.On("GetJobID").Return(int64(1)).Maybe()
		mockTask.On("GetTaskID").Return(int64(1)).Maybe()
		mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

		rows, err := storage.NewInsertData(schema)
		assert.NoError(t, err)

		// Add 1000 rows of test data
		for i := 0; i < 1000; i++ {
			rows.Append(map[int64]interface{}{
				100: int64(i),       // primary key
				101: int64(i%2 + 1), // partition key, alternates between 1 and 2
			})
		}

		got, err := GetRowsStats(mockTask, rows)
		assert.NoError(t, err)
		assert.Equal(t, 2, len(got))

		// Verify statistics
		totalRows := int64(0)
		for _, stats := range got {
			assert.NotNil(t, stats)
			assert.NotNil(t, stats.PartitionRows)
			assert.NotNil(t, stats.PartitionDataSize)

			for _, count := range stats.PartitionRows {
				totalRows += count
			}
		}
		assert.Equal(t, int64(1000), totalRows)
	})

	t.Run("test autoID", func(t *testing.T) {
		schema := &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:      100,
					Name:         "pk",
					DataType:     schemapb.DataType_Int64,
					IsPrimaryKey: true,
					AutoID:       true,
				},
				{
					FieldID:        101,
					Name:           "partition_key",
					DataType:       schemapb.DataType_Int64,
					IsPartitionKey: true,
				},
			},
		}

		mockTask := NewMockTask(t)
		mockTask.On("GetSchema").Return(schema).Maybe()
		mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
		mockTask.On("GetPartitionIDs").Return([]int64{1, 2}).Maybe()
		mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
		mockTask.On("GetJobID").Return(int64(1)).Maybe()
		mockTask.On("GetTaskID").Return(int64(1)).Maybe()
		mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

		rows, err := storage.NewInsertData(schema)
		assert.NoError(t, err)

		// Add 1000 rows of test data; the pk is omitted because it is
		// auto-generated.
		for i := 0; i < 1000; i++ {
			rows.Append(map[int64]interface{}{
				101: int64(i%2 + 1), // partition key, alternates between 1 and 2
			})
		}

		got, err := GetRowsStats(mockTask, rows)
		assert.NoError(t, err)
		assert.Equal(t, 2, len(got))

		// Verify statistics and data distribution
		totalRows := int64(0)
		channelRows := make([]int64, 2)

		channelIndex := 0
		for _, stats := range got {
			assert.NotNil(t, stats)
			assert.NotNil(t, stats.PartitionRows)
			assert.NotNil(t, stats.PartitionDataSize)

			channelTotal := int64(0)
			for _, count := range stats.PartitionRows {
				channelTotal += count
			}
			channelRows[channelIndex] = channelTotal
			totalRows += channelTotal
			channelIndex++
		}

		// Verify total rows
		assert.Equal(t, int64(1000), totalRows)

		// Verify data is evenly distributed across channels.
		// Allow for small differences due to rounding.
		expectedPerChannel := totalRows / 2
		for _, count := range channelRows {
			assert.InDelta(t, expectedPerChannel, count, 1)
		}
	})
}

// TestGetDeleteStats verifies that delete statistics are produced for
// every vchannel.
func TestGetDeleteStats(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
		},
	}

	mockTask := NewMockTask(t)
	mockTask.On("GetSchema").Return(schema).Maybe()
	mockTask.On("GetVchannels").Return([]string{"channel1", "channel2"}).Maybe()
	mockTask.On("GetPartitionIDs").Return([]int64{1}).Maybe()
	mockTask.On("Execute").Return([]*conc.Future[any]{}).Maybe()
	mockTask.On("GetJobID").Return(int64(1)).Maybe()
	mockTask.On("GetTaskID").Return(int64(1)).Maybe()
	mockTask.On("GetCollectionID").Return(int64(1)).Maybe()

	delData := storage.NewDeleteData(nil, nil)
	delData.Append(storage.NewInt64PrimaryKey(1), 1)

	got, err := GetDeleteStats(mockTask, delData)
	assert.NoError(t, err)
	assert.Equal(t, 2, len(got))
	for _, stats := range got {
		assert.NotNil(t, stats)
		assert.NotNil(t, stats.PartitionRows)
		assert.NotNil(t, stats.PartitionDataSize)
	}
}

// TestMergeHashedStats verifies that per-partition row counts and data
// sizes from src are added into dst.
func TestMergeHashedStats(t *testing.T) {
	src := map[string]*datapb.PartitionImportStats{
		"channel1": {
			PartitionRows: map[int64]int64{
				1: 10,
				2: 20,
			},
			PartitionDataSize: map[int64]int64{
				1: 100,
				2: 200,
			},
		},
	}

	dst := map[string]*datapb.PartitionImportStats{
		"channel1": {
			PartitionRows: map[int64]int64{
				1: 5,
				2: 15,
			},
			PartitionDataSize: map[int64]int64{
				1: 50,
				2: 150,
			},
		},
	}

	MergeHashedStats(src, dst)

	assert.Equal(t, int64(15), dst["channel1"].PartitionRows[1])
	assert.Equal(t, int64(35), dst["channel1"].PartitionRows[2])
	assert.Equal(t, int64(150), dst["channel1"].PartitionDataSize[1])
	assert.Equal(t, int64(350), dst["channel1"].PartitionDataSize[2])
}
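Taken together, the assertions above imply purely additive merge semantics. A minimal sketch of that behavior under those assumptions (hypothetical helper; not the actual MergeHashedStats implementation in this package):

```go
// mergeStatsSketch adds src's per-partition row counts and data sizes
// into dst, keyed by vchannel. This is a hypothetical sketch of the
// semantics the test asserts, not the real MergeHashedStats.
func mergeStatsSketch(src, dst map[string]*datapb.PartitionImportStats) {
	for channel, s := range src {
		d, ok := dst[channel]
		if !ok {
			dst[channel] = s // channel absent in dst: take src as-is
			continue
		}
		for partition, rows := range s.GetPartitionRows() {
			d.PartitionRows[partition] += rows
		}
		for partition, size := range s.GetPartitionDataSize() {
			d.PartitionDataSize[partition] += size
		}
	}
}
```

With the inputs above, this yields PartitionRows[1] == 15 and PartitionDataSize[2] == 350, matching the assertions.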