From d7050c417f03ef697e82866e1d341b1640203b22 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 9 Dec 2025 14:17:12 +0800 Subject: [PATCH] fix: Add field data alignment validation to prevent partial update panic (#46177) issue: #46176 - Add checkAligned validation before processing partial update field data to prevent index out of range panic when field data arrays have mismatched lengths - Fix GetNumRowOfFieldDataWithSchema to handle Timestamptz string format and Geometry WKT format properly - Add unit tests for empty data array scenarios in partial update --------- Signed-off-by: Wei Liu --- internal/proxy/task_upsert.go | 13 +- internal/proxy/task_upsert_test.go | 271 +++++++++++++++++++++++++++++ pkg/util/funcutil/func.go | 6 + 3 files changed, 286 insertions(+), 4 deletions(-) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index ebb614f0fa..3af8e1fe72 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -291,11 +291,10 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error { zap.Int64("latency", tr.ElapseSpan().Milliseconds())) // set field id for user passed field data, prepare for merge logic - upsertFieldData := it.upsertMsg.InsertMsg.GetFieldsData() - if len(upsertFieldData) == 0 { + if len(it.upsertMsg.InsertMsg.GetFieldsData()) == 0 { return merr.WrapErrParameterInvalidMsg("upsert field data is empty") } - for _, fieldData := range upsertFieldData { + for _, fieldData := range it.upsertMsg.InsertMsg.GetFieldsData() { fieldName := fieldData.GetFieldName() if fieldData.GetIsDynamic() { fieldName = "$meta" @@ -318,6 +317,12 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error { } } + // Validate field data alignment before processing to prevent index out of range panic + if err := newValidateUtil().checkAligned(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.schemaHelper, uint64(upsertIDSize)); err != nil { + log.Warn("check field data aligned failed", zap.Error(err)) + return err + } + // Two nullable data formats are supported: // // COMPRESSED FORMAT (SDK format, before validateUtil.fillWithValue processing): @@ -395,7 +400,7 @@ func (it *upsertTask) queryPreExecute(ctx context.Context) error { return merr.WrapErrParameterInvalidMsg("primary key not found in exist data mapping") } typeutil.AppendFieldData(it.insertFieldData, existFieldData, int64(existIndex)) - err := typeutil.UpdateFieldData(it.insertFieldData, upsertFieldData, int64(baseIdx), int64(idx)) + err := typeutil.UpdateFieldData(it.insertFieldData, it.upsertMsg.InsertMsg.GetFieldsData(), int64(baseIdx), int64(idx)) baseIdx += 1 if err != nil { log.Info("update field data failed", zap.Error(err)) diff --git a/internal/proxy/task_upsert_test.go b/internal/proxy/task_upsert_test.go index 8aa3bb1ac4..5aa07a74d5 100644 --- a/internal/proxy/task_upsert_test.go +++ b/internal/proxy/task_upsert_test.go @@ -845,18 +845,41 @@ func TestUpdateTask_queryPreExecute_Success(t *testing.T) { FieldName: "id", FieldId: 100, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}}, + }, + }, + }, }, { FieldName: "name", FieldId: 102, Type: schemapb.DataType_VarChar, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{Data: []string{"test1", "test2", "test3"}}, + }, + }, + }, }, { FieldName: "vector", FieldId: 101, Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{Data: make([]float32, 384)}, // 3 * 128 + }, + }, + }, }, }, + NumRows: 3, }, }, } @@ -1600,3 +1623,251 @@ func TestUpsertTask_NoDuplicatePK(t *testing.T) { assert.NoError(t, err) assert.False(t, hasDuplicate, "should not have duplicate primary keys") } + +// TestUpsertTask_queryPreExecute_EmptyDataArray tests the scenario where: +// 1. Partial update is enabled +// 2. Three columns are passed: pk (a), vector (b), scalar (c) +// 3. Columns a and b have 10 rows of data, column c has FieldData but empty data array +// 4. Verifies both nullable and non-nullable scenarios for column c +func TestUpsertTask_queryPreExecute_EmptyDataArray(t *testing.T) { + numRows := 10 + dim := 128 + + t.Run("scalar field with empty data array nullable field", func(t *testing.T) { + // Schema with nullable scalar field c + schema := newSchemaInfo(&schemapb.CollectionSchema{ + Name: "test_empty_data_array", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "a", IsPrimaryKey: true, DataType: schemapb.DataType_Int64}, + { + FieldID: 101, + Name: "b", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: "dim", Value: "128"}, + }, + }, + {FieldID: 102, Name: "c", DataType: schemapb.DataType_Int32, Nullable: true}, + }, + }) + + // Upsert data: a (pk, 10 rows), b (vector, 10 rows), c (scalar, FieldData exists but data array is empty) + pkData := make([]int64, numRows) + for i := 0; i < numRows; i++ { + pkData[i] = int64(i + 1) + } + vectorData := make([]float32, numRows*dim) + + upsertData := []*schemapb.FieldData{ + { + FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: pkData}}}}, + }, + { + FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: vectorData}}}}, + }, + { + // c has FieldData but empty data array + FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}}, + }, + } + + // Query result returns empty (all are new inserts) + mockQueryResult := &milvuspb.QueryResults{ + Status: merr.Success(), + FieldsData: []*schemapb.FieldData{ + { + FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{}}}}}, + }, + { + FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: []float32{}}}}}, + }, + { + FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}}, + }, + }, + } + + mockey.PatchConvey("test nullable field", t, func() { + // Setup mocks using mockey + mockey.Mock(GetReplicateID).Return("", nil).Build() + mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() + mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build() + mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build() + mockey.Mock(isPartitionKeyMode).Return(false, nil).Build() + mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build() + mockey.Mock((*MetaCache).GetDatabaseInfo).Return(&databaseInfo{dbID: 0}, nil).Build() + mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build() + + globalMetaCache = &MetaCache{} + + // Setup idAllocator + ctx := context.Background() + rc := mocks.NewMockRootCoordClient(t) + rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{ + Status: merr.Status(nil), + ID: 1000, + Count: uint32(numRows), + }, nil).Maybe() + idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0) + assert.NoError(t, err) + idAllocator.Start() + defer idAllocator.Close() + + task := &upsertTask{ + ctx: ctx, + schema: schema, + req: &milvuspb.UpsertRequest{ + CollectionName: "test_empty_data_array", + FieldsData: upsertData, + NumRows: uint32(numRows), + }, + upsertMsg: &msgstream.UpsertMsg{ + InsertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test_empty_data_array", + FieldsData: upsertData, + NumRows: uint64(numRows), + }, + }, + }, + idAllocator: idAllocator, + result: &milvuspb.MutationResult{}, + node: &Proxy{}, + } + + // case1: test upsert + err = task.PreExecute(ctx) + assert.Error(t, err) + + // case2: test partial update + task.req.PartialUpdate = true + err = task.PreExecute(ctx) + assert.Error(t, err) + }) + }) + + t.Run("scalar field with empty data array - non-nullable field", func(t *testing.T) { + // Schema with non-nullable scalar field c + schema := newSchemaInfo(&schemapb.CollectionSchema{ + Name: "test_empty_data_array_non_nullable", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "a", IsPrimaryKey: true, DataType: schemapb.DataType_Int64}, + { + FieldID: 101, + Name: "b", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: "dim", Value: "128"}, + }, + }, + {FieldID: 102, Name: "c", DataType: schemapb.DataType_Int32, Nullable: false}, + }, + }) + + // Upsert data: a (pk, 10 rows), b (vector, 10 rows), c (scalar, FieldData exists but data array is empty) + pkData := make([]int64, numRows) + for i := 0; i < numRows; i++ { + pkData[i] = int64(i + 1) + } + vectorData := make([]float32, numRows*dim) + + upsertData := []*schemapb.FieldData{ + { + FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: pkData}}}}, + }, + { + FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: vectorData}}}}, + }, + { + // c has FieldData but empty data array - this should cause validation error for non-nullable field + FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}}, + }, + } + + // Query result returns empty (all are new inserts) + mockQueryResult := &milvuspb.QueryResults{ + Status: merr.Success(), + FieldsData: []*schemapb.FieldData{ + { + FieldName: "a", FieldId: 100, Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{}}}}}, + }, + { + FieldName: "b", FieldId: 101, Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: []float32{}}}}}, + }, + { + FieldName: "c", FieldId: 102, Type: schemapb.DataType_Int32, + Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{}}}}}, + }, + }, + } + + mockey.PatchConvey("test non-nullable field", t, func() { + // Setup mocks using mockey + mockey.Mock(GetReplicateID).Return("", nil).Build() + mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() + mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build() + mockey.Mock((*MetaCache).GetCollectionSchema).Return(schema, nil).Build() + mockey.Mock(isPartitionKeyMode).Return(false, nil).Build() + mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build() + mockey.Mock((*MetaCache).GetDatabaseInfo).Return(&databaseInfo{dbID: 0}, nil).Build() + mockey.Mock(retrieveByPKs).Return(mockQueryResult, segcore.StorageCost{}, nil).Build() + + globalMetaCache = &MetaCache{} + + // Setup idAllocator + ctx := context.Background() + rc := mocks.NewMockRootCoordClient(t) + rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{ + Status: merr.Status(nil), + ID: 1000, + Count: uint32(numRows), + }, nil).Maybe() + idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0) + assert.NoError(t, err) + idAllocator.Start() + defer idAllocator.Close() + + task := &upsertTask{ + ctx: ctx, + schema: schema, + req: &milvuspb.UpsertRequest{ + CollectionName: "test_empty_data_array_non_nullable", + FieldsData: upsertData, + NumRows: uint32(numRows), + }, + upsertMsg: &msgstream.UpsertMsg{ + InsertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test_empty_data_array_non_nullable", + FieldsData: upsertData, + NumRows: uint64(numRows), + }, + }, + }, + idAllocator: idAllocator, + result: &milvuspb.MutationResult{}, + node: &Proxy{}, + } + + // case1: test upsert + err = task.PreExecute(ctx) + assert.Error(t, err) + + // case2: test partial update + task.req.PartialUpdate = true + err = task.PreExecute(ctx) + assert.Error(t, err) + }) + }) +} diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 5317a67e92..fde131e4aa 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -438,6 +438,9 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetDoubleData().GetData()) case schemapb.DataType_Timestamptz: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetTimestamptzData().GetData()) + if fieldNumRows == 0 { + fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetStringData().GetData()) + } case schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_Text: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetStringData().GetData()) case schemapb.DataType_Array: @@ -446,6 +449,9 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetJsonData().GetData()) case schemapb.DataType_Geometry: fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetGeometryData().GetData()) + if fieldNumRows == 0 { + fieldNumRows = getNumRowsOfScalarField(fieldData.GetScalars().GetGeometryWktData().GetData()) + } case schemapb.DataType_FloatVector: dim := fieldData.GetVectors().GetDim() fieldNumRows, err = GetNumRowsOfFloatVectorField(fieldData.GetVectors().GetFloatVector().GetData(), dim)