From 4279f166c6f0bb0f103b8c362687d320a839caa0 Mon Sep 17 00:00:00 2001
From: "zhenshan.cao"
Date: Fri, 10 Oct 2025 10:07:55 +0800
Subject: [PATCH] enhance: Add refined logs for task scheduler in QueryCoord
 (#44577)

issue: https://github.com/milvus-io/milvus/issues/43968

Signed-off-by: zhenshan.cao
---
 .../querycoordv2/meta/channel_dist_manager.go | 34 ++++++++++++++++---
 internal/querycoordv2/task/scheduler.go       |  6 ++++
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go
index 0701e3c267..fd81df3278 100644
--- a/internal/querycoordv2/meta/channel_dist_manager.go
+++ b/internal/querycoordv2/meta/channel_dist_manager.go
@@ -19,7 +19,9 @@ package meta
 import (
 	"sync"
 
+	"github.com/pingcap/log"
 	"github.com/samber/lo"
+	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -345,11 +347,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
 func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica) *DmChannel {
 	m.rwmutex.RLock()
 	defer m.rwmutex.RUnlock()
-
+	logger := log.With(zap.String("Scope", "ChannelDistManager"), zap.String("channelName", channelName),
+		zap.Int64("replicaID", replica.GetID()))
 	channels := m.collectionIndex[replica.GetCollectionID()]
 
 	var candidates *DmChannel
-	for _, channel := range channels {
+	for chIdx, channel := range channels {
+		logger := logger.With(zap.Int("channelIdx", chIdx))
+		logger.Debug("process channel", zap.Int64("nodeID", channel.Node), zap.Int64("channelVersion", channel.Version),
+			zap.String("channelName", channel.GetChannelName()),
+			zap.Bool("replicaContains", replica.Contains(channel.Node)))
 		if channel.GetChannelName() == channelName && replica.Contains(channel.Node) {
 			if candidates == nil {
 				candidates = channel
@@ -360,13 +367,23 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
 			candidateIsStreamingNode := m.checkIfStreamingNode(candidates.Node)
 			channelIsStreamingNode := m.checkIfStreamingNode(channel.Node)
 
+			logger.Debug("check delegator serviceability and streaming node status",
+				zap.Bool("candidatesServiceable", candidatesServiceable),
+				zap.Bool("channelServiceable", channelServiceable),
+				zap.Bool("candidateIsStreamingNode", candidateIsStreamingNode),
+				zap.Bool("channelIsStreamingNode", channelIsStreamingNode))
+
 			if channelIsStreamingNode && !candidateIsStreamingNode {
 				// When upgrading from 2.5 to 2.6, the delegator leader may not locate at streaming node.
 				// We always use the streaming node as the delegator leader to avoid the delete data lost when loading segment.
+				logger.Debug("set delegator on streaming node as candidate shard leader", zap.Int64("node", channel.Node),
+					zap.Int64("channelVersion", channel.Version))
 				candidates = channel
 			} else if !channelIsStreamingNode && candidateIsStreamingNode {
 				// When downgrading from 2.6 to 2.5, the delegator leader may locate at non-streaming node.
 				// We always use the non-streaming node as the delegator leader to avoid the delete data lost when loading segment.
+				logger.Debug("skip delegator which is not on streaming node", zap.Int64("node", channel.Node),
+					zap.Int64("channelVersion", channel.Version))
 				continue
 			} else {
 				updateNeeded := false
@@ -374,19 +391,28 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
 				case !candidatesServiceable && channelServiceable:
 					// Current candidate is not serviceable but new channel is
 					updateNeeded = true
+					logger.Debug("set serviceable delegator as candidate shard leader", zap.Int64("node", channel.Node),
+						zap.Int64("channelVersion", channel.Version))
 				case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
 					// Same service status but higher version
 					updateNeeded = true
+					logger.Debug("set serviceable delegator with larger version as candidate shard leader", zap.Int64("node", channel.Node),
+						zap.Int64("channelVersion", channel.Version), zap.Int64("candidateVersion", candidates.Version))
 				}
-
 				if updateNeeded {
 					candidates = channel
+				} else {
+					logger.Debug("candidate shard leader not updated in this round")
 				}
 			}
 		}
 	}
-
+	if candidates != nil {
+		logger.Debug("final candidate shard leader", zap.Any("candidates", candidates),
+			zap.Int64("candidatesVersion", candidates.Version),
+			zap.Int64("candidatesNode", candidates.Node))
+	}
 	return candidates
 }
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index ef3f774f6a..32e032f171 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -884,6 +884,11 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 			// wait for new delegator becomes leader, then try to remove old leader
 			task := task.(*ChannelTask)
 			delegator := scheduler.distMgr.ChannelDistManager.GetShardLeader(task.Shard(), task.replica)
+			log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Bool("delegatorIsNil", delegator == nil))
+			if delegator != nil {
+				log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Int64("delegatorNode", delegator.Node),
+					zap.Int64("actionNode", action.Node()))
+			}
 			newDelegatorReady = delegator != nil && delegator.Node == action.Node()
 		default:
 			newDelegatorReady = true
@@ -895,6 +900,7 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 				zap.Int64("collectionID", task.CollectionID()),
 				zap.String("channelName", task.Shard()),
 				zap.Int64("taskID", task.ID()))
+			break
 		}
 	}