From 4aa8a12ce85d09f1f353536aba87d7eda71b9103 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 2 Jul 2024 15:48:10 +0800 Subject: [PATCH] fix: [2.4] Check partition in current target when observing partition load status (#34282) (#34305) Cherry-pick from master pr: #34282 See also #34234 `LoadPartitions` does not guarantee the current target has loading partitions if there are some partitions already loaded before. This PR check current target contains the partition to load when advancing loading percentage to 100. Signed-off-by: Congqi Xia --- .../querycoordv2/checkers/balance_checker.go | 3 ++- .../querycoordv2/checkers/channel_checker.go | 3 ++- .../querycoordv2/checkers/leader_checker.go | 3 ++- .../querycoordv2/checkers/segment_checker.go | 3 ++- .../querycoordv2/meta/mock_target_manager.go | 21 ++++++++++--------- internal/querycoordv2/meta/target.go | 9 ++++++-- internal/querycoordv2/meta/target_manager.go | 18 ++++++++++------ .../querycoordv2/meta/target_manager_test.go | 2 +- .../observers/collection_observer.go | 5 +++-- .../querycoordv2/observers/target_observer.go | 6 +++--- .../observers/target_observer_test.go | 3 ++- 11 files changed, 47 insertions(+), 29 deletions(-) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 05a682bfed..d300bda51f 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -74,7 +75,7 @@ func (b *BalanceChecker) Description() string { func (b *BalanceChecker) readyToCheck(collectionID int64) bool { metaExist := (b.meta.GetCollection(collectionID) != nil) - targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index f60381286b..324525a6fc 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -70,7 +71,7 @@ func (c *ChannelChecker) Description() string { func (c *ChannelChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index c3005844fc..5c0355adab 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -68,7 +69,7 @@ func (c *LeaderChecker) Description() string { func (c *LeaderChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID) + targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index e884e56ff3..d669758790 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -75,7 +76,7 @@ func (c *SegmentChecker) Description() string { func (c *SegmentChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 5728fd2903..3637cc4204 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -478,13 +478,13 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run return _c } -// IsCurrentTargetExist provides a mock function with given fields: collectionID -func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool { - ret := _m.Called(collectionID) +// IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID +func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + ret := _m.Called(collectionID, partitionID) var r0 bool - if rf, ok := ret.Get(0).(func(int64) bool); ok { - r0 = rf(collectionID) + if rf, ok := ret.Get(0).(func(int64, int64) bool); ok { + r0 = rf(collectionID, partitionID) } else { r0 = ret.Get(0).(bool) } @@ -499,13 +499,14 @@ type MockTargetManager_IsCurrentTargetExist_Call struct { // IsCurrentTargetExist is a helper method to define mock.On call // - collectionID int64 -func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { - return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)} +// - partitionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}, partitionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { + return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID, partitionID)} } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64, partitionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) + run(args[0].(int64), args[1].(int64)) }) return _c } @@ -515,7 +516,7 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTar return _c } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index b7fc06930a..d924e7cbc5 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -23,19 +23,22 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // CollectionTarget collection target is immutable, type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo dmChannels map[string]*DmChannel + partitions typeutil.Set[int64] // stores target partitions info version int64 } -func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget { +func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { return &CollectionTarget{ segments: segments, dmChannels: dmChannels, + partitions: typeutil.NewSet(partitionIDs...), version: time.Now().UnixNano(), } } @@ -43,6 +46,7 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + var partitions []int64 for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { @@ -55,6 +59,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget InsertChannel: t.GetChannelName(), } } + partitions = append(partitions, partition.GetPartitionID()) } dmChannels[t.GetChannelName()] = &DmChannel{ VchannelInfo: &datapb.VchannelInfo{ @@ -68,7 +73,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } } - return NewCollectionTarget(segments, dmChannels) + return NewCollectionTarget(segments, dmChannels, partitions) } func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget { diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 376fd37569..310ad2dcb0 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -67,7 +67,7 @@ type TargetManagerInterface interface { GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 - IsCurrentTargetExist(collectionID int64) bool + IsCurrentTargetExist(collectionID int64, partitionID int64) bool IsNextTargetExist(collectionID int64) bool SaveCurrentTarget(catalog metastore.QueryCoordCatalog) Recover(catalog metastore.QueryCoordCatalog) error @@ -136,7 +136,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil) + allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID), @@ -373,8 +373,11 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect for _, channel := range oldTarget.GetAllDmChannels() { channels[channel.GetChannelName()] = channel } + partitions := lo.Filter(oldTarget.partitions.Collect(), func(partitionID int64, _ int) bool { + return !partitionSet.Contain(partitionID) + }) - return NewCollectionTarget(segments, channels) + return NewCollectionTarget(segments, channels, partitions) } func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget { @@ -604,10 +607,13 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T return 0 } -func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { - newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget) +func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() - return len(newChannels) > 0 + targets := mgr.getCollectionTarget(CurrentTarget, collectionID) + + return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0 } func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 5aad9d8a7c..894660638d 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -346,7 +346,7 @@ func (suite *TargetManagerSuite) assertSegments(expected []int64, actual map[int func (suite *TargetManagerSuite) TestGetCollectionTargetVersion() { t1 := time.Now().UnixNano() - target := NewCollectionTarget(nil, nil) + target := NewCollectionTarget(nil, nil, nil) t2 := time.Now().UnixNano() version := target.GetTargetVersion() diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index ee54517b7a..9080cee8f0 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -221,7 +222,7 @@ func (ob *CollectionObserver) observeTimeout() { func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { metaExist := (ob.meta.GetCollection(collectionID) != nil) - targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } @@ -332,7 +333,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { - if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) { + if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { log.Warn("failed to manual check current target, skip update load status") return } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index ae343abd36..7d3087b83d 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -171,9 +171,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) { } // Check whether provided collection is has current target. -// If not, submit a async task into dispatcher. -func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool { - result := ob.targetMgr.IsCurrentTargetExist(collectionID) +// If not, submit an async task into dispatcher. +func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { + result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { ob.dispatcher.AddTask(collectionID) } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 4116b06b8a..825a2b28bb 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -32,6 +32,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -284,7 +285,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() { } func (s *TargetObserverCheckSuite) TestCheck() { - r := s.observer.Check(context.Background(), s.collectionID) + r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) }