From ddccccbcab7c048fcd804364e5373e5e495130bb Mon Sep 17 00:00:00 2001 From: SimFG Date: Thu, 18 Jan 2024 22:18:55 +0800 Subject: [PATCH] enhance: add the bytes data type for merge data and format some code (#30105) /kind improvement Signed-off-by: SimFG --- internal/proxy/util.go | 2 +- internal/querycoordv2/meta/target_manager.go | 12 ++--- internal/querynodev2/services.go | 2 +- pkg/util/typeutil/schema.go | 10 ++++ pkg/util/typeutil/schema_test.go | 54 ++++++++++++++++++++ 5 files changed, 72 insertions(+), 8 deletions(-) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 5f9c1995ea..5a7f8264e1 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -750,7 +750,7 @@ func ValidateUsername(username string) error { firstChar := username[0] if !isAlpha(firstChar) { - return merr.WrapErrParameterInvalidMsg("invalid user name %s, the first character must be a letter, but got %s", username, firstChar) + return merr.WrapErrParameterInvalidMsg("invalid user name %s, the first character must be a letter, but got %s", username, string(firstChar)) } usernameSize := len(username) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index dc220c07f3..4813b335e2 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -332,19 +332,19 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget { switch scope { case CurrentTarget: - return mgr.current.collectionTargetMap[collectionID] + return mgr.current.getCollectionTarget(collectionID) case NextTarget: - return mgr.next.collectionTargetMap[collectionID] + return mgr.next.getCollectionTarget(collectionID) case CurrentTargetFirst: - if current := mgr.current.collectionTargetMap[collectionID]; current != nil { + if current := mgr.current.getCollectionTarget(collectionID); current != nil { return current } - return mgr.next.collectionTargetMap[collectionID] + return mgr.next.getCollectionTarget(collectionID) case NextTargetFirst: - if next := mgr.next.collectionTargetMap[collectionID]; next != nil { + if next := mgr.next.getCollectionTarget(collectionID); next != nil { return next } - return mgr.current.collectionTargetMap[collectionID] + return mgr.current.getCollectionTarget(collectionID) } return nil } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index a3cb22f7c8..5254c42464 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -423,7 +423,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen if err := node.lifetime.Add(merr.IsHealthy); err != nil { return merr.Status(err), nil } - node.lifetime.Done() + defer node.lifetime.Done() // check target matches if err := merr.CheckTargetID(req.GetBase()); err != nil { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 54a928a7bd..3a6f7d0681 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -848,6 +848,16 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error } else { dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data...) } + case *schemapb.ScalarField_BytesData: + if dstScalar.GetBytesData() == nil { + dstScalar.Data = &schemapb.ScalarField_BytesData{ + BytesData: &schemapb.BytesArray{ + Data: srcScalar.BytesData.Data, + }, + } + } else { + dstScalar.GetBytesData().Data = append(dstScalar.GetBytesData().Data, srcScalar.BytesData.Data...) + } default: log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String())) return errors.New("unsupported data type: " + srcFieldData.Type.String()) diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index a70835204b..ade19afbfc 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -1305,6 +1305,30 @@ func TestMergeFieldData(t *testing.T) { }, }, }, 1), + { + Type: schemapb.DataType_Array, + FieldName: "bytes", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BytesData{}, + }, + }, + FieldId: 104, + }, + { + Type: schemapb.DataType_Array, + FieldName: "bytes", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BytesData{ + BytesData: &schemapb.BytesArray{ + Data: [][]byte{[]byte("hello"), []byte("world")}, + }, + }, + }, + }, + FieldId: 105, + }, } srcFields := []*schemapb.FieldData{ @@ -1320,6 +1344,34 @@ func TestMergeFieldData(t *testing.T) { }, }, }, 1), + { + Type: schemapb.DataType_Array, + FieldName: "bytes", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BytesData{ + BytesData: &schemapb.BytesArray{ + Data: [][]byte{[]byte("hoo"), []byte("foo")}, + }, + }, + }, + }, + FieldId: 104, + }, + { + Type: schemapb.DataType_Array, + FieldName: "bytes", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BytesData{ + BytesData: &schemapb.BytesArray{ + Data: [][]byte{[]byte("hoo")}, + }, + }, + }, + }, + FieldId: 105, + }, } err := MergeFieldData(dstFields, srcFields) @@ -1346,6 +1398,8 @@ func TestMergeFieldData(t *testing.T) { }, }, dstFields[3].GetScalars().GetArrayData().Data) + assert.Equal(t, [][]byte{[]byte("hoo"), []byte("foo")}, dstFields[4].GetScalars().GetBytesData().Data) + assert.Equal(t, [][]byte{[]byte("hello"), []byte("world"), []byte("hoo")}, dstFields[5].GetScalars().GetBytesData().Data) }) t.Run("merge with nil", func(t *testing.T) {