From c1ff0cec8fa97f5d0554775f5becdf411908d3e7 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Tue, 26 Apr 2022 11:29:54 +0800 Subject: [PATCH] Change QueryCoord Log level (#16590) Signed-off-by: xiaofan-luan --- internal/querycoord/channel_unsubscribe.go | 10 +- internal/querycoord/cluster.go | 36 +++--- internal/querycoord/global_meta_broker.go | 16 +-- internal/querycoord/impl.go | 96 +++++++++------- internal/querycoord/index_checker.go | 13 +-- internal/querycoord/meta.go | 15 +-- internal/querycoord/query_coord.go | 46 ++++---- internal/querycoord/querynode.go | 4 +- internal/querycoord/segment_allocator.go | 10 +- internal/querycoord/task.go | 126 ++++++++++----------- internal/querycoord/task_scheduler.go | 36 +++--- 11 files changed, 205 insertions(+), 203 deletions(-) diff --git a/internal/querycoord/channel_unsubscribe.go b/internal/querycoord/channel_unsubscribe.go index eb47dee421..1001bcdfb7 100644 --- a/internal/querycoord/channel_unsubscribe.go +++ b/internal/querycoord/channel_unsubscribe.go @@ -75,7 +75,7 @@ func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factor // reloadFromKV reload unsolved channels to unsubscribe func (csh *channelUnsubscribeHandler) reloadFromKV() error { - log.Debug("start reload unsubscribe channelInfo from kv") + log.Info("start reload unsubscribe channelInfo from kv") _, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix) if err != nil { return err @@ -116,7 +116,7 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un } csh.channelInfos.PushBack(info) csh.downNodeChan <- info.NodeID - log.Debug("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID)) + log.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID)) } } @@ -126,7 +126,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { for { select { case <-csh.ctx.Done(): - log.Debug("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end") + log.Info("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end") return case <-csh.downNodeChan: channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo) @@ -136,7 +136,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID) err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels) if err != nil { - log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID)) + log.Error("unsubscribe channels failed", zap.Int64("nodeID", nodeID)) panic(err) } } @@ -147,7 +147,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID)) panic(err) } - log.Debug("unsubscribe channels success", zap.Int64("nodeID", nodeID)) + log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID)) } } } diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 489546c670..cb4484f960 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -146,10 +146,10 @@ func (c *queryNodeCluster) reloadFromKV() error { onlineSessionMap[nodeID] = session } for nodeID, session := range onlineSessionMap { - log.Debug("reloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID)) + log.Info("reloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID)) err := c.registerNode(c.ctx, session, nodeID, disConnect) if err != nil { - log.Error("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) + log.Warn("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) return err } toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID) @@ -159,25 +159,25 @@ func (c *queryNodeCluster) reloadFromKV() error { // load node information before power off from etcd oldStringNodeIDs, oldNodeSessions, err := c.client.LoadWithPrefix(queryNodeInfoPrefix) if err != nil { - log.Error("reloadFromKV: get previous node info from etcd error", zap.Error(err)) + log.Warn("reloadFromKV: get previous node info from etcd error", zap.Error(err)) return err } for index := range oldStringNodeIDs { nodeID, err := strconv.ParseInt(filepath.Base(oldStringNodeIDs[index]), 10, 64) if err != nil { - log.Error("watchNodeLoop: parse nodeID error", zap.Error(err)) + log.Warn("watchNodeLoop: parse nodeID error", zap.Error(err)) return err } if _, ok := onlineSessionMap[nodeID]; !ok { session := &sessionutil.Session{} err = json.Unmarshal([]byte(oldNodeSessions[index]), session) if err != nil { - log.Error("watchNodeLoop: unmarshal session error", zap.Error(err)) + log.Warn("watchNodeLoop: unmarshal session error", zap.Error(err)) return err } err = c.registerNode(context.Background(), session, nodeID, offline) if err != nil { - log.Debug("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) + log.Warn("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) return err } toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID) @@ -214,7 +214,7 @@ func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *q if targetNode != nil { err := targetNode.loadSegments(ctx, in) if err != nil { - log.Debug("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) + log.Warn("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) return err } @@ -238,7 +238,7 @@ func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in err := targetNode.releaseSegments(ctx, in) if err != nil { - log.Debug("releaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) + log.Warn("releaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) return err } @@ -259,7 +259,6 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in if targetNode != nil { err := targetNode.watchDmChannels(ctx, in) if err != nil { - log.Debug("watchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error())) return err } dmChannelWatchInfo := make([]*querypb.DmChannelWatchInfo, len(in.Infos)) @@ -274,7 +273,7 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in err = c.clusterMeta.setDmChannelInfos(dmChannelWatchInfo) if err != nil { - log.Debug("watchDmChannels: update dmChannelWatchInfos to meta failed", zap.String("error", err.Error())) + // TODO DML channel maybe leaked, need to release dml if no related segment return err } @@ -294,7 +293,6 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, if targetNode != nil { err := targetNode.watchDeltaChannels(ctx, in) if err != nil { - log.Debug("watchDeltaChannels: queryNode watch delta channel error", zap.String("error", err.Error())) return err } @@ -334,7 +332,6 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in } msgPosition, err := c.clusterMeta.sendSealedSegmentChangeInfos(in.CollectionID, in.QueryChannel, emptyChangeInfo) if err != nil { - log.Error("addQueryChannel: get latest messageID of query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err)) return err } @@ -342,7 +339,6 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in in.SeekPosition = msgPosition err = targetNode.addQueryChannel(ctx, in) if err != nil { - log.Error("addQueryChannel: queryNode add query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err)) return err } return nil @@ -361,7 +357,7 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, if targetNode != nil { err := targetNode.removeQueryChannel(ctx, in) if err != nil { - log.Debug("removeQueryChannel: queryNode remove query channel error", zap.String("error", err.Error())) + log.Warn("removeQueryChannel: queryNode remove query channel error", zap.String("error", err.Error())) return err } @@ -382,7 +378,6 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, if targetNode != nil { err := targetNode.releaseCollection(ctx, in) if err != nil { - log.Debug("releaseCollection: queryNode release collection error", zap.String("error", err.Error())) return err } @@ -403,7 +398,6 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, if targetNode != nil { err := targetNode.releasePartitions(ctx, in) if err != nil { - log.Debug("releasePartitions: queryNode release partitions error", zap.String("error", err.Error())) return err } @@ -561,7 +555,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti if _, ok := c.nodes[id]; !ok { sessionJSON, err := json.Marshal(session) if err != nil { - log.Debug("registerNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session)) + log.Warn("registerNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session)) return err } key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id) @@ -571,7 +565,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti } node, err := c.newNodeFn(ctx, session.Address, id, c.client) if err != nil { - log.Debug("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err)) + log.Warn("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err)) return err } c.setNodeState(id, node, state) @@ -580,7 +574,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti } c.nodes[id] = node metrics.QueryCoordNumQueryNodes.WithLabelValues().Inc() - log.Debug("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state)) + log.Info("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state)) return nil } return fmt.Errorf("registerNode: QueryNode %d alredy exists in cluster", id) @@ -613,7 +607,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error { delete(c.nodes, nodeID) metrics.QueryCoordNumQueryNodes.WithLabelValues().Dec() - log.Debug("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID)) + log.Info("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID)) return nil } @@ -625,7 +619,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) { if node, ok := c.nodes[nodeID]; ok { node.stop() c.setNodeState(nodeID, node, offline) - log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID)) + log.Info("stopNode: queryNode offline", zap.Int64("nodeID", nodeID)) } } diff --git a/internal/querycoord/global_meta_broker.go b/internal/querycoord/global_meta_broker.go index b8b4fbb6c6..30db0d0df6 100644 --- a/internal/querycoord/global_meta_broker.go +++ b/internal/querycoord/global_meta_broker.go @@ -68,7 +68,7 @@ func (broker *globalMetaBroker) releaseDQLMessageStream(ctx context.Context, col log.Error("releaseDQLMessageStream occur error", zap.Int64("collectionID", collectionID), zap.Error(err)) return err } - log.Debug("releaseDQLMessageStream successfully", zap.Int64("collectionID", collectionID)) + log.Info("releaseDQLMessageStream successfully", zap.Int64("collectionID", collectionID)) return nil } @@ -93,7 +93,7 @@ func (broker *globalMetaBroker) showPartitionIDs(ctx context.Context, collection log.Error("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err)) return nil, err } - log.Debug("show partition successfully", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs)) + log.Info("show partition successfully", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs)) return showPartitionResponse.PartitionIDs, nil } @@ -119,7 +119,7 @@ func (broker *globalMetaBroker) getRecoveryInfo(ctx context.Context, collectionI log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) return nil, nil, err } - log.Debug("get recovery info successfully", + log.Info("get recovery info successfully", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int("num channels", len(recoveryInfo.Channels)), @@ -156,14 +156,14 @@ func (broker *globalMetaBroker) getIndexBuildID(ctx context.Context, collectionI } if !response.EnableIndex { - log.Debug("describe segment from rootCoord successfully", + log.Info("describe segment from rootCoord successfully", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.Bool("enableIndex", false)) return false, 0, nil } - log.Debug("describe segment from rootCoord successfully", + log.Info("describe segment from rootCoord successfully", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.Bool("enableIndex", true), @@ -190,7 +190,7 @@ func (broker *globalMetaBroker) getIndexFilePaths(ctx context.Context, buildID i log.Error(err.Error()) return nil, err } - log.Debug("get index info from indexCoord successfully", zap.Int64("buildID", buildID)) + log.Info("get index info from indexCoord successfully", zap.Int64("buildID", buildID)) return pathResponse.FilePaths, nil } @@ -266,7 +266,7 @@ func (broker *globalMetaBroker) parseIndexInfo(ctx context.Context, segmentID Un return err } - log.Debug("set index info success", zap.Int64("segmentID", segmentID), zap.Int64("fieldID", indexInfo.FieldID), zap.Int64("buildID", buildID)) + log.Info("set index info success", zap.Int64("segmentID", segmentID), zap.Int64("fieldID", indexInfo.FieldID), zap.Int64("buildID", buildID)) return nil } @@ -317,7 +317,7 @@ func (broker *globalMetaBroker) describeSegments(ctx context.Context, collection return nil, err } - log.Debug("describe segments successfully", + log.Info("describe segments successfully", zap.Int64("collection", collectionID), zap.Int64s("segments", segmentIDs)) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index aeab357984..abdee8f2f7 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -90,7 +90,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin // ShowCollections return all the collections that have been loaded func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { - log.Debug("show collection start", + log.Info("show collection start", zap.String("role", typeutil.QueryCoordRole), zap.Int64s("collectionIDs", req.CollectionIDs), zap.Int64("msgID", req.Base.MsgID)) @@ -118,7 +118,7 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl for _, id := range inMemoryCollectionIDs { inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage) } - log.Debug("show collection end", + log.Info("show collection end", zap.String("role", typeutil.QueryCoordRole), zap.Int64s("collections", inMemoryCollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages), @@ -145,7 +145,7 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl } inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage) } - log.Debug("show collection end", + log.Info("show collection end", zap.String("role", typeutil.QueryCoordRole), zap.Int64s("collections", req.CollectionIDs), zap.Int64("msgID", req.Base.MsgID), @@ -163,7 +163,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle collectionID := req.CollectionID //schema := req.Schema - log.Debug("loadCollectionRequest received", + log.Info("loadCollectionRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -184,7 +184,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil { // if collection has been loaded by load collection request, return success if collectionInfo.LoadType == querypb.LoadType_LoadCollection { - log.Debug("collection has already been loaded, return load success directly", + log.Info("collection has already been loaded, return load success directly", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -247,7 +247,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle return status, nil } - log.Debug("loadCollectionRequest completed", + log.Info("loadCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -260,7 +260,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc() //dbID := req.DbID collectionID := req.CollectionID - log.Debug("releaseCollectionRequest received", + log.Info("releaseCollectionRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -281,7 +281,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas // if collection has not been loaded into memory, return release collection successfully hasCollection := qc.meta.hasCollection(collectionID) if !hasCollection { - log.Debug("release collection end, the collection has not been loaded into QueryNode", + log.Info("release collection end, the collection has not been loaded into QueryNode", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -326,7 +326,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas return status, nil } - log.Debug("releaseCollectionRequest completed", + log.Info("releaseCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -341,7 +341,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas // ShowPartitions return all the partitions that have been loaded func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { collectionID := req.CollectionID - log.Debug("show partitions start", + log.Info("show partitions start", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), @@ -383,7 +383,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti for _, id := range inMemoryPartitionIDs { inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage) } - log.Debug("show partitions end", + log.Info("show partitions end", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64("msgID", req.Base.MsgID), @@ -413,7 +413,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage) } - log.Debug("show partitions end", + log.Info("show partitions end", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), @@ -433,7 +433,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti collectionID := req.CollectionID partitionIDs := req.PartitionIDs - log.Debug("loadPartitionRequest received", + log.Info("loadPartitionRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -509,7 +509,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti return status, nil } - log.Debug("loadPartitionRequest completed, all partitions to load have already been loaded into memory", + log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -557,7 +557,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti return status, nil } - log.Debug("loadPartitionRequest completed", + log.Info("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -573,7 +573,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas //dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs - log.Debug("releasePartitionRequest received", + log.Info("releasePartitionRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -586,7 +586,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc() return status, nil @@ -639,7 +639,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas } } } else { - log.Debug("release partitions end, the collection has not been loaded into QueryNode", + log.Info("release partitions end, the collection has not been loaded into QueryNode", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -649,7 +649,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas } if len(toReleasedPartitions) == 0 { - log.Debug("release partitions end, the partitions has not been loaded into QueryNode", + log.Info("release partitions end, the partitions has not been loaded into QueryNode", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -663,7 +663,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest) if releaseCollection { // if all loaded partitions will be released from memory, then upgrade release partitions request to release collection request - log.Debug(fmt.Sprintf("all partitions of collection %d will released from QueryNode, so release the collection directly", collectionID), + log.Info(fmt.Sprintf("all partitions of collection %d will released from QueryNode, so release the collection directly", collectionID), zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID)) msgBase := req.Base @@ -690,7 +690,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas } err := qc.scheduler.Enqueue(releaseTask) if err != nil { - log.Error("releasePartitionRequest failed to add execute task to scheduler", + log.Warn("releasePartitionRequest failed to add execute task to scheduler", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -705,7 +705,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas err = releaseTask.waitToFinish() if err != nil { - log.Error("releasePartitionRequest failed", + log.Warn("releasePartitionRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -718,7 +718,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas return status, nil } - log.Debug("releasePartitionRequest completed", + log.Info("releasePartitionRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -767,7 +767,7 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat // GetPartitionStates returns state of the partition, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU, InGPU func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { - log.Debug("getPartitionStatesRequest received", + log.Info("getPartitionStatesRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", req.PartitionIDs), @@ -780,7 +780,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("getPartitionStates failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("getPartitionStates failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return &querypb.GetPartitionStatesResponse{ Status: status, }, nil @@ -810,7 +810,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa } partitionStates = append(partitionStates, partitionState) } - log.Debug("getPartitionStatesRequest completed", + log.Info("getPartitionStatesRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", req.PartitionIDs), @@ -823,7 +823,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa // GetSegmentInfo returns information of all the segments on queryNodes, and the information includes memSize, numRow, indexName, indexID ... func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { - log.Debug("getSegmentInfoRequest received", + log.Info("getSegmentInfoRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64s("segmentIDs", req.SegmentIDs), @@ -836,7 +836,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("getSegmentInfo failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("getSegmentInfo failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return &querypb.GetSegmentInfoResponse{ Status: status, }, nil @@ -865,7 +865,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen totalNumRows += info.NumRows totalMemSize += info.MemSize } - log.Debug("getSegmentInfoRequest completed", + log.Info("getSegmentInfoRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID), @@ -879,7 +879,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen // LoadBalance would do a load balancing operation between query nodes func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) { - log.Debug("loadBalanceRequest received", + log.Info("loadBalanceRequest received", zap.String("role", typeutil.QueryCoordRole), zap.Int64s("source nodeIDs", req.SourceNodeIDs), zap.Int64s("dst nodeIDs", req.DstNodeIDs), @@ -894,7 +894,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("loadBalance failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("loadBalance failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return status, nil } @@ -909,7 +909,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR } err := qc.scheduler.Enqueue(loadBalanceTask) if err != nil { - log.Error("loadBalanceRequest failed to add execute task to scheduler", + log.Warn("loadBalanceRequest failed to add execute task to scheduler", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) @@ -920,13 +920,13 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR err = loadBalanceTask.waitToFinish() if err != nil { - log.Error("loadBalanceRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("loadBalanceRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) status.ErrorCode = commonpb.ErrorCode_UnexpectedError status.Reason = err.Error() return status, nil } - log.Debug("loadBalanceRequest completed", + log.Info("loadBalanceRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64s("source nodeIDs", req.SourceNodeIDs), zap.Int64s("dst nodeIDs", req.DstNodeIDs), @@ -954,14 +954,14 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe if qc.stateCode.Load() != internalpb.StateCode_Healthy { err := errors.New("QueryCoord is not healthy") getMetricsResponse.Status.Reason = err.Error() - log.Error("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return getMetricsResponse, nil } metricType, err := metricsinfo.ParseMetricType(req.Request) if err != nil { getMetricsResponse.Status.Reason = err.Error() - log.Error("getMetrics failed to parse metric type", + log.Warn("getMetrics failed to parse metric type", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) @@ -1009,7 +1009,7 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe err = errors.New(metricsinfo.MsgUnimplementedMetric) getMetricsResponse.Status.Reason = err.Error() - log.Error("getMetrics failed", + log.Warn("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.String("req", req.Request), zap.Int64("msgID", req.Base.MsgID), @@ -1020,7 +1020,7 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe // GetReplicas gets replicas of a certain collection func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) { - log.Debug("GetReplicas received", + log.Info("GetReplicas received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -1033,7 +1033,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("GetReplicasResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("GetReplicasResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return &milvuspb.GetReplicasResponse{ Status: status, }, nil @@ -1043,7 +1043,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas if err != nil { status.ErrorCode = commonpb.ErrorCode_MetaFailed status.Reason = err.Error() - log.Error("GetReplicasResponse failed to get replicas", + log.Warn("GetReplicasResponse failed to get replicas", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID), @@ -1083,6 +1083,11 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas } } + log.Info("GetReplicas finished", + zap.String("role", typeutil.QueryCoordRole), + zap.Int64("collectionID", req.CollectionID), + zap.Any("replicas", replicas)) + return &milvuspb.GetReplicasResponse{ Status: status, Replicas: replicas, @@ -1091,7 +1096,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas // GetShardLeaders gets shard leaders of a certain collection func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) { - log.Debug("GetShardLeaders received", + log.Info("GetShardLeaders received", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID)) @@ -1104,7 +1109,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard status.ErrorCode = commonpb.ErrorCode_UnexpectedError err := errors.New("QueryCoord is not healthy") status.Reason = err.Error() - log.Error("GetShardLeadersResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) + log.Warn("GetShardLeadersResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err)) return &querypb.GetShardLeadersResponse{ Status: status, }, nil @@ -1114,7 +1119,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard if err != nil { status.ErrorCode = commonpb.ErrorCode_MetaFailed status.Reason = err.Error() - log.Error("GetShardLeadersResponse failed to get replicas", + log.Warn("GetShardLeadersResponse failed to get replicas", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", req.CollectionID), zap.Int64("msgID", req.Base.MsgID), @@ -1148,6 +1153,11 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard shardLeaderLists = append(shardLeaderLists, shard) } + log.Info("GetShardLeaders finished", + zap.String("role", typeutil.QueryCoordRole), + zap.Int64("collectionID", req.CollectionID), + zap.Any("replicas", shardLeaderLists)) + return &querypb.GetShardLeadersResponse{ Status: status, Shards: shardLeaderLists, diff --git a/internal/querycoord/index_checker.go b/internal/querycoord/index_checker.go index 1e526d5873..e90ab4a761 100644 --- a/internal/querycoord/index_checker.go +++ b/internal/querycoord/index_checker.go @@ -122,7 +122,7 @@ func (ic *IndexChecker) reloadFromKV() error { // in case handoffReqChan is full, and block start process go ic.enqueueHandoffReq(segmentInfo) } else { - log.Debug("reloadFromKV: collection/partition has not been loaded, remove req from etcd", zap.Any("segmentInfo", segmentInfo)) + log.Info("reloadFromKV: collection/partition has not been loaded, remove req from etcd", zap.Any("segmentInfo", segmentInfo)) buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID) err = ic.client.Remove(buildQuerySegmentPath) if err != nil { @@ -130,7 +130,7 @@ func (ic *IndexChecker) reloadFromKV() error { return err } } - log.Debug("reloadFromKV: process handoff request done", zap.Any("segmentInfo", segmentInfo)) + log.Info("reloadFromKV: process handoff request done", zap.Any("segmentInfo", segmentInfo)) } return nil @@ -207,7 +207,7 @@ func (ic *IndexChecker) checkIndexLoop() { continue } - log.Debug("checkIndexLoop: segment has been compacted and dropped before handoff", zap.Int64("segmentID", segmentInfo.SegmentID)) + log.Info("checkIndexLoop: segment has been compacted and dropped before handoff", zap.Int64("segmentID", segmentInfo.SegmentID)) } buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID) @@ -220,8 +220,7 @@ func (ic *IndexChecker) checkIndexLoop() { } case segmentInfo := <-ic.unIndexedSegmentsChan: //TODO:: check index after load collection/partition, some segments may don't has index when loading - log.Debug("checkIndexLoop: start check index for segment which has not loaded index", zap.Int64("segmentID", segmentInfo.SegmentID)) - + log.Warn("checkIndexLoop: start check index for segment which has not loaded index", zap.Int64("segmentID", segmentInfo.SegmentID)) } } } @@ -237,7 +236,7 @@ func (ic *IndexChecker) processHandoffAfterIndexDone() { collectionID := segmentInfo.CollectionID partitionID := segmentInfo.PartitionID segmentID := segmentInfo.SegmentID - log.Debug("processHandoffAfterIndexDone: handoff segment start", zap.Any("segmentInfo", segmentInfo)) + log.Info("processHandoffAfterIndexDone: handoff segment start", zap.Any("segmentInfo", segmentInfo)) baseTask := newBaseTask(ic.ctx, querypb.TriggerCondition_Handoff) handoffReq := &querypb.HandoffSegmentsRequest{ Base: &commonpb.MsgBase{ @@ -265,7 +264,7 @@ func (ic *IndexChecker) processHandoffAfterIndexDone() { log.Warn("processHandoffAfterIndexDone: handoffTask failed", zap.Error(err)) } - log.Debug("processHandoffAfterIndexDone: handoffTask completed", zap.Any("segment infos", handoffTask.SegmentInfos)) + log.Info("processHandoffAfterIndexDone: handoffTask completed", zap.Any("segment infos", handoffTask.SegmentInfos)) }() // once task enqueue, etcd data can be cleaned, handoffTask will recover from taskScheduler's reloadFromKV() diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index e9a4fe29a4..05d14d729a 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -166,8 +166,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl } func (m *MetaReplica) reloadFromKV() error { - log.Debug("start reload from kv") - log.Info("recovery collections...") collectionKeys, collectionValues, err := m.getKvClient().LoadWithPrefix(collectionMetaPrefix) if err != nil { @@ -288,7 +286,7 @@ func (m *MetaReplica) reloadFromKV() error { } //TODO::update partition states - log.Debug("reload from kv finished") + log.Info("reload from kv finished") return nil } @@ -484,7 +482,7 @@ func (m *MetaReplica) addPartitions(collectionID UniqueID, partitionIDs []Unique collectionInfo.PartitionStates = newPartitionStates collectionInfo.ReleasedPartitionIDs = newReleasedPartitionIDs - log.Debug("add a partition to MetaReplica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs)) + log.Info("add a partition to MetaReplica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs)) err := saveGlobalCollectionInfo(collectionID, collectionInfo, m.getKvClient()) if err != nil { log.Error("save collectionInfo error", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs), zap.Any("error", err.Error())) @@ -824,7 +822,7 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryC return nil, fmt.Errorf("sendSealedSegmentChangeInfos: length of the positions in stream is not correct, collectionID = %d, query channel = %s, len = %d", collectionID, queryChannel, len(messageIDs)) } - log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack)) + log.Info("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack)) return &internalpb.MsgPosition{ ChannelName: queryChannel, MsgID: messageIDs[0].Serialize(), @@ -926,7 +924,6 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel err := saveDmChannelWatchInfos(dmChannelWatchInfos, m.getKvClient()) if err != nil { - log.Error("save dmChannelWatchInfo error", zap.Any("error", err.Error())) return err } for _, channelInfo := range dmChannelWatchInfos { @@ -943,7 +940,7 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh allocatedQueryChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearch) allocatedQueryResultChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearchResult) - log.Debug("query coordinator is creating query channel", + log.Info("query coordinator is creating query channel", zap.String("query channel name", allocatedQueryChannel), zap.String("query result channel name", allocatedQueryResultChannel)) @@ -987,7 +984,7 @@ func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.Vch log.Error("save delta channel info error", zap.Int64("collectionID", collectionID), zap.Error(err)) return err } - log.Debug("save delta channel infos to meta", zap.Any("collectionID", collectionID)) + log.Info("save delta channel infos to meta", zap.Any("collectionID", collectionID)) m.deltaChannelInfos[collectionID] = infos return nil } @@ -1025,7 +1022,7 @@ func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID, queryChannel str queryStream.AsProducer([]string{queryChannel}) m.queryStreams[collectionID] = queryStream - log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID)) + log.Info("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID)) } return queryStream, nil diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 5846ab9ecc..031644bc38 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -136,7 +136,7 @@ func (qc *QueryCoord) initSession() error { // Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler func (qc *QueryCoord) Init() error { - log.Debug("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address)) + log.Info("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address)) var initError error qc.initOnce.Do(func() { err := qc.initSession() @@ -145,7 +145,6 @@ func (qc *QueryCoord) Init() error { initError = err return } - log.Debug("queryCoord try to connect etcd") etcdKV := etcdkv.NewEtcdKV(qc.etcdCli, Params.EtcdCfg.MetaRootPath) qc.kvClient = etcdKV log.Debug("query coordinator try to connect etcd success") @@ -155,7 +154,7 @@ func (qc *QueryCoord) Init() error { idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV) initError = idAllocator.Initialize() if initError != nil { - log.Debug("query coordinator idAllocator initialize failed", zap.Error(initError)) + log.Error("query coordinator idAllocator initialize failed", zap.Error(initError)) return } qc.idAllocator = func() (UniqueID, error) { @@ -220,20 +219,20 @@ func (qc *QueryCoord) Init() error { qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager() }) - log.Debug("QueryCoord init success") + log.Info("QueryCoord init success") return initError } // Start function starts the goroutines to watch the meta and node updates func (qc *QueryCoord) Start() error { qc.scheduler.Start() - log.Debug("start scheduler ...") + log.Info("start scheduler ...") qc.indexChecker.start() - log.Debug("start index checker ...") + log.Info("start index checker ...") qc.handler.start() - log.Debug("start channel unsubscribe loop ...") + log.Info("start channel unsubscribe loop ...") Params.QueryCoordCfg.CreatedTime = time.Now() Params.QueryCoordCfg.UpdatedTime = time.Now() @@ -260,17 +259,17 @@ func (qc *QueryCoord) Stop() error { if qc.scheduler != nil { qc.scheduler.Close() - log.Debug("close scheduler ...") + log.Info("close scheduler ...") } if qc.indexChecker != nil { qc.indexChecker.close() - log.Debug("close index checker ...") + log.Info("close index checker ...") } if qc.handler != nil { qc.handler.close() - log.Debug("close channel unsubscribe loop ...") + log.Info("close channel unsubscribe loop ...") } if qc.loopCancel != nil { @@ -278,6 +277,7 @@ func (qc *QueryCoord) Stop() error { log.Info("cancel the loop of QueryCoord") } + log.Warn("Query Coord stopped successfully...") qc.loopWg.Wait() qc.session.Revoke(time.Second) return nil @@ -342,7 +342,7 @@ func (qc *QueryCoord) watchNodeLoop() { ctx, cancel := context.WithCancel(qc.loopCtx) defer cancel() defer qc.loopWg.Done() - log.Debug("QueryCoord start watch node loop") + log.Info("QueryCoord start watch node loop") unallocatedNodes := qc.getUnallocatedNodes() for _, n := range unallocatedNodes { @@ -372,7 +372,7 @@ func (qc *QueryCoord) watchNodeLoop() { } //TODO::deal enqueue error qc.scheduler.Enqueue(loadBalanceTask) - log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask)) + log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask)) } // TODO silverxia add Rewatch logic @@ -464,7 +464,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { switch event.EventType { case sessionutil.SessionAddEvent: serverID := event.Session.ServerID - log.Debug("start add a QueryNode to cluster", zap.Any("nodeID", serverID)) + log.Info("start add a QueryNode to cluster", zap.Any("nodeID", serverID)) err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect) if err != nil { log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error())) @@ -476,7 +476,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { qc.metricsCacheManager.InvalidateSystemInfoMetrics() case sessionutil.SessionDelEvent: serverID := event.Session.ServerID - log.Debug("get a del event after QueryNode down", zap.Int64("nodeID", serverID)) + log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID)) nodeExist := qc.cluster.hasNode(serverID) if !nodeExist { log.Error("QueryNode not exist", zap.Int64("nodeID", serverID)) @@ -504,7 +504,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { qc.metricsCacheManager.InvalidateSystemInfoMetrics() //TODO:: deal enqueue error qc.scheduler.Enqueue(loadBalanceTask) - log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask)) + log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask)) } } } @@ -515,7 +515,7 @@ func (qc *QueryCoord) watchHandoffSegmentLoop() { defer cancel() defer qc.loopWg.Done() - log.Debug("QueryCoord start watch segment loop") + log.Info("QueryCoord start watch segment loop") watchChan := qc.kvClient.WatchWithRevision(handoffSegmentPrefix, qc.indexChecker.revision+1) @@ -536,9 +536,9 @@ func (qc *QueryCoord) watchHandoffSegmentLoop() { validHandoffReq, _ := qc.indexChecker.verifyHandoffReqValid(segmentInfo) if Params.QueryCoordCfg.AutoHandoff && validHandoffReq { qc.indexChecker.enqueueHandoffReq(segmentInfo) - log.Debug("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo)) + log.Info("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo)) } else { - log.Debug("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff equal to false, remove req from etcd", zap.Any("segmentInfo", segmentInfo)) + log.Info("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff equal to false, remove req from etcd", zap.Any("segmentInfo", segmentInfo)) buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID) err = qc.kvClient.Remove(buildQuerySegmentPath) if err != nil { @@ -558,7 +558,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() { ctx, cancel := context.WithCancel(qc.loopCtx) defer cancel() defer qc.loopWg.Done() - log.Debug("QueryCoord start load balance segment loop") + log.Info("QueryCoord start load balance segment loop") timer := time.NewTicker(time.Duration(Params.QueryCoordCfg.BalanceIntervalSeconds) * time.Second) @@ -625,7 +625,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() { nodeID2SegmentInfos[nodeID] = leastSegmentInfos } } - log.Debug("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate)) + log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate)) if len(availableNodeIDs) <= 1 { log.Warn("loadBalanceSegmentLoop: there are too few available query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs)) continue @@ -695,7 +695,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() { } for _, t := range loadBalanceTasks { qc.scheduler.Enqueue(t) - log.Debug("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t)) + log.Info("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t)) err := t.waitToFinish() if err != nil { // if failed, wait for next balance loop @@ -703,10 +703,10 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() { // it also may be other abnormal errors log.Error("loadBalanceSegmentLoop: balance task execute failed", zap.Any("task", t)) } else { - log.Debug("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t)) + log.Info("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t)) } } - log.Debug("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks)) + log.Info("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks)) } } } diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go index 77cd95b1fc..ebc7b56324 100644 --- a/internal/querycoord/querynode.go +++ b/internal/querycoord/querynode.go @@ -127,7 +127,7 @@ func (qn *queryNode) start() error { qn.state = online } qn.stateLock.Unlock() - log.Debug("start: queryNode client start success", zap.Int64("nodeID", qn.id), zap.String("address", qn.address)) + log.Info("start: queryNode client start success", zap.Int64("nodeID", qn.id), zap.String("address", qn.address)) return nil } @@ -295,7 +295,7 @@ func (qn *queryNode) removeQueryChannel(ctx context.Context, in *querypb.RemoveQ func (qn *queryNode) releaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) error { if !qn.isOnline() { - log.Debug("ReleaseCollection: the QueryNode has been offline, the release request is no longer needed", zap.Int64("nodeID", qn.id)) + log.Warn("ReleaseCollection: the QueryNode has been offline, the release request is no longer needed", zap.Int64("nodeID", qn.id)) return nil } diff --git a/internal/querycoord/segment_allocator.go b/internal/querycoord/segment_allocator.go index d9c1ba5502..ec93b4f814 100644 --- a/internal/querycoord/segment_allocator.go +++ b/internal/querycoord/segment_allocator.go @@ -74,7 +74,7 @@ func shuffleSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegment } if len(availableNodeIDs) > 0 { - log.Debug("shuffleSegmentsToQueryNode: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs)) + log.Info("shuffleSegmentsToQueryNode: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs)) for _, req := range reqs { sort.Slice(availableNodeIDs, func(i, j int) bool { return nodeID2NumSegment[availableNodeIDs[i]] < nodeID2NumSegment[availableNodeIDs[j]] @@ -109,7 +109,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme dataSizePerReq[offset] = reqSize } - log.Debug("shuffleSegmentsToQueryNodeV2: get the segment size of loadReqs end", zap.Int64s("segment size of reqs", dataSizePerReq)) + log.Info("shuffleSegmentsToQueryNodeV2: get the segment size of loadReqs end", zap.Int64s("segment size of reqs", dataSizePerReq)) for { // online nodes map and totalMem, usedMem, memUsage of every node totalMem := make(map[int64]uint64) @@ -151,7 +151,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme queryNodeInfo := nodeInfo.(*queryNode) // avoid allocate segment to node which memUsageRate is high if queryNodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage { - log.Debug("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate)) + log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate)) continue } @@ -160,7 +160,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme availableNodeIDs = append(availableNodeIDs, nodeID) } if len(availableNodeIDs) > 0 { - log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs)) + log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs)) memoryInsufficient := false for offset, sizeOfReq := range dataSizePerReq { // sort nodes by memUsageRate, low to high @@ -190,7 +190,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme // shuffle segment success if !memoryInsufficient { - log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success") + log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success") return nil } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 7bd3080d37..e1be829110 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -363,7 +363,7 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error { collectionID := lct.CollectionID schema := lct.Schema lct.setResultInfo(nil) - log.Debug("start do loadCollectionTask", + log.Info("start do loadCollectionTask", zap.Int64("msgID", lct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Stringer("schema", schema), @@ -381,7 +381,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { lct.setResultInfo(err) return err } - log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIds), zap.Int64("msgID", lct.Base.MsgID)) + log.Info("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIds), zap.Int64("msgID", lct.Base.MsgID)) var ( replicas = make([]*milvuspb.ReplicaInfo, lct.ReplicaNumber) @@ -524,10 +524,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { DmChannelName: task.WatchDmChannelsRequest.Infos[0].ChannelName, }) } - log.Debug("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID)) + log.Info("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID)) } metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks))) - log.Debug("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID)) + log.Info("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID)) } err = lct.meta.addCollection(collectionID, querypb.LoadType_LoadCollection, lct.Schema) @@ -553,7 +553,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { } } - log.Debug("LoadCollection execute done", + log.Info("LoadCollection execute done", zap.Int64("msgID", lct.getTaskID()), zap.Int64("collectionID", collectionID)) return nil @@ -570,7 +570,7 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error { } } - log.Debug("loadCollectionTask postExecute done", + log.Info("loadCollectionTask postExecute done", zap.Int64("msgID", lct.getTaskID()), zap.Int64("collectionID", collectionID)) return nil @@ -605,7 +605,7 @@ func (lct *loadCollectionTask) rollBack(ctx context.Context) []task { panic(err) } - log.Debug("loadCollectionTask: generate rollBack task for loadCollectionTask", zap.Int64("collectionID", lct.CollectionID), zap.Int64("msgID", lct.Base.MsgID)) + log.Info("loadCollectionTask: generate rollBack task for loadCollectionTask", zap.Int64("collectionID", lct.CollectionID), zap.Int64("msgID", lct.Base.MsgID)) return resultTasks } @@ -650,7 +650,7 @@ func (rct *releaseCollectionTask) updateTaskProcess() { func (rct *releaseCollectionTask) preExecute(context.Context) error { collectionID := rct.CollectionID rct.setResultInfo(nil) - log.Debug("start do releaseCollectionTask", + log.Info("start do releaseCollectionTask", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID)) return nil @@ -686,7 +686,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error { } rct.addChildTask(releaseCollectionTask) - log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask)) + log.Info("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask)) } } else { err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest) @@ -698,7 +698,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error { } } - log.Debug("releaseCollectionTask Execute done", + log.Info("releaseCollectionTask Execute done", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) @@ -711,7 +711,7 @@ func (rct *releaseCollectionTask) postExecute(context.Context) error { rct.clearChildTasks() } - log.Debug("releaseCollectionTask postExecute done", + log.Info("releaseCollectionTask postExecute done", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) @@ -805,7 +805,7 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error { collectionID := lpt.CollectionID lpt.setResultInfo(nil) - log.Debug("start do loadPartitionTask", + log.Info("start do loadPartitionTask", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID)) return nil @@ -954,10 +954,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { DmChannelName: task.WatchDmChannelsRequest.Infos[0].ChannelName, }) } - log.Debug("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType()))) + log.Info("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType()))) } metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks))) - log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID)) + log.Info("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID)) } err = lpt.meta.addCollection(collectionID, querypb.LoadType_LoadPartition, lpt.Schema) @@ -983,7 +983,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { } } - log.Debug("loadPartitionTask Execute done", + log.Info("loadPartitionTask Execute done", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -1003,7 +1003,7 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error { } } - log.Debug("loadPartitionTask postExecute done", + log.Info("loadPartitionTask postExecute done", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) @@ -1042,7 +1042,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task { log.Error("loadPartitionTask: release collection info from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID), zap.Error(err)) panic(err) } - log.Debug("loadPartitionTask: generate rollBack task for loadPartitionTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID)) + log.Info("loadPartitionTask: generate rollBack task for loadPartitionTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID)) return resultTasks } @@ -1089,7 +1089,7 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error { collectionID := rpt.CollectionID partitionIDs := rpt.PartitionIDs rpt.setResultInfo(nil) - log.Debug("start do releasePartitionTask", + log.Info("start do releasePartitionTask", zap.Int64("msgID", rpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) @@ -1117,7 +1117,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error { meta: rpt.meta, } rpt.addChildTask(releasePartitionTask) - log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID)) + log.Info("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID)) } } else { err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest) @@ -1129,7 +1129,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error { } } - log.Debug("releasePartitionTask Execute done", + log.Info("releasePartitionTask Execute done", zap.Int64("msgID", rpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -1144,7 +1144,7 @@ func (rpt *releasePartitionTask) postExecute(context.Context) error { rpt.clearChildTasks() } - log.Debug("releasePartitionTask postExecute done", + log.Info("releasePartitionTask postExecute done", zap.Int64("msgID", rpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -1207,7 +1207,7 @@ func (lst *loadSegmentTask) preExecute(context.Context) error { segmentIDs = append(segmentIDs, info.SegmentID) } lst.setResultInfo(nil) - log.Debug("start do loadSegmentTask", + log.Info("start do loadSegmentTask", zap.Int64s("segmentIDs", segmentIDs), zap.Int64("loaded nodeID", lst.DstNodeID), zap.Int64("taskID", lst.getTaskID())) @@ -1224,13 +1224,13 @@ func (lst *loadSegmentTask) execute(ctx context.Context) error { return err } - log.Debug("loadSegmentTask Execute done", + log.Info("loadSegmentTask Execute done", zap.Int64("taskID", lst.getTaskID())) return nil } func (lst *loadSegmentTask) postExecute(context.Context) error { - log.Debug("loadSegmentTask postExecute done", + log.Info("loadSegmentTask postExecute done", zap.Int64("taskID", lst.getTaskID())) return nil } @@ -1306,7 +1306,7 @@ func (rst *releaseSegmentTask) timestamp() Timestamp { func (rst *releaseSegmentTask) preExecute(context.Context) error { segmentIDs := rst.SegmentIDs rst.setResultInfo(nil) - log.Debug("start do releaseSegmentTask", + log.Info("start do releaseSegmentTask", zap.Int64s("segmentIDs", segmentIDs), zap.Int64("loaded nodeID", rst.NodeID), zap.Int64("taskID", rst.getTaskID())) @@ -1323,7 +1323,7 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error { return err } - log.Debug("releaseSegmentTask Execute done", + log.Info("releaseSegmentTask Execute done", zap.Int64s("segmentIDs", rst.SegmentIDs), zap.Int64("taskID", rst.getTaskID())) return nil @@ -1331,7 +1331,7 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error { func (rst *releaseSegmentTask) postExecute(context.Context) error { segmentIDs := rst.SegmentIDs - log.Debug("releaseSegmentTask postExecute done", + log.Info("releaseSegmentTask postExecute done", zap.Int64s("segmentIDs", segmentIDs), zap.Int64("taskID", rst.getTaskID())) return nil @@ -1385,7 +1385,7 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error { channels = append(channels, info.ChannelName) } wdt.setResultInfo(nil) - log.Debug("start do watchDmChannelTask", + log.Info("start do watchDmChannelTask", zap.Strings("dmChannels", channels), zap.Int64("loaded nodeID", wdt.NodeID), zap.Int64("taskID", wdt.getTaskID())) @@ -1402,13 +1402,13 @@ func (wdt *watchDmChannelTask) execute(ctx context.Context) error { return err } - log.Debug("watchDmChannelsTask Execute done", + log.Info("watchDmChannelsTask Execute done", zap.Int64("taskID", wdt.getTaskID())) return nil } func (wdt *watchDmChannelTask) postExecute(context.Context) error { - log.Debug("watchDmChannelTask postExecute done", + log.Info("watchDmChannelTask postExecute done", zap.Int64("taskID", wdt.getTaskID())) return nil } @@ -1500,7 +1500,7 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error { channels = append(channels, info.ChannelName) } wdt.setResultInfo(nil) - log.Debug("start do watchDeltaChannelTask", + log.Info("start do watchDeltaChannelTask", zap.Strings("deltaChannels", channels), zap.Int64("loaded nodeID", wdt.NodeID), zap.Int64("taskID", wdt.getTaskID())) @@ -1517,13 +1517,13 @@ func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error { return err } - log.Debug("watchDeltaChannelsTask Execute done", + log.Info("watchDeltaChannelsTask Execute done", zap.Int64("taskID", wdt.getTaskID())) return nil } func (wdt *watchDeltaChannelTask) postExecute(context.Context) error { - log.Debug("watchDeltaChannelTask postExecute done", + log.Info("watchDeltaChannelTask postExecute done", zap.Int64("taskID", wdt.getTaskID())) return nil } @@ -1570,7 +1570,7 @@ func (wqt *watchQueryChannelTask) updateTaskProcess() { func (wqt *watchQueryChannelTask) preExecute(context.Context) error { wqt.setResultInfo(nil) - log.Debug("start do watchQueryChannelTask", + log.Info("start do watchQueryChannelTask", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), @@ -1584,12 +1584,15 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error { err := wqt.cluster.addQueryChannel(wqt.ctx, wqt.NodeID, wqt.AddQueryChannelRequest) if err != nil { - log.Warn("watchQueryChannelTask: watchQueryChannel occur error", zap.Int64("taskID", wqt.getTaskID()), zap.Error(err)) + log.Warn("watchQueryChannelTask: watchQueryChannel occur error", + zap.Int64("taskID", wqt.getTaskID()), + zap.String("channel", wqt.AddQueryChannelRequest.QueryChannel), + zap.Error(err)) wqt.setResultInfo(err) return err } - log.Debug("watchQueryChannelTask Execute done", + log.Info("watchQueryChannelTask Execute done", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), @@ -1598,7 +1601,7 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error { } func (wqt *watchQueryChannelTask) postExecute(context.Context) error { - log.Debug("watchQueryChannelTask postExecute done", + log.Info("watchQueryChannelTask postExecute done", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), @@ -1638,7 +1641,7 @@ func (ht *handoffTask) preExecute(context.Context) error { for _, info := range segmentInfos { segmentIDs = append(segmentIDs, info.SegmentID) } - log.Debug("start do handoff segments task", + log.Info("start do handoff segments task", zap.Int64s("segmentIDs", segmentIDs)) return nil } @@ -1652,12 +1655,12 @@ func (ht *handoffTask) execute(ctx context.Context) error { collectionInfo, err := ht.meta.getCollectionInfoByID(collectionID) if err != nil { - log.Debug("handoffTask: collection has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID)) + log.Warn("handoffTask: collection has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID)) continue } if collectionInfo.LoadType == querypb.LoadType_LoadCollection && ht.meta.hasReleasePartition(collectionID, partitionID) { - log.Debug("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) + log.Warn("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) continue } @@ -1669,7 +1672,7 @@ func (ht *handoffTask) execute(ctx context.Context) error { } if collectionInfo.LoadType != querypb.LoadType_LoadCollection && !partitionLoaded { - log.Debug("handoffTask: partition has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID)) + log.Warn("handoffTask: partition has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID)) continue } @@ -1759,7 +1762,7 @@ func (ht *handoffTask) execute(ctx context.Context) error { } for _, internalTask := range internalTasks { ht.addChildTask(internalTask) - log.Debug("handoffTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("segmentID", segmentID)) + log.Info("handoffTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("segmentID", segmentID)) } } else { err = fmt.Errorf("sealed segment has been exist on query node, segmentID is %d", segmentID) @@ -1769,10 +1772,7 @@ func (ht *handoffTask) execute(ctx context.Context) error { } } - log.Debug("handoffTask: assign child task done", zap.Any("segmentInfos", segmentInfos)) - - log.Debug("handoffTask Execute done", - zap.Int64("taskID", ht.getTaskID())) + log.Info("handoffTask: assign child task done", zap.Any("segmentInfos", segmentInfos), zap.Int64("taskID", ht.getTaskID())) return nil } @@ -1781,9 +1781,7 @@ func (ht *handoffTask) postExecute(context.Context) error { ht.clearChildTasks() } - log.Debug("handoffTask postExecute done", - zap.Int64("taskID", ht.getTaskID())) - + log.Info("handoffTask postExecute done", zap.Int64("taskID", ht.getTaskID())) return nil } @@ -1826,7 +1824,7 @@ func (lbt *loadBalanceTask) timestamp() Timestamp { func (lbt *loadBalanceTask) preExecute(context.Context) error { lbt.setResultInfo(nil) - log.Debug("start do loadBalanceTask", + log.Info("start do loadBalanceTask", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), @@ -1887,7 +1885,7 @@ func (lbt *loadBalanceTask) checkForManualLoadBalance() error { lbt.replicaID = replicaID - log.Debug("start do loadBalanceTask", + log.Info("start do loadBalanceTask", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), @@ -1939,7 +1937,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } else { toRecoverPartitionIDs = collectionInfo.PartitionIDs } - log.Debug("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs)) + log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs)) replica, err := lbt.getReplica(nodeID, collectionID) if err != nil { lbt.setResultInfo(err) @@ -2038,9 +2036,9 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) - log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask)) + log.Info("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask)) } - log.Debug("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) + log.Info("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) } if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance { @@ -2173,12 +2171,12 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) - log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("balance request", lbt.LoadBalanceRequest)) + log.Info("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("balance request", lbt.LoadBalanceRequest)) } - log.Debug("loadBalanceTask: assign child task done", zap.Any("balance request", lbt.LoadBalanceRequest)) + log.Info("loadBalanceTask: assign child task done", zap.Any("balance request", lbt.LoadBalanceRequest)) } - log.Debug("loadBalanceTask Execute done", + log.Info("loadBalanceTask Execute done", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), @@ -2217,7 +2215,7 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error { } } - log.Debug("loadBalanceTask postExecute done", + log.Info("loadBalanceTask postExecute done", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), @@ -2230,21 +2228,21 @@ func assignInternalTask(ctx context.Context, loadSegmentRequests []*querypb.LoadSegmentsRequest, watchDmChannelRequests []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) ([]task, error) { - log.Debug("assignInternalTask: start assign task to query node") + internalTasks := make([]task, 0) err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID) if err != nil { - log.Error("assignInternalTask: assign segment to node failed", zap.Any("load segments requests", loadSegmentRequests)) + log.Error("assignInternalTask: assign segment to node failed", zap.Error(err)) return nil, err } - log.Debug("assignInternalTask: assign segment to node success") + log.Info("assignInternalTask: assign segment to node success", zap.Int("load segments", len(loadSegmentRequests))) err = cluster.allocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID) if err != nil { - log.Error("assignInternalTask: assign dmChannel to node failed", zap.Any("watch dmChannel requests", watchDmChannelRequests)) + log.Error("assignInternalTask: assign dmChannel to node failed", zap.Error(err)) return nil, err } - log.Debug("assignInternalTask: assign dmChannel to node success") + log.Info("assignInternalTask: assign dmChannel to node success", zap.Int("watch dmchannels", len(watchDmChannelRequests))) if len(loadSegmentRequests) > 0 { sort.Slice(loadSegmentRequests, func(i, j int) bool { @@ -2335,7 +2333,7 @@ func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.Vchannel for _, index := range minPositions { result = append(result, infos[index]) } - log.Debug("merge delta channels finished", + log.Info("merge delta channels finished", zap.Any("origin info length", len(infos)), zap.Any("merged info length", len(result)), ) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 71356a07ce..7a88e45546 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "path/filepath" + "reflect" "strconv" "sync" @@ -110,7 +111,6 @@ func (queue *taskQueue) popTask() task { defer queue.Unlock() if queue.tasks.Len() <= 0 { - log.Warn("sorry, but the unissued task list is empty!") return nil } @@ -467,6 +467,7 @@ func (scheduler *TaskScheduler) Enqueue(t task) error { } func (scheduler *TaskScheduler) processTask(t task) error { + log.Info("begin to process task", zap.Int64("taskID", t.getTaskID()), zap.String("task", reflect.TypeOf(t).String())) var taskInfoKey string // assign taskID for childTask and update triggerTask's childTask to etcd updateKVFn := func(parentTask task) error { @@ -547,11 +548,13 @@ func (scheduler *TaskScheduler) processTask(t task) error { span.LogFields(oplog.Int64("processTask: scheduler process Execute", t.getTaskID())) err = t.execute(ctx) if err != nil { + log.Warn("failed to execute task", zap.Error(err)) trace.LogError(span, err) return err } err = updateKVFn(t) if err != nil { + log.Warn("failed to execute task", zap.Error(err)) trace.LogError(span, err) t.setResultInfo(err) return err @@ -619,12 +622,15 @@ func (scheduler *TaskScheduler) scheduleLoop() { return case <-scheduler.triggerTaskQueue.Chan(): triggerTask = scheduler.triggerTaskQueue.popTask() - log.Debug("scheduleLoop: pop a triggerTask from triggerTaskQueue", zap.Int64("triggerTaskID", triggerTask.getTaskID())) + if triggerTask == nil { + break + } + log.Info("scheduleLoop: pop a triggerTask from triggerTaskQueue", zap.Int64("triggerTaskID", triggerTask.getTaskID())) alreadyNotify := true if triggerTask.getState() == taskUndo || triggerTask.getState() == taskDoing { err = scheduler.processTask(triggerTask) if err != nil { - log.Debug("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err)) + log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err)) alreadyNotify = false } } @@ -666,7 +672,7 @@ func (scheduler *TaskScheduler) scheduleLoop() { alreadyNotify = true } rollBackTasks := triggerTask.rollBack(scheduler.ctx) - log.Debug("scheduleLoop: start rollBack after triggerTask failed", + log.Info("scheduleLoop: start rollBack after triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Any("rollBackTasks", rollBackTasks)) // there is no need to save rollBacked internal task to etcd @@ -681,7 +687,7 @@ func (scheduler *TaskScheduler) scheduleLoop() { log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err)) triggerTask.setResultInfo(err) } else { - log.Debug("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID())) + log.Info("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID())) } resultStatus := triggerTask.getResultInfo() @@ -707,7 +713,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, var err error redoFunc1 := func() { if !t.isValid() || !t.isRetryable() { - log.Debug("waitActivateTaskDone: reSchedule the activate task", + log.Info("waitActivateTaskDone: reSchedule the activate task", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID())) reScheduledTasks, err := t.reschedule(scheduler.ctx) @@ -737,7 +743,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, return } rt.setTaskID(id) - log.Debug("waitActivateTaskDone: reScheduler set id", zap.Int64("id", rt.getTaskID())) + log.Info("waitActivateTaskDone: reScheduler set id", zap.Int64("id", rt.getTaskID())) taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, rt.getTaskID()) blobs, err := rt.marshal() if err != nil { @@ -760,7 +766,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, return } triggerTask.removeChildTaskByID(t.getTaskID()) - log.Debug("waitActivateTaskDone: delete failed active task and save reScheduled task to etcd", + log.Info("waitActivateTaskDone: delete failed active task and save reScheduled task to etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Int64("failed taskID", t.getTaskID()), zap.Any("reScheduled tasks", reScheduledTasks)) @@ -768,7 +774,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, for _, rt := range reScheduledTasks { if rt != nil { triggerTask.addChildTask(rt) - log.Debug("waitActivateTaskDone: add a reScheduled active task to activateChan", zap.Int64("taskID", rt.getTaskID())) + log.Info("waitActivateTaskDone: add a reScheduled active task to activateChan", zap.Int64("taskID", rt.getTaskID())) scheduler.activateTaskChan <- rt wg.Add(1) go scheduler.waitActivateTaskDone(wg, rt, triggerTask) @@ -776,7 +782,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, } //delete task from etcd } else { - log.Debug("waitActivateTaskDone: retry the active task", + log.Info("waitActivateTaskDone: retry the active task", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID())) scheduler.activateTaskChan <- t @@ -794,7 +800,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, triggerTask.setResultInfo(err) return } - log.Debug("waitActivateTaskDone: retry the active task", + log.Info("waitActivateTaskDone: retry the active task", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID())) scheduler.activateTaskChan <- t @@ -804,7 +810,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, } err = t.waitToFinish() if err != nil { - log.Debug("waitActivateTaskDone: activate task return err", + log.Warn("waitActivateTaskDone: activate task return err", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err)) @@ -828,7 +834,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, //TODO:: case commonpb.MsgType_RemoveDmChannels: } } else { - log.Debug("waitActivateTaskDone: one activate task done", + log.Info("waitActivateTaskDone: one activate task done", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID())) metrics.QueryCoordChildTaskLatency.WithLabelValues().Observe(float64(t.elapseSpan().Milliseconds())) @@ -841,9 +847,8 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() { for { select { case <-scheduler.stopActivateTaskLoopChan: - log.Debug("processActivateTaskLoop, ctx done") + log.Info("processActivateTaskLoop, ctx done") return - case t := <-scheduler.activateTaskChan: if t == nil { log.Error("processActivateTaskLoop: pop a nil active task", zap.Int64("taskID", t.getTaskID())) @@ -851,7 +856,6 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() { } if t.getState() != taskDone { - log.Debug("processActivateTaskLoop: pop an active task from activateChan", zap.Int64("taskID", t.getTaskID())) go func() { err := scheduler.processTask(t) t.notify(err)