diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index dc00d5bd3f..ebb614f0fa 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -1078,18 +1078,13 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { log.Warn("fail to get primary field schema", zap.Error(err)) return err } - deduplicatedFieldsData, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, it.req.GetFieldsData(), schema) + duplicate, err := CheckDuplicatePkExist(primaryFieldSchema, it.req.GetFieldsData()) if err != nil { - log.Warn("fail to deduplicate upsert data", zap.Error(err)) + log.Warn("fail to check duplicate primary keys", zap.Error(err)) + return err } - - // dedup won't decrease numOfRows to 0 - if newNumRows > 0 && newNumRows != it.req.NumRows { - log.Info("upsert data deduplicated", - zap.Uint32("original_num_rows", it.req.NumRows), - zap.Uint32("deduplicated_num_rows", newNumRows)) - it.req.FieldsData = deduplicatedFieldsData - it.req.NumRows = newNumRows + if duplicate { + return merr.WrapErrParameterInvalidMsg("duplicate primary keys are not allowed in the same batch") } it.upsertMsg = &msgstream.UpsertMsg{ diff --git a/internal/proxy/task_upsert_test.go b/internal/proxy/task_upsert_test.go index 5f06c25395..8aa3bb1ac4 100644 --- a/internal/proxy/task_upsert_test.go +++ b/internal/proxy/task_upsert_test.go @@ -35,7 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/proxy/shardclient" "github.com/milvus-io/milvus/internal/util/function/embedding" "github.com/milvus-io/milvus/internal/util/segcore" - "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/planpb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" @@ -1467,402 +1466,137 @@ func TestGenNullableFieldData_GeometryAndTimestamptz(t *testing.T) { }) } -func TestUpsertTask_PlanNamespace_AfterPreExecute(t *testing.T) { - mockey.PatchConvey("TestUpsertTask_PlanNamespace_AfterPreExecute", t, func() { - // Setup global meta cache and common mocks - globalMetaCache = &MetaCache{} - mockey.Mock(GetReplicateID).Return("", nil).Build() - mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build() - mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build() - mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build() - mockey.Mock((*MetaCache).GetPartitionID).Return(int64(1002), nil).Build() - mockey.Mock(isPartitionKeyMode).Return(false, nil).Build() - mockey.Mock(validatePartitionTag).Return(nil).Build() - - // Schema with namespace enabled - mockey.Mock((*MetaCache).GetCollectionSchema).To(func(_ *MetaCache, _ context.Context, _ string, _ string) (*schemaInfo, error) { - info := createTestSchema() - info.CollectionSchema.Properties = append(info.CollectionSchema.Properties, &commonpb.KeyValuePair{Key: common.NamespaceEnabledKey, Value: "true"}) - return info, nil - }).Build() - - // Capture plan to verify namespace - var capturedPlan *planpb.PlanNode - mockey.Mock(planparserv2.CreateRequeryPlan).To(func(_ *schemapb.FieldSchema, _ *schemapb.IDs) *planpb.PlanNode { - capturedPlan = &planpb.PlanNode{} - return capturedPlan - }).Build() - - // Mock query to return a valid result for queryPreExecute merge path - mockey.Mock((*Proxy).query).Return(&milvuspb.QueryResults{ - Status: merr.Success(), - FieldsData: []*schemapb.FieldData{ - { - FieldName: "id", - FieldId: 100, - Type: schemapb.DataType_Int64, - 
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2}}}}}, - }, - { - FieldName: "name", - FieldId: 102, - Type: schemapb.DataType_VarChar, - Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_StringData{StringData: &schemapb.StringArray{Data: []string{"old1", "old2"}}}}}, - }, - { - FieldName: "vector", - FieldId: 101, - Type: schemapb.DataType_FloatVector, - Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: 128, Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: make([]float32, 256)}}}}, - }, - }, - }, segcore.StorageCost{}, nil).Build() - - // Build task - task := createTestUpdateTask() - ns := "ns-1" - task.req.PartialUpdate = true - task.req.Namespace = &ns - - // Skip insert/delete heavy logic - mockey.Mock((*upsertTask).insertPreExecute).Return(nil).Build() - mockey.Mock((*upsertTask).deletePreExecute).Return(nil).Build() - - err := task.PreExecute(context.Background()) - assert.NoError(t, err) - assert.NotNil(t, capturedPlan) - assert.NotNil(t, capturedPlan.Namespace) - assert.Equal(t, *task.req.Namespace, *capturedPlan.Namespace) - }) -} - -func TestUpsertTask_Deduplicate_Int64PK(t *testing.T) { - // Test deduplication with Int64 primary key - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_Int64, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ +func TestUpsertTask_DuplicatePK_Int64(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_duplicate_pk", Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, - { - Name: "float_field", - FieldID: 101, - DataType: schemapb.DataType_Float, - }, + {FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64}, + {FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32}, }, } - schema := newSchemaInfo(collSchema) - // Create field data with duplicate IDs: [1, 2, 3, 2, 1] - // Expected to keep last occurrence of each: [3, 2, 1] (indices 2, 3, 4) + // Data with duplicate primary keys: 1, 2, 1 (duplicate) fieldsData := []*schemapb.FieldData{ { FieldName: "id", + FieldId: 100, Type: schemapb.DataType_Int64, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: []int64{1, 2, 3, 2, 1}, - }, + LongData: &schemapb.LongArray{Data: []int64{1, 2, 1}}, }, }, }, }, { - FieldName: "float_field", - Type: schemapb.DataType_Float, + FieldName: "value", + FieldId: 101, + Type: schemapb.DataType_Int32, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_FloatData{ - FloatData: &schemapb.FloatArray{ - Data: []float32{1.1, 2.2, 3.3, 2.4, 1.5}, - }, + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}}, }, }, }, }, } - deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) + // Test CheckDuplicatePkExist directly + primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema) assert.NoError(t, err) - assert.Equal(t, uint32(3), newNumRows) - assert.Equal(t, 2, len(deduplicatedFields)) - - // Check deduplicated primary keys - pkField := deduplicatedFields[0] - pkData := pkField.GetScalars().GetLongData().GetData() - assert.Equal(t, 3, len(pkData)) - assert.Equal(t, []int64{3, 2, 1}, pkData) - - // Check corresponding float 
values (should be 3.3, 2.4, 1.5) - floatField := deduplicatedFields[1] - floatData := floatField.GetScalars().GetFloatData().GetData() - assert.Equal(t, 3, len(floatData)) - assert.Equal(t, []float32{3.3, 2.4, 1.5}, floatData) + hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.True(t, hasDuplicate, "should detect duplicate primary keys") } -func TestUpsertTask_Deduplicate_VarCharPK(t *testing.T) { - // Test deduplication with VarChar primary key - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_VarChar, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ +func TestUpsertTask_DuplicatePK_VarChar(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_duplicate_pk_varchar", Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, - { - Name: "int_field", - FieldID: 101, - DataType: schemapb.DataType_Int64, - }, + {FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "100"}}}, + {FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32}, }, } - schema := newSchemaInfo(collSchema) - // Create field data with duplicate IDs: ["a", "b", "c", "b", "a"] - // Expected to keep last occurrence of each: ["c", "b", "a"] (indices 2, 3, 4) + // Data with duplicate primary keys: "a", "b", "a" (duplicate) fieldsData := []*schemapb.FieldData{ { FieldName: "id", + FieldId: 100, Type: schemapb.DataType_VarChar, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ Data: &schemapb.ScalarField_StringData{ - StringData: &schemapb.StringArray{ - Data: []string{"a", "b", "c", "b", "a"}, - }, + StringData: &schemapb.StringArray{Data: []string{"a", "b", "a"}}, }, }, }, }, { - FieldName: "int_field", - Type: schemapb.DataType_Int64, + FieldName: "value", + FieldId: 101, + Type: schemapb.DataType_Int32, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: []int64{100, 200, 300, 201, 101}, - }, + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}}, }, }, }, }, } - deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) + // Test CheckDuplicatePkExist directly + primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema) assert.NoError(t, err) - assert.Equal(t, uint32(3), newNumRows) - assert.Equal(t, 2, len(deduplicatedFields)) - - // Check deduplicated primary keys - pkField := deduplicatedFields[0] - pkData := pkField.GetScalars().GetStringData().GetData() - assert.Equal(t, 3, len(pkData)) - assert.Equal(t, []string{"c", "b", "a"}, pkData) - - // Check corresponding int64 values (should be 300, 201, 101) - int64Field := deduplicatedFields[1] - int64Data := int64Field.GetScalars().GetLongData().GetData() - assert.Equal(t, 3, len(int64Data)) - assert.Equal(t, []int64{300, 201, 101}, int64Data) + hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.True(t, hasDuplicate, "should detect duplicate primary keys") } -func TestUpsertTask_Deduplicate_NoDuplicates(t *testing.T) { - // Test with no duplicates - should return original data - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_Int64, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ +func 
TestUpsertTask_NoDuplicatePK(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_no_duplicate_pk", Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, + {FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64}, + {FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32}, }, } - schema := newSchemaInfo(collSchema) + // Data with unique primary keys: 1, 2, 3 fieldsData := []*schemapb.FieldData{ { FieldName: "id", + FieldId: 100, Type: schemapb.DataType_Int64, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: []int64{1, 2, 3, 4, 5}, - }, + LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}}, }, }, }, }, - } - - deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) - assert.NoError(t, err) - assert.Equal(t, uint32(5), newNumRows) - assert.Equal(t, 1, len(deduplicatedFields)) - - // Should be unchanged - pkField := deduplicatedFields[0] - pkData := pkField.GetScalars().GetLongData().GetData() - assert.Equal(t, []int64{1, 2, 3, 4, 5}, pkData) -} - -func TestUpsertTask_Deduplicate_WithVector(t *testing.T) { - // Test deduplication with vector field - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_Int64, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, - { - Name: "vector", - FieldID: 101, - DataType: schemapb.DataType_FloatVector, - }, - }, - } - schema := newSchemaInfo(collSchema) - - dim := 4 - // Create field data with duplicate IDs: [1, 2, 1] - // Expected to keep indices [1, 2] (last occurrence of 2, last occurrence of 1) - fieldsData := []*schemapb.FieldData{ { - FieldName: "id", - Type: schemapb.DataType_Int64, + FieldName: "value", + FieldId: 101, + Type: schemapb.DataType_Int32, Field: &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: []int64{1, 2, 1}, - }, - }, - }, - }, - }, - { - FieldName: "vector", - Type: schemapb.DataType_FloatVector, - Field: &schemapb.FieldData_Vectors{ - Vectors: &schemapb.VectorField{ - Dim: int64(dim), - Data: &schemapb.VectorField_FloatVector{ - FloatVector: &schemapb.FloatArray{ - Data: []float32{ - 1.0, 1.1, 1.2, 1.3, // vector for ID 1 (first occurrence) - 2.0, 2.1, 2.2, 2.3, // vector for ID 2 - 1.4, 1.5, 1.6, 1.7, // vector for ID 1 (second occurrence - keep this) - }, - }, + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}}, }, }, }, }, } - deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) + // Call CheckDuplicatePkExist directly to verify no duplicate error + primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema) assert.NoError(t, err) - assert.Equal(t, uint32(2), newNumRows) - assert.Equal(t, 2, len(deduplicatedFields)) - - // Check deduplicated primary keys - pkField := deduplicatedFields[0] - pkData := pkField.GetScalars().GetLongData().GetData() - assert.Equal(t, 2, len(pkData)) - assert.Equal(t, []int64{2, 1}, pkData) - - // Check corresponding vector (should keep vectors for ID 2 and ID 1's last occurrence) - vectorField := deduplicatedFields[1] - vectorData := vectorField.GetVectors().GetFloatVector().GetData() - assert.Equal(t, 8, len(vectorData)) // 2 vectors * 4 dimensions - expectedVector := []float32{ - 2.0, 2.1, 
2.2, 2.3, // vector for ID 2 - 1.4, 1.5, 1.6, 1.7, // vector for ID 1 (last occurrence) - } - assert.Equal(t, expectedVector, vectorData) -} - -func TestUpsertTask_Deduplicate_EmptyData(t *testing.T) { - // Test with empty data - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_Int64, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, - }, - } - schema := newSchemaInfo(collSchema) - - fieldsData := []*schemapb.FieldData{} - - deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) + hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) assert.NoError(t, err) - assert.Equal(t, uint32(0), newNumRows) - assert.Equal(t, 0, len(deduplicatedFields)) -} - -func TestUpsertTask_Deduplicate_MissingPrimaryKey(t *testing.T) { - // Test with missing primary key field - primaryFieldSchema := &schemapb.FieldSchema{ - Name: "id", - FieldID: 100, - DataType: schemapb.DataType_Int64, - IsPrimaryKey: true, - } - - collSchema := &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - primaryFieldSchema, - { - Name: "other_field", - FieldID: 101, - DataType: schemapb.DataType_Float, - }, - }, - } - schema := newSchemaInfo(collSchema) - - fieldsData := []*schemapb.FieldData{ - { - FieldName: "other_field", - Type: schemapb.DataType_Float, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_FloatData{ - FloatData: &schemapb.FloatArray{ - Data: []float32{1.1, 2.2}, - }, - }, - }, - }, - }, - } - - _, _, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema) - assert.Error(t, err) - // validateFieldDataColumns will fail first due to column count mismatch - // or the function will fail when trying to find primary key - assert.True(t, err != nil) + assert.False(t, hasDuplicate, "should not have duplicate primary keys") } diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 8888b01ca3..73a1c34163 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1049,31 +1049,25 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er return primaryData, nil } -// findLastOccurrenceIndices finds indices of last occurrences for each unique ID -func findLastOccurrenceIndices[T comparable](ids []T) []int { - lastOccurrence := make(map[T]int, len(ids)) - for idx, id := range ids { - lastOccurrence[id] = idx - } - - keepIndices := make([]int, 0, len(lastOccurrence)) - for idx, id := range ids { - if lastOccurrence[id] == idx { - keepIndices = append(keepIndices, idx) +// hasDuplicates checks if there are any duplicate values in the slice. +// Returns true immediately when the first duplicate is found (early exit). +func hasDuplicates[T comparable](ids []T) bool { + seen := make(map[T]struct{}, len(ids)) + for _, id := range ids { + if _, exists := seen[id]; exists { + return true } + seen[id] = struct{}{} } - return keepIndices + return false } -// DeduplicateFieldData removes duplicate primary keys from field data, -// keeping the last occurrence of each ID -func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData, schema *schemaInfo) ([]*schemapb.FieldData, uint32, error) { +// CheckDuplicatePkExist checks if there are duplicate primary keys in the field data. +// Returns (true, nil) if duplicates exist, (false, nil) if no duplicates. 
+// Returns (false, error) if there's an error during checking. +func CheckDuplicatePkExist(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData) (bool, error) { if len(fieldsData) == 0 { - return fieldsData, 0, nil - } - - if err := fillFieldPropertiesOnly(fieldsData, schema); err != nil { - return nil, 0, err + return false, nil } // find primary field data @@ -1086,64 +1080,26 @@ func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData [ } if primaryFieldData == nil { - return nil, 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName())) + return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName())) } - // get row count - var numRows int + // check for duplicates based on primary key type switch primaryFieldData.Field.(type) { case *schemapb.FieldData_Scalars: scalarField := primaryFieldData.GetScalars() switch scalarField.Data.(type) { case *schemapb.ScalarField_LongData: - numRows = len(scalarField.GetLongData().GetData()) + intIDs := scalarField.GetLongData().GetData() + return hasDuplicates(intIDs), nil case *schemapb.ScalarField_StringData: - numRows = len(scalarField.GetStringData().GetData()) + strIDs := scalarField.GetStringData().GetData() + return hasDuplicates(strIDs), nil default: - return nil, 0, merr.WrapErrParameterInvalidMsg("unsupported primary key type") + return false, merr.WrapErrParameterInvalidMsg("unsupported primary key type") } default: - return nil, 0, merr.WrapErrParameterInvalidMsg("primary field must be scalar type") + return false, merr.WrapErrParameterInvalidMsg("primary field must be scalar type") } - - if numRows == 0 { - return fieldsData, 0, nil - } - - // build map to track last occurrence of each primary key - var keepIndices []int - switch primaryFieldData.Field.(type) { - case *schemapb.FieldData_Scalars: - scalarField := primaryFieldData.GetScalars() - switch scalarField.Data.(type) { - case *schemapb.ScalarField_LongData: - // for Int64 primary keys - intIDs := scalarField.GetLongData().GetData() - keepIndices = findLastOccurrenceIndices(intIDs) - - case *schemapb.ScalarField_StringData: - // for VarChar primary keys - strIDs := scalarField.GetStringData().GetData() - keepIndices = findLastOccurrenceIndices(strIDs) - } - } - - // if no duplicates found, return original data - if len(keepIndices) == numRows { - return fieldsData, uint32(numRows), nil - } - - log.Info("duplicate primary keys detected in upsert request, deduplicating", - zap.Int("original_rows", numRows), - zap.Int("deduplicated_rows", len(keepIndices))) - - // use typeutil.AppendFieldData to rebuild field data with deduplicated rows - result := typeutil.PrepareResultFieldData(fieldsData, int64(len(keepIndices))) - for _, idx := range keepIndices { - typeutil.AppendFieldData(result, fieldsData, int64(idx)) - } - - return result, uint32(len(keepIndices)), nil } // autoGenPrimaryFieldData generate primary data when autoID == true @@ -1214,12 +1170,12 @@ func validateFieldDataColumns(columns []*schemapb.FieldData, schema *schemaInfo) expectColumnNum := 0 // Count expected columns - for _, field := range schema.GetFields() { + for _, field := range schema.CollectionSchema.GetFields() { if !typeutil.IsBM25FunctionOutputField(field, schema.CollectionSchema) { expectColumnNum++ } } - for _, structField := range schema.GetStructArrayFields() { + for _, structField := range 
schema.CollectionSchema.GetStructArrayFields() { expectColumnNum += len(structField.GetFields()) } diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index c23ffd0d1a..1282d3538e 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -4866,3 +4866,147 @@ func TestGetStorageCost(t *testing.T) { assert.True(t, ok) }) } + +func TestCheckDuplicatePkExist_Int64PK(t *testing.T) { + primaryFieldSchema := &schemapb.FieldSchema{ + Name: "id", + FieldID: 100, + DataType: schemapb.DataType_Int64, + } + + t.Run("with duplicates", func(t *testing.T) { + fieldsData := []*schemapb.FieldData{ + { + FieldName: "id", + Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: []int64{1, 2, 3, 1, 4, 2}, // duplicates: 1, 2 + }, + }, + }, + }, + }, + } + + hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.True(t, hasDup) + }) + + t.Run("without duplicates", func(t *testing.T) { + fieldsData := []*schemapb.FieldData{ + { + FieldName: "id", + Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: []int64{1, 2, 3, 4, 5}, + }, + }, + }, + }, + }, + } + + hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.False(t, hasDup) + }) +} + +func TestCheckDuplicatePkExist_VarCharPK(t *testing.T) { + primaryFieldSchema := &schemapb.FieldSchema{ + Name: "id", + FieldID: 100, + DataType: schemapb.DataType_VarChar, + } + + t.Run("with duplicates", func(t *testing.T) { + fieldsData := []*schemapb.FieldData{ + { + FieldName: "id", + Type: schemapb.DataType_VarChar, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"a", "b", "c", "a", "d"}, // duplicate: "a" + }, + }, + }, + }, + }, + } + + hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.True(t, hasDup) + }) + + t.Run("without duplicates", func(t *testing.T) { + fieldsData := []*schemapb.FieldData{ + { + FieldName: "id", + Type: schemapb.DataType_VarChar, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{"a", "b", "c", "d", "e"}, + }, + }, + }, + }, + }, + } + + hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData) + assert.NoError(t, err) + assert.False(t, hasDup) + }) +} + +func TestCheckDuplicatePkExist_EmptyData(t *testing.T) { + primaryFieldSchema := &schemapb.FieldSchema{ + Name: "id", + FieldID: 100, + DataType: schemapb.DataType_Int64, + } + + hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, []*schemapb.FieldData{}) + assert.NoError(t, err) + assert.False(t, hasDup) +} + +func TestCheckDuplicatePkExist_MissingPrimaryKey(t *testing.T) { + primaryFieldSchema := &schemapb.FieldSchema{ + Name: "id", + FieldID: 100, + DataType: schemapb.DataType_Int64, + } + + fieldsData := []*schemapb.FieldData{ + { + FieldName: "other_field", + Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: []int64{1, 2, 3}, + }, + }, + }, + }, + }, + } + + hasDup, err 
:= CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
+	assert.Error(t, err)
+	assert.False(t, hasDup)
+}
diff --git a/tests/go_client/testcases/upsert_test.go b/tests/go_client/testcases/upsert_test.go
index 883d5b8aec..662443befe 100644
--- a/tests/go_client/testcases/upsert_test.go
+++ b/tests/go_client/testcases/upsert_test.go
@@ -436,7 +436,7 @@ func TestUpsertAutoID(t *testing.T) {
 	// upsert without pks -> error
 	vecColumn = hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption())
 	_, err = mc.Upsert(ctx, client.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(vecColumn))
-	common.CheckErr(t, err, false, "has no corresponding fieldData pass in: invalid parameter")
+	common.CheckErr(t, err, false, "must assign pk when upsert")
 }
 // test upsert autoId collection
diff --git a/tests/python_client/milvus_client/test_milvus_client_partial_update.py b/tests/python_client/milvus_client/test_milvus_client_partial_update.py
index 3dcec78a78..e866bab7f1 100644
--- a/tests/python_client/milvus_client/test_milvus_client_partial_update.py
+++ b/tests/python_client/milvus_client/test_milvus_client_partial_update.py
@@ -1221,14 +1221,13 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
     @pytest.mark.tags(CaseLabel.L1)
     def test_milvus_client_partial_update_same_pk_same_field(self):
         """
-        target: Test PU will success and query will success
+        target: Test partial update on an existing pk with the same field will succeed
         method:
            1. Create a collection
            2. Insert rows
-           3. Upsert the rows with same pk and same field
-           4. Query the rows
-           5. Upsert the rows with same pk and different field
-        expected: Step 2 -> 4 should success 5 should fail
+           3. Upsert a single row with existing pk and same field (partial update)
+           4. Query the row to verify the update
+        expected: All steps should succeed, and the field value should be updated
         """
         # step 1: create collection
         client = self._client()
@@ -1236,28 +1235,31 @@
         schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
         schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
         schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
-        index_params = self.prepare_index_params(client)[0]
+        index_params = self.prepare_index_params(client)[0]
         index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
         index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
         index_params.add_index(default_int32_field_name, index_type="AUTOINDEX")
         collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
-        self.create_collection(client, collection_name, default_dim, schema=schema,
+        self.create_collection(client, collection_name, default_dim, schema=schema, consistency_level="Strong",
                                index_params=index_params)
-
+
         # step 2: Insert rows
         rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
         self.upsert(client, collection_name, rows, partial_update=True)
-        # step 3: Upsert the rows with same pk and same field
-        new_rows = [{default_primary_key_field_name: 0,
-                     default_int32_field_name: i} for i in range(default_nb)]
-        self.upsert(client, collection_name, new_rows, partial_update=True)
+        # step 3: Upsert a single row with existing pk=0 and update the same field
+        updated_value = 99999
+        new_row = {default_primary_key_field_name: 0,
+                   default_int32_field_name: updated_value}
+        self.upsert(client, collection_name, [new_row], partial_update=True)
-        # step 4: Query the rows
+        # step 4: Query the row to verify the update
+        expected_row = {default_primary_key_field_name: 0,
+                        default_int32_field_name: updated_value}
         result = self.query(client, collection_name, filter=f"{default_primary_key_field_name} == 0",
                             check_task=CheckTasks.check_query_results,
                             output_fields=[default_int32_field_name],
-                            check_items={exp_res: [new_rows[-1]],
+                            check_items={exp_res: [expected_row],
                                          "pk_name": default_primary_key_field_name})[0]
         assert len(result) == 1
diff --git a/tests/python_client/milvus_client/test_milvus_client_upsert.py b/tests/python_client/milvus_client/test_milvus_client_upsert.py
index a670f838c4..f2ab7f2a42 100644
--- a/tests/python_client/milvus_client/test_milvus_client_upsert.py
+++ b/tests/python_client/milvus_client/test_milvus_client_upsert.py
@@ -371,6 +371,72 @@ class TestMilvusClientUpsertInvalid(TestMilvusClientV2Base):
                          check_task=CheckTasks.err_res,
                          check_items=error)
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_milvus_client_upsert_duplicate_pk_int64(self):
+        """
+        target: test upsert with duplicate primary keys (Int64)
+        method:
+            1. create collection with Int64 primary key
+            2. upsert data with duplicate primary keys in the same batch
+        expected: raise error - duplicate primary keys are not allowed
+        """
+        client = self._client()
+        collection_name = cf.gen_collection_name_by_testcase_name()
+        # 1. create collection
+        self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
+        # 2. 
upsert with duplicate PKs: 1, 2, 1 (duplicate) + rng = np.random.default_rng(seed=19530) + rows = [ + {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: 1.0, default_string_field_name: "first"}, + {default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: 2.0, default_string_field_name: "second"}, + {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: 1.1, default_string_field_name: "duplicate"}, + ] + error = {ct.err_code: 1100, + ct.err_msg: "duplicate primary keys are not allowed in the same batch"} + self.upsert(client, collection_name, rows, + check_task=CheckTasks.err_res, check_items=error) + self.drop_collection(client, collection_name) + + @pytest.mark.tags(CaseLabel.L1) + def test_milvus_client_upsert_duplicate_pk_varchar(self): + """ + target: test upsert with duplicate primary keys (VarChar) + method: + 1. create collection with VarChar primary key + 2. upsert data with duplicate primary keys in the same batch + expected: raise error - duplicate primary keys are not allowed + """ + client = self._client() + collection_name = cf.gen_collection_name_by_testcase_name() + dim = default_dim + # 1. create collection with VarChar primary key + schema = self.create_schema(client, enable_dynamic_field=False)[0] + schema.add_field(default_primary_key_field_name, DataType.VARCHAR, max_length=64, is_primary=True, + auto_id=False) + schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=dim) + schema.add_field(default_float_field_name, DataType.FLOAT) + index_params = self.prepare_index_params(client)[0] + index_params.add_index(default_vector_field_name, metric_type="COSINE") + self.create_collection(client, collection_name, dimension=dim, schema=schema, index_params=index_params) + # 2. upsert with duplicate PKs: "a", "b", "a" (duplicate) + rng = np.random.default_rng(seed=19530) + rows = [ + {default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]), + default_float_field_name: 1.0}, + {default_primary_key_field_name: "b", default_vector_field_name: list(rng.random((1, dim))[0]), + default_float_field_name: 2.0}, + {default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]), + default_float_field_name: 1.1}, + ] + error = {ct.err_code: 1100, + ct.err_msg: "duplicate primary keys are not allowed in the same batch"} + self.upsert(client, collection_name, rows, + check_task=CheckTasks.err_res, check_items=error) + self.drop_collection(client, collection_name) + class TestMilvusClientUpsertValid(TestMilvusClientV2Base): """ Test case of search interface """ @@ -550,343 +616,4 @@ class TestMilvusClientUpsertValid(TestMilvusClientV2Base): self.release_partitions(client, collection_name, partition_name) self.drop_partition(client, collection_name, partition_name) if self.has_collection(client, collection_name)[0]: - self.drop_collection(client, collection_name) - - -class TestMilvusClientUpsertDedup(TestMilvusClientV2Base): - """Test case for upsert deduplication functionality""" - - @pytest.fixture(scope="function", params=["COSINE", "L2"]) - def metric_type(self, request): - yield request.param - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_int64_pk(self): - """ - target: test upsert with duplicate int64 primary keys in same batch - method: - 1. 
create collection with int64 primary key - 2. upsert data with duplicate primary keys [1, 2, 3, 2, 1] - 3. query to verify only last occurrence is kept - expected: only 3 unique records exist, with data from last occurrence - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - - # 2. upsert data with duplicate PKs: [1, 2, 3, 2, 1] - # Expected: keep last occurrence -> [3, 2, 1] at indices [2, 3, 4] - rng = np.random.default_rng(seed=19530) - rows = [ - {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 1.0, default_string_field_name: "str_1_first"}, - {default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 2.0, default_string_field_name: "str_2_first"}, - {default_primary_key_field_name: 3, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 3.0, default_string_field_name: "str_3"}, - {default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 2.1, default_string_field_name: "str_2_last"}, - {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 1.1, default_string_field_name: "str_1_last"}, - ] - - results = self.upsert(client, collection_name, rows)[0] - # After deduplication, should only have 3 records - assert results['upsert_count'] == 3 - - # 3. query to verify deduplication - should have only 3 unique records - query_results = self.query(client, collection_name, filter="id >= 0")[0] - assert len(query_results) == 3 - - # Verify that last occurrence data is kept - id_to_data = {item['id']: item for item in query_results} - assert 1 in id_to_data - assert 2 in id_to_data - assert 3 in id_to_data - - # Check that data from last occurrence is preserved - assert id_to_data[1]['float'] == 1.1 - assert id_to_data[1]['varchar'] == "str_1_last" - assert id_to_data[2]['float'] == 2.1 - assert id_to_data[2]['varchar'] == "str_2_last" - assert id_to_data[3]['float'] == 3.0 - assert id_to_data[3]['varchar'] == "str_3" - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_varchar_pk(self): - """ - target: test upsert with duplicate varchar primary keys in same batch - method: - 1. create collection with varchar primary key - 2. upsert data with duplicate primary keys ["a", "b", "c", "b", "a"] - 3. query to verify only last occurrence is kept - expected: only 3 unique records exist, with data from last occurrence - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection with varchar primary key - schema = self.create_schema(client, enable_dynamic_field=True)[0] - schema.add_field("id", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field("age", DataType.INT64) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_vector_field_name, metric_type="COSINE") - self.create_collection(client, collection_name, default_dim, schema=schema, - index_params=index_params, consistency_level="Strong") - - # 2. 
upsert data with duplicate PKs: ["a", "b", "c", "b", "a"] - # Expected: keep last occurrence -> ["c", "b", "a"] at indices [2, 3, 4] - rng = np.random.default_rng(seed=19530) - rows = [ - {"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]), - "age": 10}, - {"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]), - "age": 20}, - {"id": "c", default_vector_field_name: list(rng.random((1, default_dim))[0]), - "age": 30}, - {"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]), - "age": 21}, - {"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]), - "age": 11}, - ] - - results = self.upsert(client, collection_name, rows)[0] - # After deduplication, should only have 3 records - assert results['upsert_count'] == 3 - - # 3. query to verify deduplication - query_results = self.query(client, collection_name, filter='id in ["a", "b", "c"]')[0] - assert len(query_results) == 3 - - # Verify that last occurrence data is kept - id_to_data = {item['id']: item for item in query_results} - assert "a" in id_to_data - assert "b" in id_to_data - assert "c" in id_to_data - - # Check that data from last occurrence is preserved - assert id_to_data["a"]["age"] == 11 - assert id_to_data["b"]["age"] == 21 - assert id_to_data["c"]["age"] == 30 - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_all_duplicates(self): - """ - target: test upsert when all records have same primary key - method: - 1. create collection - 2. upsert 5 records with same primary key - 3. query to verify only 1 record exists - expected: only 1 record exists with data from last occurrence - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - - # 2. upsert data where all have same PK (id=1) - rng = np.random.default_rng(seed=19530) - rows = [ - {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: i * 1.0, default_string_field_name: f"version_{i}"} - for i in range(5) - ] - - results = self.upsert(client, collection_name, rows)[0] - # After deduplication, should only have 1 record - assert results['upsert_count'] == 1 - - # 3. query to verify only 1 record exists - query_results = self.query(client, collection_name, filter="id == 1")[0] - assert len(query_results) == 1 - - # Verify it's the last occurrence (i=4) - assert query_results[0]['float'] == 4.0 - assert query_results[0]['varchar'] == "version_4" - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_no_duplicates(self): - """ - target: test upsert with no duplicate primary keys - method: - 1. create collection - 2. upsert data with unique primary keys - 3. query to verify all records exist - expected: all records exist as-is - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - - # 2. 
upsert data with unique PKs - rng = np.random.default_rng(seed=19530) - nb = 10 - rows = [ - {default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: i * 1.0, default_string_field_name: str(i)} - for i in range(nb) - ] - - results = self.upsert(client, collection_name, rows)[0] - # No deduplication should occur - assert results['upsert_count'] == nb - - # 3. query to verify all records exist - query_results = self.query(client, collection_name, filter=f"id >= 0")[0] - assert len(query_results) == nb - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L2) - def test_milvus_client_upsert_dedup_large_batch(self): - """ - target: test upsert deduplication with large batch - method: - 1. create collection - 2. upsert large batch with 50% duplicate primary keys - 3. query to verify correct number of records - expected: only unique records exist - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - - # 2. upsert large batch where each ID appears twice - rng = np.random.default_rng(seed=19530) - nb = 500 - unique_ids = nb // 2 # 250 unique IDs - - rows = [] - for i in range(nb): - pk = i % unique_ids # This creates duplicates: 0,1,2...249,0,1,2...249 - rows.append({ - default_primary_key_field_name: pk, - default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: float(i), # Different value for each row - default_string_field_name: f"batch_{i}" - }) - - results = self.upsert(client, collection_name, rows)[0] - # After deduplication, should only have unique_ids records - assert results['upsert_count'] == unique_ids - - # 3. query to verify correct number of records - query_results = self.query(client, collection_name, filter=f"id >= 0", limit=1000)[0] - assert len(query_results) == unique_ids - - # Verify that last occurrence is kept (should have higher float values) - for item in query_results: - pk = item['id'] - # Last occurrence of pk is at index (pk + unique_ids) - expected_float = float(pk + unique_ids) - assert item['float'] == expected_float - assert item['varchar'] == f"batch_{pk + unique_ids}" - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_with_partition(self): - """ - target: test upsert deduplication works correctly with partitions - method: - 1. create collection with partition - 2. upsert data with duplicates to specific partition - 3. query to verify deduplication in partition - expected: deduplication works within partition - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - partition_name = cf.gen_unique_str("partition") - - # 1. create collection and partition - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - self.create_partition(client, collection_name, partition_name) - - # 2. 
upsert data with duplicates to partition - rng = np.random.default_rng(seed=19530) - rows = [ - {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 1.0, default_string_field_name: "first"}, - {default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 2.0, default_string_field_name: "unique"}, - {default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]), - default_float_field_name: 1.1, default_string_field_name: "last"}, - ] - - results = self.upsert(client, collection_name, rows, partition_name=partition_name)[0] - assert results['upsert_count'] == 2 - - # 3. query partition to verify deduplication - query_results = self.query(client, collection_name, filter="id >= 0", - partition_names=[partition_name])[0] - assert len(query_results) == 2 - - # Verify correct data - id_to_data = {item['id']: item for item in query_results} - assert id_to_data[1]['float'] == 1.1 - assert id_to_data[1]['varchar'] == "last" - assert id_to_data[2]['float'] == 2.0 - assert id_to_data[2]['varchar'] == "unique" - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_upsert_dedup_with_vectors(self): - """ - target: test upsert deduplication preserves correct vector data - method: - 1. create collection - 2. upsert data with duplicate PKs but different vectors - 3. search to verify correct vector is preserved - expected: vector from last occurrence is preserved - """ - client = self._client() - collection_name = cf.gen_collection_name_by_testcase_name() - - # 1. create collection - self.create_collection(client, collection_name, default_dim, consistency_level="Strong") - - # 2. upsert data with duplicate PK=1 but different vectors - # Create distinctly different vectors for easy verification - first_vector = [1.0] * default_dim # All 1.0 - last_vector = [2.0] * default_dim # All 2.0 - - rows = [ - {default_primary_key_field_name: 1, default_vector_field_name: first_vector, - default_float_field_name: 1.0, default_string_field_name: "first"}, - {default_primary_key_field_name: 2, default_vector_field_name: [0.5] * default_dim, - default_float_field_name: 2.0, default_string_field_name: "unique"}, - {default_primary_key_field_name: 1, default_vector_field_name: last_vector, - default_float_field_name: 1.1, default_string_field_name: "last"}, - ] - - results = self.upsert(client, collection_name, rows)[0] - assert results['upsert_count'] == 2 - - # 3. query to get vector data - query_results = self.query(client, collection_name, filter="id == 1", - output_fields=["id", "vector", "float", "varchar"])[0] - assert len(query_results) == 1 - - # Verify it's the last occurrence with last_vector - result = query_results[0] - assert result['float'] == 1.1 - assert result['varchar'] == "last" - # Vector should be last_vector (all 2.0) - assert all(abs(v - 2.0) < 0.001 for v in result['vector']) - - self.drop_collection(client, collection_name) \ No newline at end of file + self.drop_collection(client, collection_name) \ No newline at end of file
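Note: the snippet below is a minimal, standalone sketch of the duplicate-detection idea this change relies on, not part of the patch itself. It assumes only the Go standard library; the main() driver and its sample primary keys are illustrative.

package main

import "fmt"

// hasDuplicates reports whether ids contains any repeated value, returning
// as soon as the first duplicate is seen (early exit), mirroring the generic
// helper added to internal/proxy/util.go in this diff.
func hasDuplicates[T comparable](ids []T) bool {
	seen := make(map[T]struct{}, len(ids))
	for _, id := range ids {
		if _, exists := seen[id]; exists {
			return true
		}
		seen[id] = struct{}{}
	}
	return false
}

func main() {
	// Int64-style primary keys with a repeated value: such a batch is now rejected
	// with "duplicate primary keys are not allowed in the same batch".
	fmt.Println(hasDuplicates([]int64{1, 2, 1})) // true

	// VarChar-style primary keys, all unique: the batch is accepted.
	fmt.Println(hasDuplicates([]string{"a", "b", "c"})) // false
}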