enhance: Check loaded segments before gc (#42639)

issue: https://github.com/milvus-io/milvus/issues/42412

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2025-06-13 17:44:38 +08:00 committed by GitHub
parent 4f5409e1fe
commit 1bf960b1a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1388 additions and 743 deletions

View File

@ -1063,3 +1063,7 @@ func (s *mixCoordImpl) RegisterStreamingCoordGRPCService(server *grpc.Server) {
func (s *mixCoordImpl) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMetricsRequest) (*internalpb.GetQuotaMetricsResponse, error) {
return s.rootcoordServer.GetQuotaMetrics(ctx, req)
}
func (s *mixCoordImpl) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
return s.queryCoordServer.ListLoadedSegments(ctx, req)
}

View File

@ -415,6 +415,7 @@ func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
// recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss.
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
log.Info("start clear dropped segments...")
@ -457,6 +458,17 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
channelCPs[channel] = pos.GetTimestamp()
}
// try to get loaded segments
loadedSegments := typeutil.NewSet[int64]()
segments, err := gc.handler.ListLoadedSegments(ctx)
if err != nil {
log.Warn("failed to get loaded segments", zap.Error(err))
return
}
for _, segmentID := range segments {
loadedSegments.Insert(segmentID)
}
log.Info("start to GC segments", zap.Int("drop_num", len(drops)))
for segmentID, segment := range drops {
if ctx.Err() != nil {
@ -466,6 +478,10 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
log := log.With(zap.Int64("segmentID", segmentID))
segInsertChannel := segment.GetInsertChannel()
if loadedSegments.Contain(segmentID) {
log.Info("skip GC segment since it is loaded", zap.Int64("segmentID", segmentID))
continue
}
if !gc.checkDroppedSegmentGC(segment, compactTo[segment.GetID()], indexedSet, channelCPs[segInsertChannel]) {
continue
}

View File

@ -1733,6 +1733,31 @@ func (s *GarbageCollectorSuite) TestRunRecycleTaskWithPauser() {
s.Equal(cnt, 2)
}
func (s *GarbageCollectorSuite) TestAvoidGCLoadedSegments() {
handler := NewNMockHandler(s.T())
handler.EXPECT().ListLoadedSegments(mock.Anything).Return([]int64{1}, nil).Once()
gc := newGarbageCollector(s.meta, handler, GcOption{
cli: s.cli,
enabled: true,
checkInterval: time.Millisecond * 10,
scanInterval: time.Hour * 7 * 24,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Dropped,
DroppedAt: 0,
},
})
gc.recycleDroppedSegments(context.TODO())
seg := s.meta.GetSegment(context.TODO(), 1)
s.NotNil(seg)
}
func TestGarbageCollector(t *testing.T) {
suite.Run(t, new(GarbageCollectorSuite))
}

View File

@ -44,6 +44,7 @@ type Handler interface {
FinishDropChannel(ch string, collectionID int64) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView
ListLoadedSegments(ctx context.Context) ([]int64, error)
}
type SegmentsView struct {
@ -605,3 +606,7 @@ func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) er
return nil
}
func (h *ServerHandler) ListLoadedSegments(ctx context.Context) ([]int64, error) {
return h.s.listLoadedSegments(ctx)
}

View File

@ -350,6 +350,64 @@ func (_c *NMockHandler_GetQueryVChanPositions_Call) RunAndReturn(run func(RWChan
return _c
}
// ListLoadedSegments provides a mock function with given fields: ctx
func (_m *NMockHandler) ListLoadedSegments(ctx context.Context) ([]int64, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for ListLoadedSegments")
}
var r0 []int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]int64, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []int64); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NMockHandler_ListLoadedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListLoadedSegments'
type NMockHandler_ListLoadedSegments_Call struct {
*mock.Call
}
// ListLoadedSegments is a helper method to define mock.On call
// - ctx context.Context
func (_e *NMockHandler_Expecter) ListLoadedSegments(ctx interface{}) *NMockHandler_ListLoadedSegments_Call {
return &NMockHandler_ListLoadedSegments_Call{Call: _e.mock.On("ListLoadedSegments", ctx)}
}
func (_c *NMockHandler_ListLoadedSegments_Call) Run(run func(ctx context.Context)) *NMockHandler_ListLoadedSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *NMockHandler_ListLoadedSegments_Call) Return(_a0 []int64, _a1 error) *NMockHandler_ListLoadedSegments_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *NMockHandler_ListLoadedSegments_Call) RunAndReturn(run func(context.Context) ([]int64, error)) *NMockHandler_ListLoadedSegments_Call {
_c.Call.Return(run)
return _c
}
// NewNMockHandler creates a new instance of NMockHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewNMockHandler(t interface {

View File

@ -201,6 +201,12 @@ func (m *mockMixCoord) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQ
panic("implement me")
}
func (m *mockMixCoord) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
return &querypb.ListLoadedSegmentsResponse{
Status: merr.Success(),
}, nil
}
func newMockMixCoord() *mockMixCoord {
return &mockMixCoord{state: commonpb.StateCode_Healthy}
}
@ -936,6 +942,10 @@ func (h *mockHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChan
return nil
}
func (h *mockHandler) ListLoadedSegments(ctx context.Context) ([]int64, error) {
return nil, nil
}
func newMockHandlerWithMeta(meta *meta) *mockHandler {
return &mockHandler{
meta: meta,

View File

@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/expr"
@ -1217,3 +1218,13 @@ func (s *Server) updateBalanceConfig() bool {
log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions)))
return false
}
func (s *Server) listLoadedSegments(ctx context.Context) ([]int64, error) {
req := &querypb.ListLoadedSegmentsRequest{}
resp, err := s.mixCoord.ListLoadedSegments(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, err
}
return resp.SegmentIDs, nil
}

View File

@ -1838,3 +1838,14 @@ func (c *Client) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMe
return client.GetQuotaMetrics(ctx, req)
})
}
func (c *Client) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client MixCoordClient) (*querypb.ListLoadedSegmentsResponse, error) {
return client.ListLoadedSegments(ctx, req)
})
}

View File

@ -911,3 +911,7 @@ func (s *Server) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest
func (s *Server) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMetricsRequest) (*internalpb.GetQuotaMetricsResponse, error) {
return s.mixCoord.GetQuotaMetrics(ctx, req)
}
func (s *Server) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
return s.mixCoord.ListLoadedSegments(ctx, req)
}

View File

@ -5431,6 +5431,65 @@ func (_c *MixCoord_ListIndexes_Call) RunAndReturn(run func(context.Context, *ind
return _c
}
// ListLoadedSegments provides a mock function with given fields: _a0, _a1
func (_m *MixCoord) ListLoadedSegments(_a0 context.Context, _a1 *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for ListLoadedSegments")
}
var r0 *querypb.ListLoadedSegmentsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest) *querypb.ListLoadedSegmentsResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.ListLoadedSegmentsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListLoadedSegmentsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MixCoord_ListLoadedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListLoadedSegments'
type MixCoord_ListLoadedSegments_Call struct {
*mock.Call
}
// ListLoadedSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.ListLoadedSegmentsRequest
func (_e *MixCoord_Expecter) ListLoadedSegments(_a0 interface{}, _a1 interface{}) *MixCoord_ListLoadedSegments_Call {
return &MixCoord_ListLoadedSegments_Call{Call: _e.mock.On("ListLoadedSegments", _a0, _a1)}
}
func (_c *MixCoord_ListLoadedSegments_Call) Run(run func(_a0 context.Context, _a1 *querypb.ListLoadedSegmentsRequest)) *MixCoord_ListLoadedSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.ListLoadedSegmentsRequest))
})
return _c
}
func (_c *MixCoord_ListLoadedSegments_Call) Return(_a0 *querypb.ListLoadedSegmentsResponse, _a1 error) *MixCoord_ListLoadedSegments_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MixCoord_ListLoadedSegments_Call) RunAndReturn(run func(context.Context, *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error)) *MixCoord_ListLoadedSegments_Call {
_c.Call.Return(run)
return _c
}
// ListPolicy provides a mock function with given fields: _a0, _a1
func (_m *MixCoord) ListPolicy(_a0 context.Context, _a1 *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -6596,6 +6596,80 @@ func (_c *MockMixCoordClient_ListIndexes_Call) RunAndReturn(run func(context.Con
return _c
}
// ListLoadedSegments provides a mock function with given fields: ctx, in, opts
func (_m *MockMixCoordClient) ListLoadedSegments(ctx context.Context, in *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for ListLoadedSegments")
}
var r0 *querypb.ListLoadedSegmentsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) *querypb.ListLoadedSegmentsResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.ListLoadedSegmentsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockMixCoordClient_ListLoadedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListLoadedSegments'
type MockMixCoordClient_ListLoadedSegments_Call struct {
*mock.Call
}
// ListLoadedSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.ListLoadedSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockMixCoordClient_Expecter) ListLoadedSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockMixCoordClient_ListLoadedSegments_Call {
return &MockMixCoordClient_ListLoadedSegments_Call{Call: _e.mock.On("ListLoadedSegments",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockMixCoordClient_ListLoadedSegments_Call) Run(run func(ctx context.Context, in *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption)) *MockMixCoordClient_ListLoadedSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.ListLoadedSegmentsRequest), variadicArgs...)
})
return _c
}
func (_c *MockMixCoordClient_ListLoadedSegments_Call) Return(_a0 *querypb.ListLoadedSegmentsResponse, _a1 error) *MockMixCoordClient_ListLoadedSegments_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockMixCoordClient_ListLoadedSegments_Call) RunAndReturn(run func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error)) *MockMixCoordClient_ListLoadedSegments_Call {
_c.Call.Return(run)
return _c
}
// ListPolicy provides a mock function with given fields: ctx, in, opts
func (_m *MockMixCoordClient) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest, opts ...grpc.CallOption) (*internalpb.ListPolicyResponse, error) {
_va := make([]interface{}, len(opts))

View File

@ -964,6 +964,65 @@ func (_c *MockQueryCoord_ListCheckers_Call) RunAndReturn(run func(context.Contex
return _c
}
// ListLoadedSegments provides a mock function with given fields: _a0, _a1
func (_m *MockQueryCoord) ListLoadedSegments(_a0 context.Context, _a1 *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for ListLoadedSegments")
}
var r0 *querypb.ListLoadedSegmentsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest) *querypb.ListLoadedSegmentsResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.ListLoadedSegmentsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListLoadedSegmentsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryCoord_ListLoadedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListLoadedSegments'
type MockQueryCoord_ListLoadedSegments_Call struct {
*mock.Call
}
// ListLoadedSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.ListLoadedSegmentsRequest
func (_e *MockQueryCoord_Expecter) ListLoadedSegments(_a0 interface{}, _a1 interface{}) *MockQueryCoord_ListLoadedSegments_Call {
return &MockQueryCoord_ListLoadedSegments_Call{Call: _e.mock.On("ListLoadedSegments", _a0, _a1)}
}
func (_c *MockQueryCoord_ListLoadedSegments_Call) Run(run func(_a0 context.Context, _a1 *querypb.ListLoadedSegmentsRequest)) *MockQueryCoord_ListLoadedSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.ListLoadedSegmentsRequest))
})
return _c
}
func (_c *MockQueryCoord_ListLoadedSegments_Call) Return(_a0 *querypb.ListLoadedSegmentsResponse, _a1 error) *MockQueryCoord_ListLoadedSegments_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryCoord_ListLoadedSegments_Call) RunAndReturn(run func(context.Context, *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error)) *MockQueryCoord_ListLoadedSegments_Call {
_c.Call.Return(run)
return _c
}
// ListQueryNode provides a mock function with given fields: _a0, _a1
func (_m *MockQueryCoord) ListQueryNode(_a0 context.Context, _a1 *querypb.ListQueryNodeRequest) (*querypb.ListQueryNodeResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -1186,6 +1186,80 @@ func (_c *MockQueryCoordClient_ListCheckers_Call) RunAndReturn(run func(context.
return _c
}
// ListLoadedSegments provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryCoordClient) ListLoadedSegments(ctx context.Context, in *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for ListLoadedSegments")
}
var r0 *querypb.ListLoadedSegmentsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) *querypb.ListLoadedSegmentsResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.ListLoadedSegmentsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryCoordClient_ListLoadedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListLoadedSegments'
type MockQueryCoordClient_ListLoadedSegments_Call struct {
*mock.Call
}
// ListLoadedSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.ListLoadedSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockQueryCoordClient_Expecter) ListLoadedSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_ListLoadedSegments_Call {
return &MockQueryCoordClient_ListLoadedSegments_Call{Call: _e.mock.On("ListLoadedSegments",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryCoordClient_ListLoadedSegments_Call) Run(run func(ctx context.Context, in *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_ListLoadedSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.ListLoadedSegmentsRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryCoordClient_ListLoadedSegments_Call) Return(_a0 *querypb.ListLoadedSegmentsResponse, _a1 error) *MockQueryCoordClient_ListLoadedSegments_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryCoordClient_ListLoadedSegments_Call) RunAndReturn(run func(context.Context, *querypb.ListLoadedSegmentsRequest, ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error)) *MockQueryCoordClient_ListLoadedSegments_Call {
_c.Call.Return(run)
return _c
}
// ListQueryNode provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryCoordClient) ListQueryNode(ctx context.Context, in *querypb.ListQueryNodeRequest, opts ...grpc.CallOption) (*querypb.ListQueryNodeResponse, error) {
_va := make([]interface{}, len(opts))

View File

@ -1619,6 +1619,10 @@ func (coord *MixCoordMock) GetQuotaMetrics(ctx context.Context, in *internalpb.G
return &internalpb.GetQuotaMetricsResponse{}, nil
}
func (coord *MixCoordMock) ListLoadedSegments(ctx context.Context, in *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error) {
return &querypb.ListLoadedSegmentsResponse{}, nil
}
type DescribeCollectionFunc func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error)
type ShowPartitionsFunc func(ctx context.Context, request *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error)

View File

@ -1239,3 +1239,31 @@ func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadCo
return merr.Success(), nil
}
func (s *Server) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest) (*querypb.ListLoadedSegmentsResponse, error) {
if err := merr.CheckHealthy(s.State()); err != nil {
return &querypb.ListLoadedSegmentsResponse{
Status: merr.Status(errors.Wrap(err, "failed to list loaded segments")),
}, nil
}
segmentIDs := typeutil.NewUniqueSet()
collections := s.meta.GetAllCollections(ctx)
for _, collection := range collections {
segments := s.targetMgr.GetSealedSegmentsByCollection(ctx, collection.GetCollectionID(), meta.CurrentTargetFirst)
for _, segment := range segments {
segmentIDs.Insert(segment.ID)
}
}
segments := s.dist.SegmentDistManager.GetByFilter()
for _, segment := range segments {
segmentIDs.Insert(segment.ID)
}
resp := &querypb.ListLoadedSegmentsResponse{
Status: merr.Success(),
SegmentIDs: segmentIDs.Collect(),
}
return resp, nil
}

View File

@ -189,3 +189,7 @@ func (m *GrpcQueryCoordClient) CheckQueryNodeDistribution(ctx context.Context, r
func (m *GrpcQueryCoordClient) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcQueryCoordClient) ListLoadedSegments(ctx context.Context, req *querypb.ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*querypb.ListLoadedSegmentsResponse, error) {
return &querypb.ListLoadedSegmentsResponse{}, m.Err
}

View File

@ -79,6 +79,7 @@ service QueryCoord {
rpc DescribeResourceGroup(DescribeResourceGroupRequest)
returns (DescribeResourceGroupResponse) {
}
rpc ListLoadedSegments(ListLoadedSegmentsRequest) returns (ListLoadedSegmentsResponse){}
// ops interfaces
@ -975,4 +976,13 @@ message RunAnalyzerRequest{
bool with_detail = 6;
bool with_hash = 7;
}
message ListLoadedSegmentsRequest {
common.MsgBase base = 1;
}
message ListLoadedSegmentsResponse {
common.Status status = 1;
repeated int64 segmentIDs = 2;
}

File diff suppressed because it is too large Load Diff

View File

@ -44,6 +44,7 @@ const (
QueryCoord_TransferReplica_FullMethodName = "/milvus.proto.query.QueryCoord/TransferReplica"
QueryCoord_ListResourceGroups_FullMethodName = "/milvus.proto.query.QueryCoord/ListResourceGroups"
QueryCoord_DescribeResourceGroup_FullMethodName = "/milvus.proto.query.QueryCoord/DescribeResourceGroup"
QueryCoord_ListLoadedSegments_FullMethodName = "/milvus.proto.query.QueryCoord/ListLoadedSegments"
QueryCoord_ListCheckers_FullMethodName = "/milvus.proto.query.QueryCoord/ListCheckers"
QueryCoord_ActivateChecker_FullMethodName = "/milvus.proto.query.QueryCoord/ActivateChecker"
QueryCoord_DeactivateChecker_FullMethodName = "/milvus.proto.query.QueryCoord/DeactivateChecker"
@ -88,6 +89,7 @@ type QueryCoordClient interface {
TransferReplica(ctx context.Context, in *TransferReplicaRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
ListResourceGroups(ctx context.Context, in *milvuspb.ListResourceGroupsRequest, opts ...grpc.CallOption) (*milvuspb.ListResourceGroupsResponse, error)
DescribeResourceGroup(ctx context.Context, in *DescribeResourceGroupRequest, opts ...grpc.CallOption) (*DescribeResourceGroupResponse, error)
ListLoadedSegments(ctx context.Context, in *ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*ListLoadedSegmentsResponse, error)
// ops interfaces
ListCheckers(ctx context.Context, in *ListCheckersRequest, opts ...grpc.CallOption) (*ListCheckersResponse, error)
ActivateChecker(ctx context.Context, in *ActivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
@ -311,6 +313,15 @@ func (c *queryCoordClient) DescribeResourceGroup(ctx context.Context, in *Descri
return out, nil
}
func (c *queryCoordClient) ListLoadedSegments(ctx context.Context, in *ListLoadedSegmentsRequest, opts ...grpc.CallOption) (*ListLoadedSegmentsResponse, error) {
out := new(ListLoadedSegmentsResponse)
err := c.cc.Invoke(ctx, QueryCoord_ListLoadedSegments_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *queryCoordClient) ListCheckers(ctx context.Context, in *ListCheckersRequest, opts ...grpc.CallOption) (*ListCheckersResponse, error) {
out := new(ListCheckersResponse)
err := c.cc.Invoke(ctx, QueryCoord_ListCheckers_FullMethodName, in, out, opts...)
@ -465,6 +476,7 @@ type QueryCoordServer interface {
TransferReplica(context.Context, *TransferReplicaRequest) (*commonpb.Status, error)
ListResourceGroups(context.Context, *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error)
DescribeResourceGroup(context.Context, *DescribeResourceGroupRequest) (*DescribeResourceGroupResponse, error)
ListLoadedSegments(context.Context, *ListLoadedSegmentsRequest) (*ListLoadedSegmentsResponse, error)
// ops interfaces
ListCheckers(context.Context, *ListCheckersRequest) (*ListCheckersResponse, error)
ActivateChecker(context.Context, *ActivateCheckerRequest) (*commonpb.Status, error)
@ -552,6 +564,9 @@ func (UnimplementedQueryCoordServer) ListResourceGroups(context.Context, *milvus
func (UnimplementedQueryCoordServer) DescribeResourceGroup(context.Context, *DescribeResourceGroupRequest) (*DescribeResourceGroupResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DescribeResourceGroup not implemented")
}
func (UnimplementedQueryCoordServer) ListLoadedSegments(context.Context, *ListLoadedSegmentsRequest) (*ListLoadedSegmentsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListLoadedSegments not implemented")
}
func (UnimplementedQueryCoordServer) ListCheckers(context.Context, *ListCheckersRequest) (*ListCheckersResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListCheckers not implemented")
}
@ -1002,6 +1017,24 @@ func _QueryCoord_DescribeResourceGroup_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
func _QueryCoord_ListLoadedSegments_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListLoadedSegmentsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(QueryCoordServer).ListLoadedSegments(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: QueryCoord_ListLoadedSegments_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(QueryCoordServer).ListLoadedSegments(ctx, req.(*ListLoadedSegmentsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _QueryCoord_ListCheckers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListCheckersRequest)
if err := dec(in); err != nil {
@ -1349,6 +1382,10 @@ var QueryCoord_ServiceDesc = grpc.ServiceDesc{
MethodName: "DescribeResourceGroup",
Handler: _QueryCoord_DescribeResourceGroup_Handler,
},
{
MethodName: "ListLoadedSegments",
Handler: _QueryCoord_ListLoadedSegments_Handler,
},
{
MethodName: "ListCheckers",
Handler: _QueryCoord_ListCheckers_Handler,