mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: Add refine logs for task scheduler in QueryCoord (#44577)
issue: https://github.com/milvus-io/milvus/issues/43968 Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
e83c7e0c92
commit
4279f166c6
@ -19,7 +19,9 @@ package meta
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/log"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
@ -345,11 +347,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
|
||||
func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica) *DmChannel {
|
||||
m.rwmutex.RLock()
|
||||
defer m.rwmutex.RUnlock()
|
||||
|
||||
logger := log.With(zap.String("Scope", "ChannelDistManager"), zap.String("channelName", channelName),
|
||||
zap.Int64("replicaID", replica.GetID()))
|
||||
channels := m.collectionIndex[replica.GetCollectionID()]
|
||||
|
||||
var candidates *DmChannel
|
||||
for _, channel := range channels {
|
||||
for chIdx, channel := range channels {
|
||||
logger := logger.With(zap.Int("channelIdx", chIdx))
|
||||
logger.Debug("process", zap.Int64("channelID", channel.Node), zap.Int64("channelVersion", channel.Version),
|
||||
zap.String("channel name", channel.GetChannelName()),
|
||||
zap.Bool("replicaContains", replica.Contains(channel.Node)))
|
||||
if channel.GetChannelName() == channelName && replica.Contains(channel.Node) {
|
||||
if candidates == nil {
|
||||
candidates = channel
|
||||
@ -360,13 +367,23 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
|
||||
|
||||
candidateIsStreamingNode := m.checkIfStreamingNode(candidates.Node)
|
||||
channelIsStreamingNode := m.checkIfStreamingNode(channel.Node)
|
||||
logger.Debug("check whether stream node is serviceable",
|
||||
zap.Bool("candidatesServiceable", candidatesServiceable),
|
||||
zap.Bool("channelServiceable", channelServiceable),
|
||||
zap.Bool("candidateIsStreamingNode", candidateIsStreamingNode),
|
||||
zap.Bool("channelIsStreamingNode", channelIsStreamingNode))
|
||||
|
||||
if channelIsStreamingNode && !candidateIsStreamingNode {
|
||||
// When upgrading from 2.5 to 2.6, the delegator leader may not locate at streaming node.
|
||||
// We always use the streaming node as the delegator leader to avoid the delete data lost when loading segment.
|
||||
logger.Debug("set delegator on stream node to candidate shard leader", zap.Int64("node", channel.Node),
|
||||
zap.Int64("channel version", channel.Version))
|
||||
candidates = channel
|
||||
} else if !channelIsStreamingNode && candidateIsStreamingNode {
|
||||
// When downgrading from 2.6 to 2.5, the delegator leader may locate at non-streaming node.
|
||||
// We always use the non-streaming node as the delegator leader to avoid the delete data lost when loading segment.
|
||||
logger.Debug("found delegator which is not on stream node", zap.Int64("node", channel.Node),
|
||||
zap.Int64("channel version", channel.Version))
|
||||
continue
|
||||
} else {
|
||||
updateNeeded := false
|
||||
@ -374,19 +391,28 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
|
||||
case !candidatesServiceable && channelServiceable:
|
||||
// Current candidate is not serviceable but new channel is
|
||||
updateNeeded = true
|
||||
logger.Debug("set serviceable delegator to candidate shard leader", zap.Int64("node", channel.Node),
|
||||
zap.Int64("channel version", channel.Version))
|
||||
case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
|
||||
// Same service status but higher version
|
||||
updateNeeded = true
|
||||
logger.Debug("set serviceable delegator with larger version to candidate shard leader", zap.Int64("node", channel.Node),
|
||||
zap.Int64("channel version", channel.Version), zap.Int64("candidate version", candidates.Version))
|
||||
}
|
||||
|
||||
if updateNeeded {
|
||||
candidates = channel
|
||||
} else {
|
||||
logger.Debug("not set any channel to candidates in this round")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if candidates != nil {
|
||||
logger.Debug("final", zap.Any("candidates", candidates),
|
||||
zap.Int64("candidates version", candidates.Version),
|
||||
zap.Int64("candidates node", candidates.Node))
|
||||
}
|
||||
return candidates
|
||||
}
|
||||
|
||||
|
||||
@ -884,6 +884,11 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
|
||||
// wait for new delegator becomes leader, then try to remove old leader
|
||||
task := task.(*ChannelTask)
|
||||
delegator := scheduler.distMgr.ChannelDistManager.GetShardLeader(task.Shard(), task.replica)
|
||||
log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Bool("delegator is Nil", delegator == nil))
|
||||
if delegator != nil {
|
||||
log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Int64("delegator node", delegator.Node),
|
||||
zap.Int64("action node", action.Node()))
|
||||
}
|
||||
newDelegatorReady = delegator != nil && delegator.Node == action.Node()
|
||||
default:
|
||||
newDelegatorReady = true
|
||||
@ -895,6 +900,7 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
|
||||
zap.Int64("collectionID", task.CollectionID()),
|
||||
zap.String("channelName", task.Shard()),
|
||||
zap.Int64("taskID", task.ID()))
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user