From b284b81a47f82bf66b495797e315cd95c7c20afc Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 1 Jul 2024 17:40:07 +0800 Subject: [PATCH] fix: Check partition in current target when observing partition load status (#34282) See also #34234 `LoadPartitions` does not guarantee the current target has loading partitions if there are some partitions already loaded before. This PR check current target contains the partition to load when advancing loading percentage to 100. Signed-off-by: Congqi Xia --- .../querycoordv2/checkers/balance_checker.go | 3 ++- .../querycoordv2/checkers/channel_checker.go | 3 ++- .../querycoordv2/checkers/leader_checker.go | 3 ++- .../querycoordv2/checkers/segment_checker.go | 3 ++- .../querycoordv2/meta/mock_target_manager.go | 21 ++++++++++--------- internal/querycoordv2/meta/target.go | 9 ++++++-- internal/querycoordv2/meta/target_manager.go | 18 ++++++++++------ .../querycoordv2/meta/target_manager_test.go | 2 +- .../observers/collection_observer.go | 5 +++-- .../querycoordv2/observers/target_observer.go | 4 ++-- .../observers/target_observer_test.go | 3 ++- 11 files changed, 46 insertions(+), 28 deletions(-) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 7c41369b04..86cfb06453 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -74,7 +75,7 @@ func (b *BalanceChecker) Description() string { func (b *BalanceChecker) readyToCheck(collectionID int64) bool { metaExist := (b.meta.GetCollection(collectionID) != nil) - targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index f60381286b..324525a6fc 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -70,7 +71,7 @@ func (c *ChannelChecker) Description() string { func (c *ChannelChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 5486a1d3e0..0eb0d6dd1b 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -66,7 +67,7 @@ func (c *LeaderChecker) Description() string { func (c *LeaderChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID) + targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index e884e56ff3..d669758790 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -75,7 +76,7 @@ func (c *SegmentChecker) Description() string { func (c *SegmentChecker) readyToCheck(collectionID int64) bool { metaExist := (c.meta.GetCollection(collectionID) != nil) - targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 5728fd2903..3637cc4204 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -478,13 +478,13 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run return _c } -// IsCurrentTargetExist provides a mock function with given fields: collectionID -func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool { - ret := _m.Called(collectionID) +// IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID +func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + ret := _m.Called(collectionID, partitionID) var r0 bool - if rf, ok := ret.Get(0).(func(int64) bool); ok { - r0 = rf(collectionID) + if rf, ok := ret.Get(0).(func(int64, int64) bool); ok { + r0 = rf(collectionID, partitionID) } else { r0 = ret.Get(0).(bool) } @@ -499,13 +499,14 @@ type MockTargetManager_IsCurrentTargetExist_Call struct { // IsCurrentTargetExist is a helper method to define mock.On call // - collectionID int64 -func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { - return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)} +// - partitionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}, partitionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { + return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID, partitionID)} } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64, partitionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) + run(args[0].(int64), args[1].(int64)) }) return _c } @@ -515,7 +516,7 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTar return _c } -func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { +func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index b7fc06930a..d924e7cbc5 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -23,19 +23,22 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // CollectionTarget collection target is immutable, type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo dmChannels map[string]*DmChannel + partitions typeutil.Set[int64] // stores target partitions info version int64 } -func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget { +func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { return &CollectionTarget{ segments: segments, dmChannels: dmChannels, + partitions: typeutil.NewSet(partitionIDs...), version: time.Now().UnixNano(), } } @@ -43,6 +46,7 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + var partitions []int64 for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { @@ -55,6 +59,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget InsertChannel: t.GetChannelName(), } } + partitions = append(partitions, partition.GetPartitionID()) } dmChannels[t.GetChannelName()] = &DmChannel{ VchannelInfo: &datapb.VchannelInfo{ @@ -68,7 +73,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } } - return NewCollectionTarget(segments, dmChannels) + return NewCollectionTarget(segments, dmChannels, partitions) } func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget { diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 376fd37569..310ad2dcb0 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -67,7 +67,7 @@ type TargetManagerInterface interface { GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 - IsCurrentTargetExist(collectionID int64) bool + IsCurrentTargetExist(collectionID int64, partitionID int64) bool IsNextTargetExist(collectionID int64) bool SaveCurrentTarget(catalog metastore.QueryCoordCatalog) Recover(catalog metastore.QueryCoordCatalog) error @@ -136,7 +136,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil) + allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID), @@ -373,8 +373,11 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect for _, channel := range oldTarget.GetAllDmChannels() { channels[channel.GetChannelName()] = channel } + partitions := lo.Filter(oldTarget.partitions.Collect(), func(partitionID int64, _ int) bool { + return !partitionSet.Contain(partitionID) + }) - return NewCollectionTarget(segments, channels) + return NewCollectionTarget(segments, channels, partitions) } func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget { @@ -604,10 +607,13 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T return 0 } -func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { - newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget) +func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() - return len(newChannels) > 0 + targets := mgr.getCollectionTarget(CurrentTarget, collectionID) + + return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0 } func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 5aad9d8a7c..894660638d 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -346,7 +346,7 @@ func (suite *TargetManagerSuite) assertSegments(expected []int64, actual map[int func (suite *TargetManagerSuite) TestGetCollectionTargetVersion() { t1 := time.Now().UnixNano() - target := NewCollectionTarget(nil, nil) + target := NewCollectionTarget(nil, nil, nil) t2 := time.Now().UnixNano() version := target.GetTargetVersion() diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index ee54517b7a..9080cee8f0 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -221,7 +222,7 @@ func (ob *CollectionObserver) observeTimeout() { func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { metaExist := (ob.meta.GetCollection(collectionID) != nil) - targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID) + targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) return metaExist && targetExist } @@ -332,7 +333,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { - if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) { + if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { log.Warn("failed to manual check current target, skip update load status") return } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 83fd5a3248..7d3087b83d 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -172,8 +172,8 @@ func (ob *TargetObserver) schedule(ctx context.Context) { // Check whether provided collection is has current target. // If not, submit an async task into dispatcher. -func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool { - result := ob.targetMgr.IsCurrentTargetExist(collectionID) +func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { + result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { ob.dispatcher.AddTask(collectionID) } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 4116b06b8a..825a2b28bb 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -32,6 +32,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -284,7 +285,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() { } func (s *TargetObserverCheckSuite) TestCheck() { - r := s.observer.Check(context.Background(), s.collectionID) + r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) }