mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Handle JSON field default values in storage layer (#44999)
Related to #44995 Added missing case for JSON data type in GetDefaultValue function to properly retrieve default values for JSON fields. This prevents crashes when enabling dynamic fields with default values during concurrent insert operations. Changes: - Added JSON data type case in GetDefaultValue to return BytesData - Added comprehensive tests for fillMissingFields covering JSON and other data types with default values - Added tests for nullable fields, required fields validation, and edge cases Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a4d584f22b
commit
8f16afd5e7
@ -1601,6 +1601,8 @@ func GetDefaultValue(fieldSchema *schemapb.FieldSchema) interface{} {
|
||||
return fieldSchema.GetDefaultValue().GetStringData()
|
||||
case schemapb.DataType_Timestamptz:
|
||||
return fieldSchema.GetDefaultValue().GetTimestamptzData()
|
||||
case schemapb.DataType_JSON:
|
||||
return fieldSchema.GetDefaultValue().GetBytesData()
|
||||
default:
|
||||
// won't happen
|
||||
panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String()))
|
||||
|
||||
@ -2083,3 +2083,478 @@ func TestBM25Checker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFillMissingFields(t *testing.T) {
|
||||
t.Run("fill missing field with default value - int64", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "field_with_default",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_LongData{LongData: 42},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
// Field 101 is missing
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check that field 101 was filled with default value
|
||||
field101, exists := insertData.Data[101]
|
||||
assert.True(t, exists)
|
||||
assert.Equal(t, 3, field101.RowNum())
|
||||
|
||||
int64Field := field101.(*Int64FieldData)
|
||||
assert.Equal(t, []int64{42, 42, 42}, int64Field.Data)
|
||||
})
|
||||
|
||||
t.Run("fill missing field with default value - json", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "json_with_default",
|
||||
DataType: schemapb.DataType_JSON,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_BytesData{BytesData: []byte("{}")},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
// Field 101 is missing
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check that field 101 was filled with default value
|
||||
field101, exists := insertData.Data[101]
|
||||
assert.True(t, exists)
|
||||
assert.Equal(t, 3, field101.RowNum())
|
||||
|
||||
jsonField := field101.(*JSONFieldData)
|
||||
assert.Equal(t, [][]byte{[]byte("{}"), []byte("{}"), []byte("{}")}, jsonField.Data)
|
||||
})
|
||||
|
||||
t.Run("fill missing field with default value - various types", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "bool_default",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_BoolData{BoolData: true},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
Name: "int32_default",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_IntData{IntData: 123},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 103,
|
||||
Name: "float_default",
|
||||
DataType: schemapb.DataType_Float,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_FloatData{FloatData: 3.14},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 104,
|
||||
Name: "string_default",
|
||||
DataType: schemapb.DataType_VarChar,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_StringData{StringData: "default_value"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20}},
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check bool field
|
||||
field101, exists := insertData.Data[101]
|
||||
assert.True(t, exists)
|
||||
boolField := field101.(*BoolFieldData)
|
||||
assert.Equal(t, []bool{true, true}, boolField.Data)
|
||||
|
||||
// Check int32 field
|
||||
field102, exists := insertData.Data[102]
|
||||
assert.True(t, exists)
|
||||
int32Field := field102.(*Int32FieldData)
|
||||
assert.Equal(t, []int32{123, 123}, int32Field.Data)
|
||||
|
||||
// Check float field
|
||||
field103, exists := insertData.Data[103]
|
||||
assert.True(t, exists)
|
||||
floatField := field103.(*FloatFieldData)
|
||||
assert.Equal(t, []float32{3.14, 3.14}, floatField.Data)
|
||||
|
||||
// Check string field
|
||||
field104, exists := insertData.Data[104]
|
||||
assert.True(t, exists)
|
||||
stringField := field104.(*StringFieldData)
|
||||
assert.Equal(t, []string{"default_value", "default_value"}, stringField.Data)
|
||||
})
|
||||
|
||||
t.Run("fill missing nullable field with null", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "nullable_field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
Nullable: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check that field 101 exists and has null values
|
||||
field101, exists := insertData.Data[101]
|
||||
assert.True(t, exists)
|
||||
assert.Equal(t, 3, field101.RowNum())
|
||||
|
||||
int64Field := field101.(*Int64FieldData)
|
||||
assert.Equal(t, 3, len(int64Field.ValidData))
|
||||
// All values should be marked as invalid (null)
|
||||
assert.Equal(t, []bool{false, false, false}, int64Field.ValidData)
|
||||
})
|
||||
|
||||
t.Run("error when field is not nullable and has no default", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "required_field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
Nullable: false,
|
||||
// No default value
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "required_field")
|
||||
assert.Contains(t, err.Error(), "not nullable and has no default value")
|
||||
})
|
||||
|
||||
t.Run("skip system fields", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField, // Field ID < 100, should be skipped
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField, // Field ID < 100, should be skipped
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
// System fields (RowID, TimeStamp) are missing but should be skipped
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// System fields should not be added
|
||||
_, existsRowID := insertData.Data[common.RowIDField]
|
||||
assert.False(t, existsRowID)
|
||||
_, existsTimestamp := insertData.Data[common.TimeStampField]
|
||||
assert.False(t, existsTimestamp)
|
||||
})
|
||||
|
||||
t.Run("skip function output fields", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "function_output",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
IsFunctionOutput: true, // Should be skipped
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Function output field should not be added
|
||||
_, exists := insertData.Data[101]
|
||||
assert.False(t, exists)
|
||||
})
|
||||
|
||||
t.Run("all fields present - no filling needed", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.RowIDField,
|
||||
Name: common.RowIDFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: common.TimeStampField,
|
||||
Name: common.TimeStampFieldName,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "field1",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
common.RowIDField: &Int64FieldData{Data: []int64{1, 2, 3}},
|
||||
common.TimeStampField: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
100: &Int64FieldData{Data: []int64{10, 20, 30}},
|
||||
101: &Int64FieldData{Data: []int64{100, 200, 300}},
|
||||
},
|
||||
}
|
||||
|
||||
originalDataLen := len(insertData.Data)
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// No new fields should be added
|
||||
assert.Equal(t, originalDataLen, len(insertData.Data))
|
||||
})
|
||||
|
||||
t.Run("empty insert data", func(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_schema",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "field_with_default",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
DefaultValue: &schemapb.ValueField{
|
||||
Data: &schemapb.ValueField_LongData{LongData: 42},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: map[FieldID]FieldData{
|
||||
100: &Int64FieldData{Data: []int64{}},
|
||||
},
|
||||
}
|
||||
|
||||
err := fillMissingFields(schema, insertData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Field should be created but empty
|
||||
field101, exists := insertData.Data[101]
|
||||
assert.True(t, exists)
|
||||
assert.Equal(t, 0, field101.RowNum())
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user