fix: [AddField] Permit missing new nullable field in InsertMsg (#42684)

Related to #41858 #41951 #42084

When insert msg consumer (pipeline/flowgraph) have newer schema than
insertMsg, it have to adapter the insert msg used old schema(missing
newly added field)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-06-13 13:52:35 +08:00 committed by GitHub
parent d59002d45e
commit cbed31933a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 59 additions and 1 deletions

View File

@ -534,8 +534,12 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcField, ok := srcFields[field.GetFieldID()] srcField, ok := srcFields[field.GetFieldID()]
if !ok && field.GetFieldID() >= common.StartOfUserFieldID { if !ok && field.GetFieldID() >= common.StartOfUserFieldID {
if !field.GetNullable() {
return nil, merr.WrapErrFieldNotFound(field.GetFieldID(), fmt.Sprintf("field %s not found when converting insert msg to insert data", field.GetName())) return nil, merr.WrapErrFieldNotFound(field.GetFieldID(), fmt.Sprintf("field %s not found when converting insert msg to insert data", field.GetName()))
} }
log.Warn("insert msg missing field but nullable", zap.Int64("fieldID", field.GetFieldID()), zap.String("fieldName", field.GetName()))
continue
}
var fieldData FieldData var fieldData FieldData
switch field.DataType { switch field.DataType {
case schemapb.DataType_FloatVector: case schemapb.DataType_FloatVector:

View File

@ -1360,6 +1360,60 @@ func TestInsertMsgToInsertData2(t *testing.T) {
} }
} }
func TestInsertMsgMissingNullableField(t *testing.T) {
dim := 8
schema := &schemapb.CollectionSchema{
Name: "all_fields_schema",
Description: "all_fields_schema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField,
Name: common.RowIDFieldName,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField,
Name: common.TimeStampFieldName,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_Int64,
},
{
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
Name: "pk",
FieldID: 100,
},
{
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
FieldID: 101,
},
{
DataType: schemapb.DataType_Int64,
Name: "new_field",
Nullable: true,
FieldID: 102,
},
},
}
msg, _, _ := genColumnBasedInsertMsg(schema, 5, dim)
_, err := ColumnBasedInsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
// remove the last field(nullable one)
msg.FieldsData = msg.FieldsData[:len(msg.FieldsData)-1]
_, err = ColumnBasedInsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
}
func TestMergeInsertData(t *testing.T) { func TestMergeInsertData(t *testing.T) {
t.Run("empty data in buffer", func(t *testing.T) { t.Run("empty data in buffer", func(t *testing.T) {
d1 := &InsertData{ d1 := &InsertData{