diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 0cc2656fe2..d7e8e1af88 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -128,7 +128,7 @@ func (suite *JobSuite) SetupSuite() { suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). Return(nil, nil) suite.cluster = session.NewMockCluster(suite.T()) @@ -1192,10 +1192,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { // call LoadPartitions failed at get index info getIndexErr := fmt.Errorf("mock get index error") suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeIndex" + return call.Method != "ListIndexes" }) for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeIndex(mock.Anything, collection).Return(nil, getIndexErr) + suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr) loadCollectionReq := &querypb.LoadCollectionRequest{ CollectionID: collection, } @@ -1281,10 +1281,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { } suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeIndex" && call.Method != "DescribeCollection" + return call.Method != "ListIndexes" && call.Method != "DescribeCollection" }) suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil) + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil) } func (suite *JobSuite) TestCallReleasePartitionFailed() { diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index 6369dbb46a..39941af090 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -79,7 +79,7 @@ func loadPartitions(ctx context.Context, } schema = collectionInfo.GetSchema() } - indexes, err := broker.DescribeIndex(ctx, collection) + indexes, err := broker.ListIndexes(ctx, collection) if err != nil { return err } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index e739c384d8..d755e5f378 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -43,7 +43,7 @@ type Broker interface { DescribeCollection(ctx context.Context, collectionID UniqueID) (*milvuspb.DescribeCollectionResponse, error) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) - DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) + ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) @@ -245,7 +245,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID return indexes, nil } -func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { +func (broker *CoordinatorBroker) describeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() @@ -271,3 +271,25 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID } return resp.GetIndexInfos(), nil } + +func (broker *CoordinatorBroker) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { + log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + + resp, err := broker.dataCoord.ListIndexes(ctx, &indexpb.ListIndexesRequest{ + CollectionID: collectionID, + }) + + err = merr.CheckRPCCall(resp, err) + if err != nil { + if errors.Is(err, merr.ErrServiceUnimplemented) { + log.Warn("datacoord does not implement ListIndex API fallback to DescribeIndex") + return broker.describeIndex(ctx, collectionID) + } + log.Warn("failed to fetch index meta", zap.Error(err)) + return nil, err + } + + return resp.GetIndexInfos(), nil +} diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go index bcd75b7ba5..476a997dd2 100644 --- a/internal/querycoordv2/meta/coordinator_broker_test.go +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -270,7 +270,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { return &indexpb.IndexInfo{IndexID: id} }), }, nil) - infos, err := s.broker.DescribeIndex(ctx, collectionID) + infos, err := s.broker.describeIndex(ctx, collectionID) s.NoError(err) s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() })) s.resetMock() @@ -280,7 +280,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). Return(nil, errors.New("mock")) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.Error(err) s.resetMock() }) @@ -291,7 +291,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { Status: merr.Status(errors.New("mocked")), }, nil) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.Error(err) s.resetMock() }) @@ -311,12 +311,69 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { }), }, nil) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.NoError(err) s.resetMock() }) } +func (s *CoordinatorBrokerDataCoordSuite) TestListIndexes() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + collectionID := int64(100) + + s.Run("normal_case", func() { + indexIDs := []int64{1, 2} + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(&indexpb.ListIndexesResponse{ + Status: merr.Status(nil), + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo { + return &indexpb.IndexInfo{IndexID: id} + }), + }, nil).Once() + infos, err := s.broker.ListIndexes(ctx, collectionID) + s.NoError(err) + s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() })) + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(nil, errors.New("mocked")).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.Error(err) + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(&indexpb.ListIndexesResponse{ + Status: merr.Status(errors.New("mocked")), + }, nil).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.Error(err) + }) + + s.Run("datacoord_return_unimplemented", func() { + // mock old version datacoord return unimplemented + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(nil, merr.ErrServiceUnimplemented).Once() + + // mock retry on old version datacoord descibe index + indexIDs := []int64{1, 2} + s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(&indexpb.DescribeIndexResponse{ + Status: merr.Status(nil), + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo { + return &indexpb.IndexInfo{IndexID: id} + }), + }, nil).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.NoError(err) + }) +} + func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index 9ba70eb8c4..ff35489855 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -83,61 +83,6 @@ func (_c *MockBroker_DescribeCollection_Call) RunAndReturn(run func(context.Cont return _c } -// DescribeIndex provides a mock function with given fields: ctx, collectionID -func (_m *MockBroker) DescribeIndex(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) { - ret := _m.Called(ctx, collectionID) - - var r0 []*indexpb.IndexInfo - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok { - return rf(ctx, collectionID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok { - r0 = rf(ctx, collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*indexpb.IndexInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, collectionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockBroker_DescribeIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeIndex' -type MockBroker_DescribeIndex_Call struct { - *mock.Call -} - -// DescribeIndex is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -func (_e *MockBroker_Expecter) DescribeIndex(ctx interface{}, collectionID interface{}) *MockBroker_DescribeIndex_Call { - return &MockBroker_DescribeIndex_Call{Call: _e.mock.On("DescribeIndex", ctx, collectionID)} -} - -func (_c *MockBroker_DescribeIndex_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_DescribeIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) - }) - return _c -} - -func (_c *MockBroker_DescribeIndex_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_DescribeIndex_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockBroker_DescribeIndex_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_DescribeIndex_Call { - _c.Call.Return(run) - return _c -} - // GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentID func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentID int64) ([]*querypb.FieldIndexInfo, error) { ret := _m.Called(ctx, collectionID, segmentID) @@ -462,6 +407,61 @@ func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context, return _c } +// ListIndexes provides a mock function with given fields: ctx, collectionID +func (_m *MockBroker) ListIndexes(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) { + ret := _m.Called(ctx, collectionID) + + var r0 []*indexpb.IndexInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok { + return rf(ctx, collectionID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok { + r0 = rf(ctx, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*indexpb.IndexInfo) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, collectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBroker_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes' +type MockBroker_ListIndexes_Call struct { + *mock.Call +} + +// ListIndexes is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +func (_e *MockBroker_Expecter) ListIndexes(ctx interface{}, collectionID interface{}) *MockBroker_ListIndexes_Call { + return &MockBroker_ListIndexes_Call{Call: _e.mock.On("ListIndexes", ctx, collectionID)} +} + +func (_c *MockBroker_ListIndexes_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_ListIndexes_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockBroker_ListIndexes_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_ListIndexes_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBroker_ListIndexes_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_ListIndexes_Call { + _c.Call.Return(run) + return _c +} + // NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockBroker(t interface { diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 1b2bf2ec25..7976aa2a0f 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -383,7 +383,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView } // Get collection index info - indexInfo, err := ob.broker.DescribeIndex(ctx, collectionInfo.GetCollectionID()) + indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID()) if err != nil { log.Warn("fail to get index info of collection", zap.Error(err)) return false diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 6372e1d4af..0685514992 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -586,7 +586,7 @@ func (suite *ServerSuite) hackServer() { ) suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{Schema: &schemapb.CollectionSchema{}}, nil).Maybe() - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() for _, collection := range suite.collections { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() suite.expectGetRecoverInfo(collection) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 77160df243..67e7bb3c8d 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1731,7 +1731,7 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { func (suite *ServiceSuite) expectLoadPartitions() { suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). Return(nil, nil) suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything). Return(merr.Success(), nil) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index c027e631d6..9c83b326ef 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -312,7 +312,7 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error { log.Warn("failed to get partitions of collection") return err } - indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) + indexInfo, err := ex.broker.ListIndexes(ctx, task.CollectionID()) if err != nil { log.Warn("fail to get index meta of collection") return err @@ -594,7 +594,7 @@ func (ex *Executor) getLoadInfo(ctx context.Context, collectionID, segmentID int } // Get collection index info - indexInfos, err := ex.broker.DescribeIndex(ctx, collectionID) + indexInfos, err := ex.broker.ListIndexes(ctx, collectionID) if err != nil { log.Warn("fail to get index meta of collection", zap.Error(err)) return nil, nil, err diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 89653071f3..103b402fb0 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -223,7 +223,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { }, }, nil) } - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, FieldID: 100, @@ -401,7 +401,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -501,7 +501,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -803,7 +803,7 @@ func (suite *TaskSuite) TestMoveSegmentTask() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -977,7 +977,7 @@ func (suite *TaskSuite) TestTaskCanceled() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -1068,7 +1068,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -1250,7 +1250,7 @@ func (suite *TaskSuite) TestLeaderTaskSet() { }, }, nil }) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, },