diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 481e3ed006..656e737db4 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -902,7 +902,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade }, nil } - leaders, err := utils.GetShardLeaders(s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID()) + leaders, err := utils.GetShardLeaders(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID()) return &querypb.GetShardLeadersResponse{ Status: merr.Status(err), Shards: leaders, @@ -919,7 +919,7 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil } - if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { + if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { return componentutil.CheckHealthRespWithErr(err), nil } diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index 70a385cc61..d5412b0e69 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" etcdKV "github.com/milvus-io/milvus/internal/kv/etcd" @@ -39,7 +40,7 @@ import ( func TestSpawnReplicasWithRG(t *testing.T) { paramtable.Init() config := GenerateEtcdConfig() - cli, _ := etcd.GetEtcdClient( + cli, err := etcd.GetEtcdClient( config.UseEmbedEtcd.GetAsBool(), config.EtcdUseSSL.GetAsBool(), config.Endpoints.GetAsStrings(), @@ -47,6 +48,7 @@ func TestSpawnReplicasWithRG(t *testing.T) { config.EtcdTLSKey.GetValue(), config.EtcdTLSCACert.GetValue(), config.EtcdTLSMinVersion.GetValue()) + require.NoError(t, err) kv := etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue()) store := querycoord.NewCatalog(kv) diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index e22a04d7b0..0c8ed726c2 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -86,11 +86,11 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan return nil } -func checkLoadStatus(m *meta.Meta, collectionID int64) error { +func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) error { percentage := m.CollectionManager.CalculateLoadPercentage(collectionID) if percentage < 0 { err := merr.WrapErrCollectionNotLoaded(collectionID) - log.Warn("failed to GetShardLeaders", zap.Error(err)) + log.Ctx(ctx).Warn("failed to GetShardLeaders", zap.Error(err)) return err } collection := m.CollectionManager.GetCollection(collectionID) @@ -102,7 +102,7 @@ func checkLoadStatus(m *meta.Meta, collectionID int64) error { if percentage < 100 { err := merr.WrapErrCollectionNotFullyLoaded(collectionID) msg := fmt.Sprintf("collection %v is not fully loaded", collectionID) - log.Warn(msg) + log.Ctx(ctx).Warn(msg) return err } return nil @@ -169,8 +169,8 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter return ret, nil } -func GetShardLeaders(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { - if err := checkLoadStatus(m, collectionID); err != nil { +func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { + if err := checkLoadStatus(ctx, m, collectionID); err != nil { return nil, err } @@ -178,22 +178,27 @@ func GetShardLeaders(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist * if len(channels) == 0 { msg := "loaded collection do not found any channel in target, may be in recovery" err := merr.WrapErrCollectionOnRecovering(collectionID, msg) - log.Warn("failed to get channels", zap.Error(err)) + log.Ctx(ctx).Warn("failed to get channels", zap.Error(err)) return nil, err } return GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels) } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { +func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) for _, coll := range m.GetAllCollections() { - err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll) + err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll) // the collection is not queryable, if meet following conditions: // 1. Some segments are not loaded // 2. Collection is not starting to release // 3. The load percentage has not been updated in the last 5 minutes. if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval { + log.Ctx(ctx).Warn("collection not querable", + zap.Int64("collectionID", coll.CollectionID), + zap.Time("lastUpdated", coll.UpdatedAt), + zap.Duration("maxInterval", maxInterval), + zap.Error(err)) return err } } @@ -201,9 +206,9 @@ func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterfa } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection -func checkCollectionQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { +func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { collectionID := coll.GetCollectionID() - if err := checkLoadStatus(m, collectionID); err != nil { + if err := checkLoadStatus(ctx, m, collectionID); err != nil { return err } @@ -211,7 +216,7 @@ func checkCollectionQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterfac if len(channels) == 0 { msg := "loaded collection do not found any channel in target, may be in recovery" err := merr.WrapErrCollectionOnRecovering(collectionID, msg) - log.Warn("failed to get channels", zap.Error(err)) + log.Ctx(ctx).Warn("failed to get channels", zap.Error(err)) return err }