diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 7c41369b04..86cfb06453 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -74,7 +75,7 @@ func (b *BalanceChecker) Description() string { func (b *BalanceChecker) readyToCheck(collectionID int64) bool { metaExist := (b.meta.GetCollection(collectionID) != nil) - targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index f60381286b..324525a6fc 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -70,7 +71,7 @@ func (c *ChannelChecker) Description() string { func (c *ChannelChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 5486a1d3e0..0eb0d6dd1b 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -66,7 +67,7 @@ func (c *LeaderChecker) Description() string { func (c *LeaderChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID) + targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index e884e56ff3..d669758790 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -75,7 +76,7 @@ func (c *SegmentChecker) Description() string { func (c *SegmentChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 5728fd2903..3637cc4204 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -478,13 +478,13 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run return _c } -// IsCurrentTargetExist provides a mock function with given fields: collectionID -func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool { - ret := _m.Called(collectionID) +// IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID +func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + ret := _m.Called(collectionID, partitionID) var r0 bool - if rf, ok := ret.Get(0).(func(int64) bool); ok { - r0 = rf(collectionID) + if rf, ok := ret.Get(0).(func(int64, int64) bool); ok { + r0 = rf(collectionID, partitionID) } else { r0 = ret.Get(0).(bool) } @@ -499,13 +499,14 @@ type MockTargetManager_IsCurrentTargetExist_Call struct { // IsCurrentTargetExist is a helper method to define mock.On call // - collectionID int64 -func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { - return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)} +// - partitionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}, partitionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { + return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID, partitionID)} } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64, partitionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) + run(args[0].(int64), args[1].(int64)) }) return _c } @@ -515,7 +516,7 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTar return _c } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index b7fc06930a..d924e7cbc5 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -23,19 +23,22 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // CollectionTarget collection target is immutable, type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo dmChannels map[string]*DmChannel + partitions typeutil.Set[int64] // stores target partitions info version int64 } -func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget { +func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { return &CollectionTarget{ segments: segments, dmChannels: dmChannels, + partitions: typeutil.NewSet(partitionIDs...), version: time.Now().UnixNano(), } } @@ -43,6 +46,7 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + var partitions []int64 for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { @@ -55,6 +59,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget InsertChannel: t.GetChannelName(), } } + partitions = append(partitions, partition.GetPartitionID()) } dmChannels[t.GetChannelName()] = &DmChannel{ VchannelInfo: &datapb.VchannelInfo{ @@ -68,7 +73,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } } - return NewCollectionTarget(segments, dmChannels) + return NewCollectionTarget(segments, dmChannels, partitions) } func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget { diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 376fd37569..310ad2dcb0 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -67,7 +67,7 @@ type TargetManagerInterface interface { GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 - IsCurrentTargetExist(collectionID int64) bool + IsCurrentTargetExist(collectionID int64, partitionID int64) bool IsNextTargetExist(collectionID int64) bool SaveCurrentTarget(catalog metastore.QueryCoordCatalog) Recover(catalog metastore.QueryCoordCatalog) error @@ -136,7 +136,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil) + allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID), @@ -373,8 +373,11 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect for _, channel := range oldTarget.GetAllDmChannels() { channels[channel.GetChannelName()] = channel } + partitions := lo.Filter(oldTarget.partitions.Collect(), func(partitionID int64, _ int) bool { + return !partitionSet.Contain(partitionID) + }) - return NewCollectionTarget(segments, channels) + return NewCollectionTarget(segments, channels, partitions) } func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget { @@ -604,10 +607,13 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T return 0 } -func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { - newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget) +func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() - return len(newChannels) > 0 + targets := mgr.getCollectionTarget(CurrentTarget, collectionID) + + return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0 } func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 5aad9d8a7c..894660638d 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -346,7 +346,7 @@ func (suite *TargetManagerSuite) assertSegments(expected []int64, actual map[int func (suite *TargetManagerSuite) TestGetCollectionTargetVersion() { t1 := time.Now().UnixNano() - target := NewCollectionTarget(nil, nil) + target := NewCollectionTarget(nil, nil, nil) t2 := time.Now().UnixNano() version := target.GetTargetVersion() diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index ee54517b7a..9080cee8f0 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -221,7 +222,7 @@ func (ob *CollectionObserver) observeTimeout() { func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { metaExist := (ob.meta.GetCollection(collectionID) != nil) - targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } @@ -332,7 +333,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { - if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) { + if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { log.Warn("failed to manual check current target, skip update load status") return } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 83fd5a3248..7d3087b83d 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -172,8 +172,8 @@ func (ob *TargetObserver) schedule(ctx context.Context) { // Check whether provided collection is has current target. // If not, submit an async task into dispatcher. -func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool { - result := ob.targetMgr.IsCurrentTargetExist(collectionID) +func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { + result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { ob.dispatcher.AddTask(collectionID) } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 4116b06b8a..825a2b28bb 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -32,6 +32,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -284,7 +285,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() { } func (s *TargetObserverCheckSuite) TestCheck() { - r := s.observer.Check(context.Background(), s.collectionID) + r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) }