enhance: Add refine logs for task scheduler in QueryCoord (#44577) (#44725)

issue: https://github.com/milvus-io/milvus/issues/43968
pr: https://github.com/milvus-io/milvus/pull/44577

---------

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Co-authored-by: wei liu <wei.liu@zilliz.com>
zhenshan.cao 2025-10-11 15:35:57 +08:00, committed by GitHub
parent 0ca9a76ab8
commit fc6fe6e3bd
8 changed files with 43 additions and 9 deletions


@@ -111,6 +111,8 @@ linters-settings:
            desc: "not allowed, gogo protobuf is deprecated"
          - pkg: "github.com/golang/protobuf/proto"
            desc: "not allowed, protobuf v1 is deprecated, use google.golang.org/protobuf/proto instead"
          - pkg: "github.com/pingcap/log"
            desc: "not allowed, use github.com/milvus-io/milvus/pkg/v2/log"
  forbidigo:
    forbid:
      - '^time\.Tick$'
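
The new depguard entry makes the linter reject any direct import of github.com/pingcap/log. A minimal sketch of what a migrated call site looks like, assuming only that pkg/v2/log exposes the zap-style With/Debug helpers that the hunks below actually use; since the wrapper keeps zap's field API, the migration in the files below is a pure import swap:

```go
package example

import (
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/pkg/v2/log" // replacement enforced by depguard
)

// demo shows the swap the new rule enforces: call sites keep their
// zap-style fields, only the import path changes. log.With and
// logger.Debug are assumed from their use in the diffs below.
func demo(channelName string) {
    logger := log.With(zap.String("channelName", channelName))
    logger.Debug("logger migrated off github.com/pingcap/log")
}
```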

go.mod

@@ -24,7 +24,7 @@ require (
    github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3
    github.com/minio/minio-go/v7 v7.0.73
    github.com/panjf2000/ants/v2 v2.11.3 // indirect
    github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
    github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
    github.com/prometheus/client_golang v1.20.5
    github.com/prometheus/client_model v0.6.1
    github.com/prometheus/common v0.55.0


@@ -9,7 +9,6 @@ import (
    "sync"
    "github.com/cockroachdb/errors"
    "github.com/pingcap/log"
    "github.com/samber/lo"
    "go.uber.org/zap"
@@ -18,6 +17,7 @@ import (
    "github.com/milvus-io/milvus/internal/distributed/streaming"
    management "github.com/milvus-io/milvus/internal/http"
    "github.com/milvus-io/milvus/internal/json"
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/proto/querypb"
    "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"


@@ -19,10 +19,10 @@ package metacache
import (
    "sync"
    "github.com/pingcap/log"
    "go.uber.org/zap"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/pkg/v2/log"
)
type SegmentBM25Stats struct {


@@ -20,10 +20,12 @@ import (
    "sync"
    "github.com/samber/lo"
    "go.uber.org/zap"
    "google.golang.org/protobuf/proto"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/util/metrics"
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/proto/querypb"
    "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
@@ -345,11 +347,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica) *DmChannel {
    m.rwmutex.RLock()
    defer m.rwmutex.RUnlock()
    logger := log.With(zap.String("Scope", "ChannelDistManager"), zap.String("channelName", channelName),
        zap.Int64("replicaID", replica.GetID()))
    channels := m.collectionIndex[replica.GetCollectionID()]
    var candidates *DmChannel
    for _, channel := range channels {
    for chIdx, channel := range channels {
        logger := logger.With(zap.Int("channelIdx", chIdx))
        logger.Debug("process channel", zap.Int64("channelNode", channel.Node), zap.Int64("channelVersion", channel.Version),
            zap.String("channel name", channel.GetChannelName()),
            zap.Bool("replicaContains", replica.Contains(channel.Node)))
        if channel.GetChannelName() == channelName && replica.Contains(channel.Node) {
            if candidates == nil {
                candidates = channel
@@ -360,13 +367,23 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
                candidateIsStreamingNode := m.checkIfStreamingNode(candidates.Node)
                channelIsStreamingNode := m.checkIfStreamingNode(channel.Node)
                logger.Debug("check delegator serviceability and streaming-node status",
                    zap.Bool("candidatesServiceable", candidatesServiceable),
                    zap.Bool("channelServiceable", channelServiceable),
                    zap.Bool("candidateIsStreamingNode", candidateIsStreamingNode),
                    zap.Bool("channelIsStreamingNode", channelIsStreamingNode))
                if channelIsStreamingNode && !candidateIsStreamingNode {
                    // When upgrading from 2.5 to 2.6, the delegator leader may not be located on a streaming node.
                    // Always use the streaming node as the delegator leader to avoid losing delete data when loading segments.
                    logger.Debug("set delegator on streaming node as candidate shard leader", zap.Int64("node", channel.Node),
                        zap.Int64("channelVersion", channel.Version))
                    candidates = channel
                } else if !channelIsStreamingNode && candidateIsStreamingNode {
                    // When downgrading from 2.6 to 2.5, the delegator leader may be located on a non-streaming node.
                    // Keep the streaming-node candidate as the delegator leader to avoid losing delete data when loading segments.
                    logger.Debug("skip delegator that is not on a streaming node", zap.Int64("node", channel.Node),
                        zap.Int64("channelVersion", channel.Version))
                    continue
                } else {
                    updateNeeded := false
@@ -374,19 +391,28 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica
                    case !candidatesServiceable && channelServiceable:
                        // Current candidate is not serviceable but the new channel is
                        updateNeeded = true
                        logger.Debug("set serviceable delegator as candidate shard leader", zap.Int64("node", channel.Node),
                            zap.Int64("channelVersion", channel.Version))
                    case candidatesServiceable == channelServiceable && channel.Version > candidates.Version:
                        // Same serviceability but higher version
                        updateNeeded = true
                        logger.Debug("set serviceable delegator with larger version as candidate shard leader", zap.Int64("node", channel.Node),
                            zap.Int64("channelVersion", channel.Version), zap.Int64("candidateVersion", candidates.Version))
                    }
                    if updateNeeded {
                        candidates = channel
                    } else {
                        logger.Debug("candidate not updated in this round")
                    }
                }
            }
        }
    }
    if candidates != nil {
        logger.Debug("final candidate", zap.Any("candidates", candidates),
            zap.Int64("candidateVersion", candidates.Version),
            zap.Int64("candidateNode", candidates.Node))
    }
    return candidates
}
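
The selection rule these new logs trace boils down to a three-tier comparison: streaming-node placement first, serviceability second, version last. A standalone sketch with simplified stand-in types (channelInfo and pickLeader are hypothetical names, not the real DmChannel/Replica API):

```go
package example

// channelInfo is a simplified stand-in for a delegator's distribution entry.
type channelInfo struct {
    Node        int64
    Version     int64
    Streaming   bool // delegator sits on a streaming node
    Serviceable bool
}

// pickLeader mirrors the priority order the new debug logs walk through:
// 1. a streaming-node delegator always beats a non-streaming one
//    (protects delete data across 2.5 <-> 2.6 upgrades and downgrades);
// 2. otherwise a serviceable delegator beats an unserviceable one;
// 3. otherwise, with equal serviceability, the higher Version wins.
func pickLeader(candidate, challenger *channelInfo) *channelInfo {
    if candidate == nil {
        return challenger
    }
    switch {
    case challenger.Streaming && !candidate.Streaming:
        return challenger
    case !challenger.Streaming && candidate.Streaming:
        return candidate
    case !candidate.Serviceable && challenger.Serviceable:
        return challenger
    case candidate.Serviceable == challenger.Serviceable && challenger.Version > candidate.Version:
        return challenger
    default:
        return candidate
    }
}
```

Tier 1 encodes the upgrade/downgrade rule from the comments above; tiers 2 and 3 correspond to the switch in the final else branch.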


@@ -884,6 +884,11 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
            // wait for the new delegator to become leader, then try to remove the old leader
            task := task.(*ChannelTask)
            delegator := scheduler.distMgr.ChannelDistManager.GetShardLeader(task.Shard(), task.replica)
            log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Bool("delegatorIsNil", delegator == nil))
            if delegator != nil {
                log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Int64("delegatorNode", delegator.Node),
                    zap.Int64("actionNode", action.Node()))
            }
            newDelegatorReady = delegator != nil && delegator.Node == action.Node()
        default:
            newDelegatorReady = true
@@ -895,6 +900,7 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
                zap.Int64("collectionID", task.CollectionID()),
                zap.String("channelName", task.Shard()),
                zap.Int64("taskID", task.ID()))
            break
        }
    }
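
The condition instrumented here gates removal of the old shard leader. A minimal sketch of the readiness check, with a hypothetical delegatorView standing in for the *DmChannel returned by GetShardLeader:

```go
package example

// delegatorView is a stand-in for *DmChannel; only the Node field
// matters for the readiness check.
type delegatorView struct {
    Node int64
}

// newDelegatorReady reproduces the gate the new logs make visible: the
// old leader may only be released once the freshly elected delegator is
// non-nil AND is located on the node the action targets.
func newDelegatorReady(delegator *delegatorView, actionNode int64) bool {
    return delegator != nil && delegator.Node == actionNode
}
```

When the check fails, the scheduler logs the collection, channel, and task IDs shown above, and the newly added break appears to stop scanning further actions for that task in the current round.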


@@ -26,7 +26,6 @@ import (
    "time"
    "github.com/cockroachdb/errors"
    "github.com/pingcap/log"
    "github.com/samber/lo"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/suite"
@@ -45,6 +44,7 @@ import (
    "github.com/milvus-io/milvus/internal/util/function"
    "github.com/milvus-io/milvus/internal/util/initcore"
    "github.com/milvus-io/milvus/pkg/v2/common"
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"


@@ -19,11 +19,11 @@ import (
    "github.com/bits-and-blooms/bloom/v3"
    "github.com/cockroachdb/errors"
    "github.com/greatroar/blobloom"
    "github.com/pingcap/log"
    "github.com/zeebo/xxh3"
    "go.uber.org/zap"
    "github.com/milvus-io/milvus/internal/json"
    "github.com/milvus-io/milvus/pkg/v2/log"
)
type BFType int