diff --git a/.golangci.yml b/.golangci.yml index 3f30e3bffa..4b2bcd7639 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -111,6 +111,8 @@ linters-settings: desc: "not allowed, gogo protobuf is deprecated" - pkg: "github.com/golang/protobuf/proto" desc: "not allowed, protobuf v1 is deprecated, use google.golang.org/protobuf/proto instead" + - pkg: "github.com/pingcap/log" + desc: "not allowed, use github.com/milvus-io/milvus/pkg/v2/log" forbidigo: forbid: - '^time\.Tick$' diff --git a/go.mod b/go.mod index 05ae68ea73..f8e9990fad 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 // indirect - github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 + github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.55.0 diff --git a/internal/coordinator/restful_mgr_routes.go b/internal/coordinator/restful_mgr_routes.go index 17e937a7cd..b2806a443d 100644 --- a/internal/coordinator/restful_mgr_routes.go +++ b/internal/coordinator/restful_mgr_routes.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/cockroachdb/errors" - "github.com/pingcap/log" "github.com/samber/lo" "go.uber.org/zap" @@ -18,6 +17,7 @@ import ( "github.com/milvus-io/milvus/internal/distributed/streaming" management "github.com/milvus-io/milvus/internal/http" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" diff --git a/internal/flushcommon/metacache/bm25_stats.go b/internal/flushcommon/metacache/bm25_stats.go index 97c36db824..877fa3ffe2 100644 --- a/internal/flushcommon/metacache/bm25_stats.go +++ b/internal/flushcommon/metacache/bm25_stats.go @@ -19,10 +19,10 @@ package metacache import ( "sync" - "github.com/pingcap/log" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/v2/log" ) type SegmentBM25Stats struct { diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 0701e3c267..84856777c8 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -20,10 +20,12 @@ import ( "sync" "github.com/samber/lo" + "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" @@ -345,11 +347,16 @@ func (m *ChannelDistManager) updateCollectionIndex() { func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica) *DmChannel { m.rwmutex.RLock() defer m.rwmutex.RUnlock() - + logger := log.With(zap.String("Scope", "ChannelDistManager"), zap.String("channelName", channelName), + zap.Int64("replicaID", replica.GetID())) channels := m.collectionIndex[replica.GetCollectionID()] var candidates *DmChannel - for _, channel := range channels { + for chIdx, channel := range channels { + logger := logger.With(zap.Int("channelIdx", chIdx)) + logger.Debug("process", zap.Int64("channelID", channel.Node), zap.Int64("channelVersion", channel.Version), + zap.String("channel name", channel.GetChannelName()), + zap.Bool("replicaContains", replica.Contains(channel.Node))) if channel.GetChannelName() == channelName && replica.Contains(channel.Node) { if candidates == nil { candidates = channel @@ -360,13 +367,23 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica candidateIsStreamingNode := m.checkIfStreamingNode(candidates.Node) channelIsStreamingNode := m.checkIfStreamingNode(channel.Node) + logger.Debug("check whether stream node is serviceable", + zap.Bool("candidatesServiceable", candidatesServiceable), + zap.Bool("channelServiceable", channelServiceable), + zap.Bool("candidateIsStreamingNode", candidateIsStreamingNode), + zap.Bool("channelIsStreamingNode", channelIsStreamingNode)) + if channelIsStreamingNode && !candidateIsStreamingNode { // When upgrading from 2.5 to 2.6, the delegator leader may not locate at streaming node. // We always use the streaming node as the delegator leader to avoid the delete data lost when loading segment. + logger.Debug("set delegator on stream node to candidate shard leader", zap.Int64("node", channel.Node), + zap.Int64("channel version", channel.Version)) candidates = channel } else if !channelIsStreamingNode && candidateIsStreamingNode { // When downgrading from 2.6 to 2.5, the delegator leader may locate at non-streaming node. // We always use the non-streaming node as the delegator leader to avoid the delete data lost when loading segment. + logger.Debug("found delegator which is not on stream node", zap.Int64("node", channel.Node), + zap.Int64("channel version", channel.Version)) continue } else { updateNeeded := false @@ -374,19 +391,28 @@ func (m *ChannelDistManager) GetShardLeader(channelName string, replica *Replica case !candidatesServiceable && channelServiceable: // Current candidate is not serviceable but new channel is updateNeeded = true + logger.Debug("set serviceable delegator to candidate shard leader", zap.Int64("node", channel.Node), + zap.Int64("channel version", channel.Version)) case candidatesServiceable == channelServiceable && channel.Version > candidates.Version: // Same service status but higher version updateNeeded = true + logger.Debug("set serviceable delegator with larger version to candidate shard leader", zap.Int64("node", channel.Node), + zap.Int64("channel version", channel.Version), zap.Int64("candidate version", candidates.Version)) } - if updateNeeded { candidates = channel + } else { + logger.Debug("not set any channel to candidates in this round") } } } } } - + if candidates != nil { + logger.Debug("final", zap.Any("candidates", candidates), + zap.Int64("candidates version", candidates.Version), + zap.Int64("candidates node", candidates.Node)) + } return candidates } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index ef3f774f6a..32e032f171 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -884,6 +884,11 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { // wait for new delegator becomes leader, then try to remove old leader task := task.(*ChannelTask) delegator := scheduler.distMgr.ChannelDistManager.GetShardLeader(task.Shard(), task.replica) + log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Bool("delegator is Nil", delegator == nil)) + if delegator != nil { + log.Ctx(scheduler.ctx).Debug("process channelAction", zap.Int64("delegator node", delegator.Node), + zap.Int64("action node", action.Node())) + } newDelegatorReady = delegator != nil && delegator.Node == action.Node() default: newDelegatorReady = true @@ -895,6 +900,7 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { zap.Int64("collectionID", task.CollectionID()), zap.String("channelName", task.Shard()), zap.Int64("taskID", task.ID())) + break } } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 6bd34e26ad..3d05b8cf56 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/pingcap/log" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -45,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/util/function" "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" diff --git a/internal/util/bloomfilter/bloom_filter.go b/internal/util/bloomfilter/bloom_filter.go index 0c4dff36a4..b9e5c6e68c 100644 --- a/internal/util/bloomfilter/bloom_filter.go +++ b/internal/util/bloomfilter/bloom_filter.go @@ -19,11 +19,11 @@ import ( "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/greatroar/blobloom" - "github.com/pingcap/log" "github.com/zeebo/xxh3" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/pkg/v2/log" ) type BFType int