enhance: Add ListIndexes API from datacoord (#31104)

See also #31103

This PR add `listIndexes` API for datacoor server to list all indexes
for provided collection.
Comparing to the existing `DescribeIndex` API, the new one does NOT
check the segment index building progress to ease the burden when
invoking it

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-03-07 17:37:01 +08:00 committed by GitHub
parent 2a047103d6
commit d81ba164c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 636 additions and 251 deletions

View File

@ -879,3 +879,37 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
return ret, nil
}
// ListIndexes returns all indexes created on provided collection.
func (s *Server) ListIndexes(ctx context.Context, req *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
return &indexpb.ListIndexesResponse{
Status: merr.Status(err),
}, nil
}
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), "")
indexInfos := lo.Map(indexes, func(index *model.Index, _ int) *indexpb.IndexInfo {
return &indexpb.IndexInfo{
CollectionID: index.CollectionID,
FieldID: index.FieldID,
IndexName: index.IndexName,
IndexID: index.IndexID,
TypeParams: index.TypeParams,
IndexParams: index.IndexParams,
IsAutoIndex: index.IsAutoIndex,
UserIndexParams: index.UserIndexParams,
}
})
log.Info("List index success")
return &indexpb.ListIndexesResponse{
Status: merr.Success(),
IndexInfos: indexInfos,
}, nil
}

View File

@ -1508,6 +1508,155 @@ func TestServer_DescribeIndex(t *testing.T) {
})
}
func TestServer_ListIndexes(t *testing.T) {
var (
collID = UniqueID(1)
fieldID = UniqueID(10)
indexID = UniqueID(100)
indexName = "default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "IVF_FLAT",
},
}
createTS = uint64(1000)
ctx = context.Background()
req = &indexpb.ListIndexesRequest{
CollectionID: collID,
}
)
catalog := catalogmocks.NewDataCoordCatalog(t)
s := &Server{
meta: &meta{
catalog: catalog,
indexMeta: &indexMeta{
catalog: catalog,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
// finished
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// deleted
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: indexName + "_1",
IsDeleted: true,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// unissued
indexID + 2: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 2,
IndexID: indexID + 2,
IndexName: indexName + "_2",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// inProgress
indexID + 3: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 3,
IndexID: indexID + 3,
IndexName: indexName + "_3",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// failed
indexID + 4: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 4,
IndexID: indexID + 4,
IndexName: indexName + "_4",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// unissued
indexID + 5: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 5,
IndexID: indexID + 5,
IndexName: indexName + "_5",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{},
},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.ListIndexes(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})
s.stateCode.Store(commonpb.StateCode_Healthy)
t.Run("success", func(t *testing.T) {
resp, err := s.ListIndexes(ctx, req)
assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, 5, len(resp.GetIndexInfos()))
})
}
func TestServer_GetIndexStatistics(t *testing.T) {
var (
collID = UniqueID(1)

View File

@ -779,3 +779,9 @@ func (c *Client) ListImports(ctx context.Context, in *internalpb.ListImportsRequ
return client.ListImports(ctx, in)
})
}
func (c *Client) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest, opts ...grpc.CallOption) (*indexpb.ListIndexesResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.ListIndexesResponse, error) {
return client.ListIndexes(ctx, in)
})
}

View File

@ -2369,3 +2369,51 @@ func Test_GcControl(t *testing.T) {
_, err = client.GcControl(ctx, &datapb.GcControlRequest{})
assert.ErrorIs(t, err, context.DeadlineExceeded)
}
func Test_ListIndexes(t *testing.T) {
paramtable.Init()
ctx := context.Background()
client, err := NewClient(ctx)
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
mockDC := mocks.NewMockDataCoordClient(t)
mockGrpcClient := mocks.NewMockGrpcClient[datapb.DataCoordClient](t)
mockGrpcClient.EXPECT().Close().Return(nil)
mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(datapb.DataCoordClient) (interface{}, error)) (interface{}, error) {
return f(mockDC)
})
client.(*Client).grpcClient = mockGrpcClient
// test success
mockDC.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(&indexpb.ListIndexesResponse{
Status: merr.Success(),
}, nil).Once()
_, err = client.ListIndexes(ctx, &indexpb.ListIndexesRequest{})
assert.Nil(t, err)
// test return error status
mockDC.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(
&indexpb.ListIndexesResponse{
Status: merr.Status(merr.ErrServiceNotReady),
}, nil).Once()
rsp, err := client.ListIndexes(ctx, &indexpb.ListIndexesRequest{})
assert.Nil(t, err)
assert.False(t, merr.Ok(rsp.GetStatus()))
// test return error
mockDC.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, mockErr).Once()
_, err = client.ListIndexes(ctx, &indexpb.ListIndexesRequest{})
assert.Error(t, err)
// test ctx done
ctx, cancel := context.WithCancel(ctx)
cancel()
_, err = client.ListIndexes(ctx, &indexpb.ListIndexesRequest{})
assert.ErrorIs(t, err, context.Canceled)
}

View File

@ -501,3 +501,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
func (s *Server) ListImports(ctx context.Context, in *internalpb.ListImportsRequestInternal) (*internalpb.ListImportsResponse, error) {
return s.dataCoord.ListImports(ctx, in)
}
func (s *Server) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
return s.dataCoord.ListIndexes(ctx, in)
}

View File

@ -331,6 +331,15 @@ func Test_NewServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, ret)
})
t.Run("ListIndex", func(t *testing.T) {
mockDataCoord.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(&indexpb.ListIndexesResponse{
Status: merr.Success(),
}, nil)
ret, err := server.ListIndexes(ctx, &indexpb.ListIndexesRequest{})
assert.NoError(t, err)
assert.True(t, merr.Ok(ret.GetStatus()))
})
}
func Test_Run(t *testing.T) {

View File

@ -2167,6 +2167,61 @@ func (_c *MockDataCoord_ListImports_Call) RunAndReturn(run func(context.Context,
return _c
}
// ListIndexes provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) ListIndexes(_a0 context.Context, _a1 *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *indexpb.ListIndexesResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.ListIndexesRequest) *indexpb.ListIndexesResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.ListIndexesResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.ListIndexesRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoord_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes'
type MockDataCoord_ListIndexes_Call struct {
*mock.Call
}
// ListIndexes is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *indexpb.ListIndexesRequest
func (_e *MockDataCoord_Expecter) ListIndexes(_a0 interface{}, _a1 interface{}) *MockDataCoord_ListIndexes_Call {
return &MockDataCoord_ListIndexes_Call{Call: _e.mock.On("ListIndexes", _a0, _a1)}
}
func (_c *MockDataCoord_ListIndexes_Call) Run(run func(_a0 context.Context, _a1 *indexpb.ListIndexesRequest)) *MockDataCoord_ListIndexes_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.ListIndexesRequest))
})
return _c
}
func (_c *MockDataCoord_ListIndexes_Call) Return(_a0 *indexpb.ListIndexesResponse, _a1 error) *MockDataCoord_ListIndexes_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoord_ListIndexes_Call) RunAndReturn(run func(context.Context, *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error)) *MockDataCoord_ListIndexes_Call {
_c.Call.Return(run)
return _c
}
// ManualCompaction provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) ManualCompaction(_a0 context.Context, _a1 *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -2734,6 +2734,76 @@ func (_c *MockDataCoordClient_ListImports_Call) RunAndReturn(run func(context.Co
return _c
}
// ListIndexes provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest, opts ...grpc.CallOption) (*indexpb.ListIndexesResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *indexpb.ListIndexesResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.ListIndexesRequest, ...grpc.CallOption) (*indexpb.ListIndexesResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.ListIndexesRequest, ...grpc.CallOption) *indexpb.ListIndexesResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.ListIndexesResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.ListIndexesRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoordClient_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes'
type MockDataCoordClient_ListIndexes_Call struct {
*mock.Call
}
// ListIndexes is a helper method to define mock.On call
// - ctx context.Context
// - in *indexpb.ListIndexesRequest
// - opts ...grpc.CallOption
func (_e *MockDataCoordClient_Expecter) ListIndexes(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_ListIndexes_Call {
return &MockDataCoordClient_ListIndexes_Call{Call: _e.mock.On("ListIndexes",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataCoordClient_ListIndexes_Call) Run(run func(ctx context.Context, in *indexpb.ListIndexesRequest, opts ...grpc.CallOption)) *MockDataCoordClient_ListIndexes_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*indexpb.ListIndexesRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataCoordClient_ListIndexes_Call) Return(_a0 *indexpb.ListIndexesResponse, _a1 error) *MockDataCoordClient_ListIndexes_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoordClient_ListIndexes_Call) RunAndReturn(run func(context.Context, *indexpb.ListIndexesRequest, ...grpc.CallOption) (*indexpb.ListIndexesResponse, error)) *MockDataCoordClient_ListIndexes_Call {
_c.Call.Return(run)
return _c
}
// ManualCompaction provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) ManualCompaction(ctx context.Context, in *milvuspb.ManualCompactionRequest, opts ...grpc.CallOption) (*milvuspb.ManualCompactionResponse, error) {
_va := make([]interface{}, len(opts))

View File

@ -89,6 +89,7 @@ service DataCoord {
rpc GetIndexStatistics(index.GetIndexStatisticsRequest) returns (index.GetIndexStatisticsResponse) {}
// Deprecated: use DescribeIndex instead
rpc GetIndexBuildProgress(index.GetIndexBuildProgressRequest) returns (index.GetIndexBuildProgressResponse) {}
rpc ListIndexes(index.ListIndexesRequest) returns (index.ListIndexesResponse) {}
rpc GcConfirm(GcConfirmRequest) returns (GcConfirmResponse) {}

View File

@ -334,3 +334,12 @@ message GetIndexStatisticsResponse {
common.Status status = 1;
repeated IndexInfo index_infos = 2;
}
message ListIndexesRequest {
int64 collectionID = 1;
}
message ListIndexesResponse {
common.Status status = 1;
repeated IndexInfo index_infos = 2;
}