diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 95c8a1175d..b396afb33f 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -658,172 +658,6 @@ func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64) return _c } -// PullNextTargetV1 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs -func (_m *MockTargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { - _va := make([]interface{}, len(chosenPartitionIDs)) - for _i := range chosenPartitionIDs { - _va[_i] = chosenPartitionIDs[_i] - } - var _ca []interface{} - _ca = append(_ca, broker, collectionID) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - if len(ret) == 0 { - panic("no return value specified for PullNextTargetV1") - } - - var r0 map[int64]*datapb.SegmentInfo - var r1 map[string]*DmChannel - var r2 error - if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok { - return rf(broker, collectionID, chosenPartitionIDs...) - } - if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok { - r0 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) - } - } - - if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok { - r1 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(map[string]*DmChannel) - } - } - - if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok { - r2 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// MockTargetManager_PullNextTargetV1_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV1' -type MockTargetManager_PullNextTargetV1_Call struct { - *mock.Call -} - -// PullNextTargetV1 is a helper method to define mock.On call -// - broker Broker -// - collectionID int64 -// - chosenPartitionIDs ...int64 -func (_e *MockTargetManager_Expecter) PullNextTargetV1(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV1_Call { - return &MockTargetManager_PullNextTargetV1_Call{Call: _e.mock.On("PullNextTargetV1", - append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)} -} - -func (_c *MockTargetManager_PullNextTargetV1_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV1_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]int64, len(args)-2) - for i, a := range args[2:] { - if a != nil { - variadicArgs[i] = a.(int64) - } - } - run(args[0].(Broker), args[1].(int64), variadicArgs...) - }) - return _c -} - -func (_c *MockTargetManager_PullNextTargetV1_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV1_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *MockTargetManager_PullNextTargetV1_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV1_Call { - _c.Call.Return(run) - return _c -} - -// PullNextTargetV2 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs -func (_m *MockTargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { - _va := make([]interface{}, len(chosenPartitionIDs)) - for _i := range chosenPartitionIDs { - _va[_i] = chosenPartitionIDs[_i] - } - var _ca []interface{} - _ca = append(_ca, broker, collectionID) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - if len(ret) == 0 { - panic("no return value specified for PullNextTargetV2") - } - - var r0 map[int64]*datapb.SegmentInfo - var r1 map[string]*DmChannel - var r2 error - if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok { - return rf(broker, collectionID, chosenPartitionIDs...) - } - if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok { - r0 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) - } - } - - if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok { - r1 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(map[string]*DmChannel) - } - } - - if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok { - r2 = rf(broker, collectionID, chosenPartitionIDs...) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// MockTargetManager_PullNextTargetV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV2' -type MockTargetManager_PullNextTargetV2_Call struct { - *mock.Call -} - -// PullNextTargetV2 is a helper method to define mock.On call -// - broker Broker -// - collectionID int64 -// - chosenPartitionIDs ...int64 -func (_e *MockTargetManager_Expecter) PullNextTargetV2(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV2_Call { - return &MockTargetManager_PullNextTargetV2_Call{Call: _e.mock.On("PullNextTargetV2", - append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)} -} - -func (_c *MockTargetManager_PullNextTargetV2_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV2_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]int64, len(args)-2) - for i, a := range args[2:] { - if a != nil { - variadicArgs[i] = a.(int64) - } - } - run(args[0].(Broker), args[1].(int64), variadicArgs...) - }) - return _c -} - -func (_c *MockTargetManager_PullNextTargetV2_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV2_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *MockTargetManager_PullNextTargetV2_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV2_Call { - _c.Call.Return(run) - return _c -} - // Recover provides a mock function with given fields: catalog func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error { ret := _m.Called(catalog) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index f6851f2326..6f05d4c96e 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -22,7 +22,6 @@ import ( "runtime" "sync" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -34,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -53,8 +51,6 @@ const ( type TargetManagerInterface interface { UpdateCollectionCurrentTarget(collectionID int64) bool UpdateCollectionNextTarget(collectionID int64) error - PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) - PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) RemoveCollection(collectionID int64) RemovePartition(collectionID int64, partitionIDs ...int64) GetGrowingSegmentsByCollection(collectionID int64, scope TargetScope) typeutil.UniqueSet @@ -140,150 +136,70 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update, // which may make the current target not available func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { + var vChannelInfos []*datapb.VchannelInfo + var segmentInfos []*datapb.SegmentInfo + err := retry.Handle(context.TODO(), func() (bool, error) { + var err error + vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(context.TODO(), collectionID) + if err != nil { + return true, err + } + return false, nil + }, retry.Attempts(10)) + if err != nil { + log.Warn("failed to get next targets for collection", zap.Int64("collectionID", collectionID), zap.Error(err)) + return err + } + mgr.rwMutex.Lock() + defer mgr.rwMutex.Unlock() partitions := mgr.meta.GetPartitionsByCollection(collectionID) partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) - mgr.rwMutex.Unlock() - log := log.With(zap.Int64("collectionID", collectionID), - zap.Int64s("PartitionIDs", partitionIDs)) - segments, channels, err := mgr.PullNextTargetV2(mgr.broker, collectionID, partitionIDs...) - if err != nil { - log.Warn("failed to get next targets for collection", zap.Error(err)) - return err - } - - if len(segments) == 0 && len(channels) == 0 { - log.Debug("skip empty next targets for collection") - return nil - } - allocatedTarget.segments = segments - allocatedTarget.dmChannels = channels - - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() - mgr.next.updateCollectionTarget(collectionID, allocatedTarget) - log.Debug("finish to update next targets for collection", - zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()), - zap.Strings("channels", allocatedTarget.GetAllDmChannelNames())) - - return nil -} - -func (mgr *TargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { - if len(chosenPartitionIDs) == 0 { - return nil, nil, nil - } channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) dmChannels := make(map[string]*DmChannel) - fullPartitions, err := broker.GetPartitions(context.Background(), collectionID) - if err != nil { - return nil, nil, err - } - - // we should pull `channel targets` from all partitions because QueryNodes need to load - // the complete growing segments. And we should pull `segments targets` only from the chosen partitions. - for _, partitionID := range fullPartitions { - log.Debug("get recovery info...", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID)) - vChannelInfos, binlogs, err := broker.GetRecoveryInfo(context.TODO(), collectionID, partitionID) - if err != nil { - return nil, nil, err - } - for _, info := range vChannelInfos { - channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) - } - if !lo.Contains(chosenPartitionIDs, partitionID) { - continue - } - for _, binlog := range binlogs { - segments[binlog.GetSegmentID()] = &datapb.SegmentInfo{ - ID: binlog.GetSegmentID(), + for _, info := range vChannelInfos { + channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) + for _, segmentID := range info.GetLevelZeroSegmentIds() { + segments[segmentID] = &datapb.SegmentInfo{ + ID: segmentID, CollectionID: collectionID, - PartitionID: partitionID, - InsertChannel: binlog.GetInsertChannel(), - NumOfRows: binlog.GetNumOfRows(), - Binlogs: binlog.GetFieldBinlogs(), - Statslogs: binlog.GetStatslogs(), - Deltalogs: binlog.GetDeltalogs(), + InsertChannel: info.GetChannelName(), + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, } } } + partitionSet := typeutil.NewUniqueSet(partitionIDs...) + for _, segmentInfo := range segmentInfos { + if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID { + segments[segmentInfo.GetID()] = segmentInfo + } + } for _, infos := range channelInfos { merged := mgr.mergeDmChannelInfo(infos) dmChannels[merged.GetChannelName()] = merged } - return segments, dmChannels, nil -} - -func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { - log.Debug("start to pull next targets for collection", - zap.Int64("collectionID", collectionID), - zap.Int64s("chosenPartitionIDs", chosenPartitionIDs)) - - if len(chosenPartitionIDs) == 0 { - return nil, nil, nil - } - - channelInfos := make(map[string][]*datapb.VchannelInfo) - segments := make(map[int64]*datapb.SegmentInfo, 0) - dmChannels := make(map[string]*DmChannel) - - getRecoveryInfo := func() error { - var err error - - vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID) - if err != nil { - // if meet rpc error, for compatibility with previous versions, try pull next target v1 - if errors.Is(err, merr.ErrServiceUnimplemented) { - segments, dmChannels, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...) - return err - } - - return err - } - - for _, info := range vChannelInfos { - channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) - for _, segmentID := range info.GetLevelZeroSegmentIds() { - segments[segmentID] = &datapb.SegmentInfo{ - ID: segmentID, - CollectionID: collectionID, - InsertChannel: info.GetChannelName(), - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L0, - } - } - } - - partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) - for _, segmentInfo := range segmentInfos { - if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID { - segments[segmentInfo.GetID()] = segmentInfo - } - } - - for _, infos := range channelInfos { - merged := mgr.mergeDmChannelInfo(infos) - dmChannels[merged.GetChannelName()] = merged - } + if len(segments) == 0 && len(dmChannels) == 0 { + log.Debug("skip empty next targets for collection", zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs)) return nil } - err := retry.Do(context.TODO(), getRecoveryInfo, retry.Attempts(10)) - if err != nil { - return nil, nil, err - } + mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs)) + log.Debug("finish to update next targets for collection", + zap.Int64("collectionID", collectionID), + zap.Int64s("PartitionIDs", partitionIDs), + zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()), + zap.Strings("channels", allocatedTarget.GetAllDmChannelNames())) - return segments, dmChannels, nil + return nil } func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 17d5fad327..3e3954b24d 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -24,8 +24,6 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" @@ -36,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -224,17 +221,6 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() { }, } - nextTargetBinlogs := []*datapb.SegmentBinlogs{ - { - SegmentID: 11, - InsertChannel: "channel-1", - }, - { - SegmentID: 12, - InsertChannel: "channel-2", - }, - } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil) suite.mgr.UpdateCollectionNextTarget(collectionID) suite.assertSegments([]int64{11, 12}, suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget)) @@ -242,20 +228,11 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() { suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(collectionID, CurrentTarget)) suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)) - suite.broker.ExpectedCalls = nil - // test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return( - nil, nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "fake not found"))) - suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil) - suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil) - err := suite.mgr.UpdateCollectionNextTarget(collectionID) - suite.NoError(err) - suite.broker.ExpectedCalls = nil // test getRecoveryInfoV2 failed , then retry getRecoveryInfoV2 succeed suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, errors.New("fake error")).Times(1) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil) - err = suite.mgr.UpdateCollectionNextTarget(collectionID) + err := suite.mgr.UpdateCollectionNextTarget(collectionID) suite.NoError(err) err = suite.mgr.UpdateCollectionNextTarget(collectionID)