From c7dc1c067a57dcc04c21b830fbba5dad15c2b06c Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 16 Jun 2023 16:50:41 +0800 Subject: [PATCH] Assign metric type in load segment request (#24917) Signed-off-by: bigsheeper --- internal/querycoordv2/task/executor.go | 14 ++++- internal/querycoordv2/task/task_test.go | 79 +++++++++++++++++++++++-- internal/querynodev2/services.go | 11 ++++ internal/querynodev2/services_test.go | 53 ++++++++++++++++- 4 files changed, 150 insertions(+), 7 deletions(-) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 37f020c8ee..9472136bc0 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -254,9 +254,21 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { return err } + // TODO: improve this, queryCoord should keep index and schema in memory to save RPC. + indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) + if err != nil { + log.Warn("fail to get index meta of collection") + return err + } + metricType, err := getMetricType(indexInfo, schema) + if err != nil { + log.Warn("failed to get metric type", zap.Error(err)) + return err + } + loadMeta := packLoadMeta( ex.meta.GetLoadType(task.CollectionID()), - "", + metricType, task.CollectionID(), partitions..., ) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 35da8c63a2..84084ef0ab 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -358,10 +358,6 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() { } } -func (suite *TaskSuite) expectationsForLoadSegments() { - -} - func (suite *TaskSuite) TestLoadSegmentTask() { ctx := context.Background() timeout := 10 * time.Second @@ -375,6 +371,9 @@ func (suite *TaskSuite) TestLoadSegmentTask() { // Expect suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ Name: "TestLoadSegmentTask", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, }, nil) for _, segment := range suite.loadSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -387,6 +386,18 @@ func (suite *TaskSuite) TestLoadSegmentTask() { }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) } + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + FieldID: 100, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + }, + }, + }, nil) suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) // Test load segment task @@ -457,6 +468,9 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { // Expect suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ Name: "TestLoadSegmentTask", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, }, nil) for _, segment := range suite.loadSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -469,6 +483,18 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, errors.New("index not ready")) } + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + FieldID: 100, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + }, + }, + }, nil) // Test load segment task suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ @@ -650,6 +676,9 @@ func (suite *TaskSuite) TestMoveSegmentTask() { // Expect suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ Name: "TestMoveSegmentTask", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, }, nil) for _, segment := range suite.moveSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -662,6 +691,18 @@ func (suite *TaskSuite) TestMoveSegmentTask() { }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) } + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + FieldID: 100, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + }, + }, + }, nil) suite.cluster.EXPECT().LoadSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) suite.cluster.EXPECT().ReleaseSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) @@ -747,6 +788,9 @@ func (suite *TaskSuite) TestTaskCanceled() { // Expect suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ Name: "TestSubscribeChannelTask", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, }, nil) for _, segment := range suite.loadSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -759,6 +803,18 @@ func (suite *TaskSuite) TestTaskCanceled() { }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) } + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + FieldID: 100, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + }, + }, + }, nil) suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) // Test load segment task @@ -823,6 +879,9 @@ func (suite *TaskSuite) TestSegmentTaskStale() { // Expect suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ Name: "TestSegmentTaskStale", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, }, nil) for _, segment := range suite.loadSegments { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ @@ -835,6 +894,18 @@ func (suite *TaskSuite) TestSegmentTaskStale() { }, nil) suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil) } + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + FieldID: 100, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + }, + }, + }, nil) suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) // Test load segment task diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index df27a3a578..c38ec002e4 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -433,6 +433,11 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart if err != nil { return merr.Status(err), nil } + // check metric type + if metricType == "" { + err := fmt.Errorf("empty metric type, collection = %d", req.GetCollectionID()) + return merr.Status(err), nil + } node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), &segcorepb.CollectionIndexMeta{ IndexMetas: fieldIndexMetas, MaxIndexRowCount: maxIndexRecordPerSegment, @@ -480,6 +485,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen return node.loadDeltaLogs(ctx, req), nil } + // check metric type + if req.GetLoadMeta().GetMetricType() == "" { + err := fmt.Errorf("empty metric type, collection = %d", req.GetCollectionID()) + return merr.Status(err), nil + } + node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), nil, req.GetLoadMeta()) // Delegates request to workers diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 29638f6dd8..bf6edc346b 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -494,6 +494,9 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() { Schema: schema, DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}}, NeedTransfer: true, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // LoadSegment @@ -507,13 +510,14 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() { suite.TestWatchDmChannelsVarchar() // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_VarChar) - LoadMeta := &querypb.LoadMetaInfo{ + loadMeta := &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, + MetricType: defaultMetricType, } suite.node.manager.Collection = segments.NewCollectionManager() - suite.node.manager.Collection.Put(suite.collectionID, schema, nil, LoadMeta) + suite.node.manager.Collection.Put(suite.collectionID, schema, nil, loadMeta) req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), @@ -525,6 +529,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() { Schema: schema, DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}}, NeedTransfer: true, + LoadMeta: loadMeta, } // LoadSegment @@ -549,6 +554,9 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() { Schema: schema, NeedTransfer: true, LoadScope: querypb.LoadScope_Delta, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // LoadSegment @@ -573,6 +581,9 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() { Schema: schema, NeedTransfer: true, LoadScope: querypb.LoadScope_Delta, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // LoadSegment @@ -595,6 +606,9 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() { Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // Delegator not found @@ -614,6 +628,14 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() { status, err = suite.node.LoadSegments(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) + + // no metric type + req.LoadMeta = nil + req.Base.TargetID = paramtable.GetNodeID() + suite.node.UpdateStateCode(commonpb.StateCode_Healthy) + status, err = suite.node.LoadSegments(ctx, req) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) } func (suite *ServiceSuite) TestLoadSegments_Transfer() { @@ -637,6 +659,9 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // LoadSegment @@ -658,6 +683,9 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } // LoadSegment @@ -684,8 +712,12 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, + LoadMeta: &querypb.LoadMetaInfo{ + MetricType: defaultMetricType, + }, } + // LoadSegment // LoadSegment status, err := suite.node.LoadSegments(ctx, req) suite.NoError(err) @@ -1540,6 +1572,23 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + // empty metric type + req.IndexInfoList = []*indexpb.IndexInfo{ + { + CollectionID: suite.collectionID, + FieldID: vecField.GetFieldID(), + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "", + }, + }, + }, + } + status, err = suite.node.LoadPartitions(ctx, req) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + // collection not exist and schema is not nil req.IndexInfoList = []*indexpb.IndexInfo{ {