From 02ace25c68248b98d1e2cb33d75aecb7adb1d7c1 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Thu, 25 Apr 2024 20:49:25 -0700 Subject: [PATCH] enhance: reduce the cpu usage when collection number is high (#32245) related to #32165 1. for all the managers, support a collection-level index 2. remove the collection-level filter to avoid extra CPU usage when the number of collections increases Signed-off-by: xiaofanluan --- .../balance/rowcount_based_balancer.go | 4 +- internal/querycoordv2/balance/utils.go | 4 +- .../querycoordv2/checkers/channel_checker.go | 4 +- internal/querycoordv2/job/utils.go | 8 ++- .../querycoordv2/meta/channel_dist_manager.go | 54 ++++++++++++++++--- .../meta/channel_dist_manager_test.go | 10 ++-- .../observers/replica_observer.go | 2 +- .../observers/resource_observer.go | 8 +-- internal/querycoordv2/ops_services.go | 2 +- pkg/util/paramtable/component_param.go | 4 +- 10 files changed, 73 insertions(+), 27 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index e2c56bd797..cab6bd5488 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -316,7 +316,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan { channelPlans := make([]ChannelAssignPlan, 0) for _, nodeID := range offlineNodes { - dmChannels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID)) + dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID)) plans := b.AssignChannel(dmChannels, onlineNodes, false) for i := range plans { plans[i].From = nodeID @@ -341,7 +341,7 
@@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode nodeWithLessChannel := make([]int64, 0) channelsToMove := make([]*meta.DmChannel, 0) for _, node := range onlineNodes { - channels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node)) + channels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node)) if len(channels) <= average { nodeWithLessChannel = append(nodeWithLessChannel, node) diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 858de2ec07..9e91b6bb87 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -176,7 +176,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica, // 3. print stopping nodes channel distribution distInfo += "[stoppingNodesChannelDist:" for stoppingNodeID := range stoppingNodesSegments { - stoppingNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(stoppingNodeID)) + stoppingNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(stoppingNodeID)) distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", stoppingNodeID, len(stoppingNodeChannels)) distInfo += "channels:[" for _, stoppingChan := range stoppingNodeChannels { @@ -189,7 +189,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica, // 4. 
print normal nodes channel distribution distInfo += "[normalNodesChannelDist:" for normalNodeID := range nodeSegments { - normalNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(normalNodeID)) + normalNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(normalNodeID)) distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", normalNodeID, len(normalNodeChannels)) distInfo += "channels:[" for _, normalNodeChan := range normalNodeChannels { diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 1152a47cfc..a55cd13cbe 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -162,7 +162,7 @@ func (c *ChannelChecker) getDmChannelDiff(collectionID int64, func (c *ChannelChecker) getChannelDist(replica *meta.Replica) []*meta.DmChannel { dist := make([]*meta.DmChannel, 0) for _, nodeID := range replica.GetNodes() { - dist = append(dist, c.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID))...) + dist = append(dist, c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))...) 
} return dist } @@ -183,7 +183,7 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int for _, ch := range dist { leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName()) if leaderView == nil { - log.Info("shard leadview is not ready, skip", + log.Info("shard leader view is not ready, skip", zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replicaID), zap.Int64("leaderID", ch.Node), diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index 85faa81ee7..7f56794144 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -49,11 +49,17 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c return partitionSet.Contain(segment.GetPartitionID()) }) } else { - channels = dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(collection)) + channels = dist.ChannelDistManager.GetByCollectionAndFilter(collection) } if len(channels)+len(segments) == 0 { break + } else { + log.Info("wait for release done", zap.Int64("collection", collection), + zap.Int64s("partitions", partitions), + zap.Int("channel", len(channels)), + zap.Int("segments", len(segments)), + ) } // trigger check more frequently diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 131c211ee5..a95fbca03f 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -45,12 +45,6 @@ func WithReplica2Channel(replica *Replica) ChannelDistFilter { } } -func WithChannelName2Channel(channelName string) ChannelDistFilter { - return func(ch *DmChannel) bool { - return ch.GetChannelName() == channelName - } -} - type DmChannel struct { *datapb.VchannelInfo Node int64 @@ -76,11 +70,15 @@ type ChannelDistManager struct { // NodeID -> Channels channels map[UniqueID][]*DmChannel + + // CollectionID -> 
Channels + collectionIndex map[int64][]*DmChannel } func NewChannelDistManager() *ChannelDistManager { return &ChannelDistManager{ - channels: make(map[UniqueID][]*DmChannel), + channels: make(map[UniqueID][]*DmChannel), + collectionIndex: make(map[int64][]*DmChannel), } } @@ -146,6 +144,31 @@ func (m *ChannelDistManager) GetByFilter(filters ...ChannelDistFilter) []*DmChan return ret } +func (m *ChannelDistManager) GetByCollectionAndFilter(collectionID int64, filters ...ChannelDistFilter) []*DmChannel { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + mergedFilters := func(ch *DmChannel) bool { + for _, fn := range filters { + if fn != nil && !fn(ch) { + return false + } + } + + return true + } + + ret := make([]*DmChannel, 0) + + // Scan only the channels indexed under collectionID instead of every node's channels + for _, channel := range m.collectionIndex[collectionID] { + if mergedFilters(channel) { + ret = append(ret, channel) + } + } + return ret +} + func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) { m.rwmutex.Lock() defer m.rwmutex.Unlock() @@ -155,4 +178,21 @@ func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) { } m.channels[nodeID] = channels + + m.updateCollectionIndex() +} + +// updateCollectionIndex rebuilds the CollectionID -> Channels secondary index from m.channels +func (m *ChannelDistManager) updateCollectionIndex() { + m.collectionIndex = make(map[int64][]*DmChannel) + for _, nodeChannels := range m.channels { + for _, channel := range nodeChannels { + collectionID := channel.GetCollectionID() + if channels, ok := m.collectionIndex[collectionID]; !ok { + m.collectionIndex[collectionID] = []*DmChannel{channel} + } else { + m.collectionIndex[collectionID] = append(channels, channel) + } + } + } } diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go index be67db31ac..9d0df0546e 100644 --- a/internal/querycoordv2/meta/channel_dist_manager_test.go +++ 
b/internal/querycoordv2/meta/channel_dist_manager_test.go @@ -76,26 +76,26 @@ func (suite *ChannelDistManagerSuite) TestGetBy() { } // Test GetByCollection - channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection)) + channels = dist.GetByCollectionAndFilter(suite.collection) suite.Len(channels, 4) suite.AssertCollection(channels, suite.collection) - channels = dist.GetByFilter(WithCollectionID2Channel(-1)) + channels = dist.GetByCollectionAndFilter(-1) suite.Len(channels, 0) // Test GetByNodeAndCollection // 1. Valid node and valid collection for _, node := range suite.nodes { - channels := dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(node)) + channels := dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(node)) suite.AssertNode(channels, node) suite.AssertCollection(channels, suite.collection) } // 2. Valid node and invalid collection - channels = dist.GetByFilter(WithCollectionID2Channel(-1), WithNodeID2Channel(suite.nodes[1])) + channels = dist.GetByCollectionAndFilter(-1, WithNodeID2Channel(suite.nodes[1])) suite.Len(channels, 0) // 3. 
Invalid node and valid collection - channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(-1)) + channels = dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(-1)) suite.Len(channels, 0) } diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index ebb9bae98d..e60a988a5e 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -110,7 +110,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() { ) removeNodes := make([]int64, 0, len(roNodes)) for _, node := range roNodes { - channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node)) + channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node)) segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node)) if len(channels) == 0 && len(segments) == 0 { removeNodes = append(removeNodes, node) diff --git a/internal/querycoordv2/observers/resource_observer.go b/internal/querycoordv2/observers/resource_observer.go index 4d1c62cb3b..43e96fd104 100644 --- a/internal/querycoordv2/observers/resource_observer.go +++ b/internal/querycoordv2/observers/resource_observer.go @@ -91,7 +91,7 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() { manager := ob.meta.ResourceManager rgNames := manager.ListResourceGroups() enableRGAutoRecover := params.Params.QueryCoordCfg.EnableRGAutoRecover.GetAsBool() - log.Info("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames))) + log.Debug("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames))) // Check if there is any incoming node. 
if manager.CheckIncomingNodeNum() > 0 { @@ -100,10 +100,10 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() { } // Remove all down nodes in resource group manager. - log.Info("remove all down nodes in resource group manager...") + log.Debug("remove all down nodes in resource group manager...") ob.meta.RemoveAllDownNode() - log.Info("recover resource groups...") + log.Debug("recover resource groups...") // Recover all resource group into expected configuration. for _, rgName := range rgNames { if err := manager.MeetRequirement(rgName); err != nil { @@ -126,5 +126,5 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() { if enableRGAutoRecover { utils.RecoverAllCollection(ob.meta) } - log.Info("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames))) + log.Debug("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames))) } diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go index cb6853692c..2d643b18c2 100644 --- a/internal/querycoordv2/ops_services.go +++ b/internal/querycoordv2/ops_services.go @@ -360,7 +360,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann dstNodeSet.Remove(srcNode) // check sealed segment list - channels := s.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(srcNode)) + channels := s.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(srcNode)) toBalance := typeutil.NewSet[*meta.DmChannel]() if req.GetTransferAll() { toBalance.Insert(channels...) 
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index e3c5aac8e4..0a35f3508f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1688,7 +1688,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.SegmentCheckInterval = ParamItem{ Key: "queryCoord.checkSegmentInterval", Version: "2.3.0", - DefaultValue: "1000", + DefaultValue: "3000", PanicIfEmpty: true, Export: true, } @@ -1697,7 +1697,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.ChannelCheckInterval = ParamItem{ Key: "queryCoord.checkChannelInterval", Version: "2.3.0", - DefaultValue: "1000", + DefaultValue: "3000", PanicIfEmpty: true, Export: true, }