From 64dad60dc29a6157bb0781fd736667433ce25c9b Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 17 Feb 2025 14:10:15 +0800 Subject: [PATCH] fix: delegator doesn't follow with wal if streaming enabled (#39890) issue: #38399 Signed-off-by: chyezh --- internal/querycoordv2/balance/score_based_balancer.go | 10 +++++++++- internal/querycoordv2/checkers/leader_checker.go | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 333f1411cf..c36156a70d 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -180,11 +180,14 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64 for _, ch := range channels { func(ch *meta.DmChannel) { var targetNode *nodeItem + forceAssignChannel := forceAssign if streamingutil.IsStreamingServiceEnabled() { // When streaming service is enabled, we need to assign channel to the node where WAL is located. nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(ch.GetChannelName()) if item, ok := nodeItemsMap[nodeID]; ok { targetNode = item + // assgin channel to the node where WAL is located always has enough benefits. + forceAssignChannel = true } } // for each channel, pick the node with the least score @@ -196,10 +199,15 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64 scoreChanges := b.calculateChannelScore(ch, collectionID) sourceNode := nodeItemsMap[ch.Node] + if sourceNode != nil && sourceNode.nodeID == targetNode.nodeID { + // if the channel is already on the target node, skip assignment operation. + return + } + // if segment's node exist, which means this segment comes from balancer. we should consider the benefit // if the segment reassignment doesn't got enough benefit, we should skip this reassignment // notice: we should skip benefit check for forceAssign - if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) { + if !forceAssignChannel && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) { br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName())) return } diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 110d58068f..fe3f453760 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" @@ -92,7 +93,11 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task { replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID) for _, replica := range replicas { - for _, node := range replica.GetRWNodes() { + nodes := replica.GetRWNodes() + if streamingutil.IsStreamingServiceEnabled() { + nodes = replica.GetRWSQNodes() + } + for _, node := range nodes { leaderViews := c.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(replica.GetCollectionID()), meta.WithNodeID2LeaderView(node)) for _, leaderView := range leaderViews { dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(leaderView.Channel), meta.WithReplica(replica))