mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-05 10:22:41 +08:00
fix: Add field data alignment validation to prevent partial update panic (#46177)
issue: #46176 - Add checkAligned validation before processing partial update field data to prevent index out of range panic when field data arrays have mismatched lengths - Fix GetNumRowOfFieldDataWithSchema to handle Timestamptz string format and Geometry WKT format properly - Add unit tests for empty data array scenarios in partial update --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
728cdc15b2
commit
d7050c417f
@ -291,11 +291,10 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error {
|
||||
zap.Int64("latency", tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
// set field id for user passed field data, prepare for merge logic
|
||||
upsertFieldData := it.upsertMsg.InsertMsg.GetFieldsData()
|
||||
if len(upsertFieldData) == 0 {
|
||||
if len(it.upsertMsg.InsertMsg.GetFieldsData()) == 0 {
|
||||
return merr.WrapErrParameterInvalidMsg("upsert field data is empty")
|
||||
}
|
||||
for _, fieldData := range upsertFieldData {
|
||||
for _, fieldData := range it.upsertMsg.InsertMsg.GetFieldsData() {
|
||||
fieldName := fieldData.GetFieldName()
|
||||
if fieldData.GetIsDynamic() {
|
||||
fieldName = "$meta"
|
||||
@ -318,6 +317,12 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Validate field data alignment before processing to prevent index out of range panic
|
||||
if err := newValidateUtil().checkAligned(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.schemaHelper, uint64(upsertIDSize)); err != nil {
|
||||
log.Warn("check field data aligned failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Two nullable data formats are supported:
|
||||
//
|
||||
// COMPRESSED FORMAT (SDK format, before validateUtil.fillWithValue processing):
|
||||
@ -395,7 +400,7 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error {
|
||||
return merr.WrapErrParameterInvalidMsg("primary key not found in exist data mapping")
|
||||
}
|
||||
typeutil.AppendFieldData(it.insertFieldData, existFieldData, int64(existIndex))
|
||||
err := typeutil.UpdateFieldData(it.insertFieldData, upsertFieldData, int64(baseIdx), int64(idx))
|
||||
err := typeutil.UpdateFieldData(it.insertFieldData, it.upsertMsg.InsertMsg.GetFieldsData(), int64(baseIdx), int64(idx))
|
||||
baseIdx += 1
|
||||
if err != nil {
|
||||
log.Info("update field data failed", zap.Error(err))
|
||||
|
||||
@ -845,18 +845,41 @@ func TestUpdateTask_queryPreExecute_Success(t *testing.T) {
|
||||
FieldName: "id",
|
||||
FieldId: 100,
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldName: "name",
|
||||
FieldId: 102,
|
||||
Type: schemapb.DataType_VarChar,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{Data: []string{"test1", "test2", "test3"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldName: "vector",
|
||||
FieldId: 101,
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: 128,
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{Data: make([]float32, 384)}, // 3 * 128
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NumRows: 3,
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -1600,3 +1623,251 @@ func TestUpsertTask_NoDuplicatePK(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasDuplicate, "should not have duplicate primary keys")
|
||||
}
|
||||
|
||||
// TestUpsertTask_queryPreExecute_EmptyDataArray tests the scenario where:
|
||||
// 1. Partial update is enabled
|
||||
// 2. Three columns are passed: pk (a), vector (b), scalar (c)
|
||||
// 3. Columns a and b have 10 rows of data, column c has FieldData but empty data array
|
||||
// 4. Verifies both nullable and non-nullable scenarios for column c
|
||||
func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) {
|
||||
numRows := 10
|
||||
dim := 128
|
||||
|
||||
t.Run("scalar field with empty data array nullable field", func(t *testing.T) {
|
||||
// Schema with nullable scalar field c
|
||||
schema := newSchemaInfo(&schemapb.CollectionSchema{
|
||||
Name: "test_empty_data_array",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 100, Name: "a", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "b",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: "dim", Value: "128"},
|
||||
},
|
||||
},
|
||||
{FieldID: 102, Name: "c", DataType: schemapb.DataType_Int32, Nullable: true},
|
||||
},
|
||||
})
|
||||
|
||||
// Upsert data: a (pk, 10 rows), b (vector, 10 rows), c (scalar, FieldData exists but data array is empty)
|
||||
pkData := make([]int64, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
pkData[i] = int64(i + 1)
|
||||
}
|
||||
vectorData := make([]float32, numRows*dim)
|
||||
|
||||
upsertData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: pkData}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: vectorData}}}},
|
||||
},
|
||||
{
|
||||
// c has FieldData but empty data array
|
||||
FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}},
|
||||
},
|
||||
}
|
||||
|
||||
// Query result returns empty (all are new inserts)
|
||||
mockQueryResult := &milvuspb.QueryResults{
|
||||
Status: merr.Success(),
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: []float32{}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mockey.PatchConvey("test nullable field", t, func() {
|
||||
// Setup mocks using mockey
|
||||
mockey.Mock(GetReplicateID).Return("", nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build()
|
||||
mockey.Mock(isPartitionKeyMode).Return(false, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetDatabaseInfo).Return(&databaseInfo{dbID: 0}, nil).Build()
|
||||
mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build()
|
||||
|
||||
globalMetaCache = &MetaCache{}
|
||||
|
||||
// Setup idAllocator
|
||||
ctx := context.Background()
|
||||
rc := mocks.NewMockRootCoordClient(t)
|
||||
rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
|
||||
Status: merr.Status(nil),
|
||||
ID: 1000,
|
||||
Count: uint32(numRows),
|
||||
}, nil).Maybe()
|
||||
idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
|
||||
assert.NoError(t, err)
|
||||
idAllocator.Start()
|
||||
defer idAllocator.Close()
|
||||
|
||||
task := &upsertTask{
|
||||
ctx: ctx,
|
||||
schema: schema,
|
||||
req: &milvuspb.UpsertRequest{
|
||||
CollectionName: "test_empty_data_array",
|
||||
FieldsData: upsertData,
|
||||
NumRows: uint32(numRows),
|
||||
},
|
||||
upsertMsg: &msgstream.UpsertMsg{
|
||||
InsertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test_empty_data_array",
|
||||
FieldsData: upsertData,
|
||||
NumRows: uint64(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
result: &milvuspb.MutationResult{},
|
||||
node: &Proxy{},
|
||||
}
|
||||
|
||||
// case1: test upsert
|
||||
err = task.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
|
||||
// case2: test partial update
|
||||
task.req.PartialUpdate = true
|
||||
err = task.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("scalar field with empty data array - non-nullable field", func(t *testing.T) {
|
||||
// Schema with non-nullable scalar field c
|
||||
schema := newSchemaInfo(&schemapb.CollectionSchema{
|
||||
Name: "test_empty_data_array_non_nullable",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 100, Name: "a", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "b",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: "dim", Value: "128"},
|
||||
},
|
||||
},
|
||||
{FieldID: 102, Name: "c", DataType: schemapb.DataType_Int32, Nullable: false},
|
||||
},
|
||||
})
|
||||
|
||||
// Upsert data: a (pk, 10 rows), b (vector, 10 rows), c (scalar, FieldData exists but data array is empty)
|
||||
pkData := make([]int64, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
pkData[i] = int64(i + 1)
|
||||
}
|
||||
vectorData := make([]float32, numRows*dim)
|
||||
|
||||
upsertData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: pkData}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: vectorData}}}},
|
||||
},
|
||||
{
|
||||
// c has FieldData but empty data array - this should cause validation error for non-nullable field
|
||||
FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}},
|
||||
},
|
||||
}
|
||||
|
||||
// Query result returns empty (all are new inserts)
|
||||
mockQueryResult := &milvuspb.QueryResults{
|
||||
Status: merr.Success(),
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: []float32{}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mockey.PatchConvey("test non-nullable field", t, func() {
|
||||
// Setup mocks using mockey
|
||||
mockey.Mock(GetReplicateID).Return("", nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build()
|
||||
mockey.Mock(isPartitionKeyMode).Return(false, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetDatabaseInfo).Return(&databaseInfo{dbID: 0}, nil).Build()
|
||||
mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build()
|
||||
|
||||
globalMetaCache = &MetaCache{}
|
||||
|
||||
// Setup idAllocator
|
||||
ctx := context.Background()
|
||||
rc := mocks.NewMockRootCoordClient(t)
|
||||
rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
|
||||
Status: merr.Status(nil),
|
||||
ID: 1000,
|
||||
Count: uint32(numRows),
|
||||
}, nil).Maybe()
|
||||
idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
|
||||
assert.NoError(t, err)
|
||||
idAllocator.Start()
|
||||
defer idAllocator.Close()
|
||||
|
||||
task := &upsertTask{
|
||||
ctx: ctx,
|
||||
schema: schema,
|
||||
req: &milvuspb.UpsertRequest{
|
||||
CollectionName: "test_empty_data_array_non_nullable",
|
||||
FieldsData: upsertData,
|
||||
NumRows: uint32(numRows),
|
||||
},
|
||||
upsertMsg: &msgstream.UpsertMsg{
|
||||
InsertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test_empty_data_array_non_nullable",
|
||||
FieldsData: upsertData,
|
||||
NumRows: uint64(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
result: &milvuspb.MutationResult{},
|
||||
node: &Proxy{},
|
||||
}
|
||||
|
||||
// case1: test upsert
|
||||
err = task.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
|
||||
// case2: test partial update
|
||||
task.req.PartialUpdate = true
|
||||
err = task.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@ -438,6 +438,9 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetDoubleData().GetData())
|
||||
case schemapb.DataType_Timestamptz:
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetTimestamptzData().GetData())
|
||||
if fieldNumRows == 0 {
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetStringData().GetData())
|
||||
}
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text:
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetStringData().GetData())
|
||||
case schemapb.DataType_Array:
|
||||
@ -446,6 +449,9 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetJsonData().GetData())
|
||||
case schemapb.DataType_Geometry:
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetGeometryData().GetData())
|
||||
if fieldNumRows == 0 {
|
||||
fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetGeometryWktData().GetData())
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
dim := fieldData.GetVectors().GetDim()
|
||||
fieldNumRows, err = GetNumRowsOfFloatVectorField(fieldData.GetVectors().GetFloatVector().GetData(), dim)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user