From 8f16afd5e78012c4e8c99efd2d157720fa8454aa Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 21 Oct 2025 15:14:03 +0800 Subject: [PATCH] fix: Handle JSON field default values in storage layer (#44999) Related to #44995 Added missing case for JSON data type in GetDefaultValue function to properly retrieve default values for JSON fields. This prevents crashes when enabling dynamic fields with default values during concurrent insert operations. Changes: - Added JSON data type case in GetDefaultValue to return BytesData - Added comprehensive tests for fillMissingFields covering JSON and other data types with default values - Added tests for nullable fields, required fields validation, and edge cases Signed-off-by: Congqi Xia --- internal/storage/utils.go | 2 + internal/storage/utils_test.go | 475 +++++++++++++++++++++++++++++++++ 2 files changed, 477 insertions(+) diff --git a/internal/storage/utils.go b/internal/storage/utils.go index 382cb6f154..65fcb41d44 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -1601,6 +1601,8 @@ func GetDefaultValue(fieldSchema *schemapb.FieldSchema) interface{} { return fieldSchema.GetDefaultValue().GetStringData() case schemapb.DataType_Timestamptz: return fieldSchema.GetDefaultValue().GetTimestamptzData() + case schemapb.DataType_JSON: + return fieldSchema.GetDefaultValue().GetBytesData() default: // won't happen panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String())) diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index b3547e6e95..d995d30d87 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -2083,3 +2083,478 @@ func TestBM25Checker(t *testing.T) { } } } + +func TestFillMissingFields(t *testing.T) { + t.Run("fill missing field with default value - int64", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "field_with_default", + DataType: schemapb.DataType_Int64, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_LongData{LongData: 42}, + }, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + // Field 101 is missing + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Check that field 101 was filled with default value + field101, exists := insertData.Data[101] + assert.True(t, exists) + assert.Equal(t, 3, field101.RowNum()) + + int64Field := field101.(*Int64FieldData) + assert.Equal(t, []int64{42, 42, 42}, int64Field.Data) + }) + + t.Run("fill missing field with default value - json", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "json_with_default", + DataType: schemapb.DataType_JSON, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_BytesData{BytesData: []byte("{}")}, + }, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + // Field 101 is missing + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Check that field 101 was filled with default value + field101, exists := insertData.Data[101] + assert.True(t, exists) + assert.Equal(t, 3, field101.RowNum()) + + jsonField := field101.(*JSONFieldData) + assert.Equal(t, [][]byte{[]byte("{}"), []byte("{}"), []byte("{}")}, jsonField.Data) + }) + + t.Run("fill missing field with default value - various types", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "bool_default", + DataType: schemapb.DataType_Bool, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_BoolData{BoolData: true}, + }, + }, + { + FieldID: 102, + Name: "int32_default", + DataType: schemapb.DataType_Int32, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_IntData{IntData: 123}, + }, + }, + { + FieldID: 103, + Name: "float_default", + DataType: schemapb.DataType_Float, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_FloatData{FloatData: 3.14}, + }, + }, + { + FieldID: 104, + Name: "string_default", + DataType: schemapb.DataType_VarChar, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_StringData{StringData: "default_value"}, + }, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200}}, + 100: &Int64FieldData{Data: []int64{10, 20}}, + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Check bool field + field101, exists := insertData.Data[101] + assert.True(t, exists) + boolField := field101.(*BoolFieldData) + assert.Equal(t, []bool{true, true}, boolField.Data) + + // Check int32 field + field102, exists := insertData.Data[102] + assert.True(t, exists) + int32Field := field102.(*Int32FieldData) + assert.Equal(t, []int32{123, 123}, int32Field.Data) + + // Check float field + field103, exists := insertData.Data[103] + assert.True(t, exists) + floatField := field103.(*FloatFieldData) + assert.Equal(t, []float32{3.14, 3.14}, floatField.Data) + + // Check string field + field104, exists := insertData.Data[104] + assert.True(t, exists) + stringField := field104.(*StringFieldData) + assert.Equal(t, []string{"default_value", "default_value"}, stringField.Data) + }) + + t.Run("fill missing nullable field with null", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "nullable_field", + DataType: schemapb.DataType_Int64, + Nullable: true, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Check that field 101 exists and has null values + field101, exists := insertData.Data[101] + assert.True(t, exists) + assert.Equal(t, 3, field101.RowNum()) + + int64Field := field101.(*Int64FieldData) + assert.Equal(t, 3, len(int64Field.ValidData)) + // All values should be marked as invalid (null) + assert.Equal(t, []bool{false, false, false}, int64Field.ValidData) + }) + + t.Run("error when field is not nullable and has no default", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "required_field", + DataType: schemapb.DataType_Int64, + Nullable: false, + // No default value + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + }, + } + + err := fillMissingFields(schema, insertData) + assert.Error(t, err) + assert.Contains(t, err.Error(), "required_field") + assert.Contains(t, err.Error(), "not nullable and has no default value") + }) + + t.Run("skip system fields", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, // Field ID < 100, should be skipped + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, // Field ID < 100, should be skipped + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + // System fields (RowID, TimeStamp) are missing but should be skipped + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // System fields should not be added + _, existsRowID := insertData.Data[common.RowIDField] + assert.False(t, existsRowID) + _, existsTimestamp := insertData.Data[common.TimeStampField] + assert.False(t, existsTimestamp) + }) + + t.Run("skip function output fields", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "function_output", + DataType: schemapb.DataType_FloatVector, + IsFunctionOutput: true, // Should be skipped + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Function output field should not be added + _, exists := insertData.Data[101] + assert.False(t, exists) + }) + + t.Run("all fields present - no filling needed", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: common.RowIDFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: common.TimeStampField, + Name: common.TimeStampFieldName, + DataType: schemapb.DataType_Int64, + IsPrimaryKey: false, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "field1", + DataType: schemapb.DataType_Int64, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}}, + common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}}, + 100: &Int64FieldData{Data: []int64{10, 20, 30}}, + 101: &Int64FieldData{Data: []int64{100, 200, 300}}, + }, + } + + originalDataLen := len(insertData.Data) + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // No new fields should be added + assert.Equal(t, originalDataLen, len(insertData.Data)) + }) + + t.Run("empty insert data", func(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Name: "test_schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "field_with_default", + DataType: schemapb.DataType_Int64, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_LongData{LongData: 42}, + }, + }, + }, + } + + insertData := &InsertData{ + Data: map[FieldID]FieldData{ + 100: &Int64FieldData{Data: []int64{}}, + }, + } + + err := fillMissingFields(schema, insertData) + assert.NoError(t, err) + + // Field should be created but empty + field101, exists := insertData.Data[101] + assert.True(t, exists) + assert.Equal(t, 0, field101.RowNum()) + }) +}