Fix insert node not handling JSON data while merging (#24126)

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2023-05-16 16:09:23 +08:00 committed by GitHub
parent 1c093d59ee
commit 3ea4a39078
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 5 deletions

View File

@ -58,7 +58,11 @@ func (iNode *insertNode) addInsertData(insertDatas map[UniqueID]*delegator.Inser
}
insertDatas[msg.SegmentID] = iData
} else {
typeutil.MergeFieldData(iData.InsertRecord.FieldsData, insertRecord.FieldsData)
err := typeutil.MergeFieldData(iData.InsertRecord.FieldsData, insertRecord.FieldsData)
if err != nil {
log.Error("failed to merge field data", zap.Error(err))
panic(err)
}
iData.InsertRecord.NumRows += insertRecord.NumRows
}
@ -79,7 +83,7 @@ func (iNode *insertNode) addInsertData(insertDatas map[UniqueID]*delegator.Inser
zap.Uint64("timestampMax", msg.EndTimestamp))
}
//Insert task
// Insert task
func (iNode *insertNode) Operate(in Msg) Msg {
nodeMsg := in.(*insertNodeMsg)

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -43,6 +44,8 @@ type InsertNodeSuite struct {
}
func (suite *InsertNodeSuite) SetupSuite() {
paramtable.Init()
suite.collectionName = "test-collection"
suite.collectionID = 111
suite.partitionID = 11
@ -87,6 +90,39 @@ func (suite *InsertNodeSuite) TestBasic() {
suite.Equal(suite.deleteSegmentSum, len(nodeMsg.deleteMsgs))
}
// TestDataTypeNotSupported verifies that the insert node panics when an
// insert message carries field data of a type the merge path does not
// support (DataType_None), instead of silently dropping the data.
func (suite *InsertNodeSuite) TestDataTypeNotSupported() {
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
in := suite.buildInsertNodeMsg(schema)
collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection)
collection.AddPartition(suite.partitionID)
// init mocks: the collection manager serves the freshly built collection;
// the segment manager carries no expectations here — presumably Operate
// panics before it is consulted (TODO confirm).
mockCollectionManager := segments.NewMockCollectionManager(suite.T())
mockCollectionManager.EXPECT().Get(suite.collectionID).Return(collection)
mockSegmentManager := segments.NewMockSegmentManager(suite.T())
suite.manager = &segments.Manager{
Collection: mockCollectionManager,
Segment: mockSegmentManager,
}
suite.delegator = delegator.NewMockShardDelegator(suite.T())
// corrupt every field's declared type so the merge hits its
// unsupported-data-type branch
for _, msg := range in.insertMsgs {
for _, field := range msg.GetFieldsData() {
field.Type = schemapb.DataType_None
}
}
// TODO: mock a delegator for the test
node := newInsertNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8)
// the unsupported data type must surface as a panic from Operate
suite.Panics(func() {
node.Operate(in)
})
}
func (suite *InsertNodeSuite) buildInsertNodeMsg(schema *schemapb.CollectionSchema) *insertNodeMsg {
nodeMsg := insertNodeMsg{
insertMsgs: []*InsertMsg{},
@ -101,6 +137,7 @@ func (suite *InsertNodeSuite) buildInsertNodeMsg(schema *schemapb.CollectionSche
insertMsg := buildInsertMsg(suite.collectionID, suite.partitionID, segmentID, suite.channel, 1)
insertMsg.FieldsData = genFiledDataWithSchema(schema, 1)
nodeMsg.insertMsgs = append(nodeMsg.insertMsgs, insertMsg)
nodeMsg.insertMsgs = append(nodeMsg.insertMsgs, insertMsg)
}
for i := 0; i < suite.deleteSegmentSum; i++ {

View File

@ -550,7 +550,7 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
}
// MergeFieldData appends fields data to dst
func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) {
func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error {
fieldID2Data := make(map[int64]*schemapb.FieldData)
for _, data := range dst {
fieldID2Data[data.FieldId] = data
@ -632,8 +632,19 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) {
} else {
dstScalar.GetStringData().Data = append(dstScalar.GetStringData().Data, srcScalar.StringData.Data...)
}
case *schemapb.ScalarField_JsonData:
if dstScalar.GetJsonData() == nil {
dstScalar.Data = &schemapb.ScalarField_JsonData{
JsonData: &schemapb.JSONArray{
Data: srcScalar.JsonData.Data,
},
}
} else {
dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data...)
}
default:
log.Error("Not supported field type", zap.String("field type", srcFieldData.Type.String()))
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
return errors.New("unsupported data type: " + srcFieldData.Type.String())
}
case *schemapb.FieldData_Vectors:
dim := fieldType.Vectors.Dim
@ -673,10 +684,13 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) {
dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...)
}
default:
log.Error("Not supported field type", zap.String("field type", srcFieldData.Type.String()))
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
return errors.New("unsupported data type: " + srcFieldData.Type.String())
}
}
}
return nil
}
// GetPrimaryFieldSchema get primary field schema from collection schema

View File

@ -1096,3 +1096,37 @@ func TestGetDataAndGetDataSize(t *testing.T) {
assert.Nil(t, invalidDataRes)
})
}
// TestMergeFieldData checks that MergeFieldData appends int64 scalar,
// float-vector, and JSON field data from src onto dst, and that an
// unsupported field type is reported as an error rather than merged.
func TestMergeFieldData(t *testing.T) {
dstFields := []*schemapb.FieldData{
genFieldData("int64", 100, schemapb.DataType_Int64, []int64{1, 2, 3}, 1),
genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{1, 2, 3}, 1),
genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
}
srcFields := []*schemapb.FieldData{
genFieldData("int64", 100, schemapb.DataType_Int64, []int64{4, 5, 6}, 1),
genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{4, 5, 6}, 1),
genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
}
// merging matching, supported field types succeeds and concatenates
// src values after the existing dst values, field by field
err := MergeFieldData(dstFields, srcFields)
assert.Nil(t, err)
assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, dstFields[0].GetScalars().GetLongData().Data)
assert.Equal(t, []float32{1, 2, 3, 4, 5, 6}, dstFields[1].GetVectors().GetFloatVector().Data)
assert.Equal(t, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`), []byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)},
dstFields[2].GetScalars().GetJsonData().Data)
// a scalar field declared as DataType_None with a nil payload exercises
// the unsupported-data-type branch, which must return an error
emptyField := &schemapb.FieldData{
Type: schemapb.DataType_None,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: nil,
},
},
}
err = MergeFieldData([]*schemapb.FieldData{emptyField}, []*schemapb.FieldData{emptyField})
assert.Error(t, err)
}