From 3d316fc0521dbfae14d2f9764fe3e364c830220d Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Tue, 25 Oct 2022 19:29:36 +0800
Subject: [PATCH] Fix Query Log Level (#19995)

Signed-off-by: xiaofan-luan

Signed-off-by: xiaofan-luan
---
 internal/querycoordv2/job/job.go              |  2 +-
 .../observers/handoff_observer.go             |  2 +-
 internal/querycoordv2/server.go               | 20 +++++++++----------
 internal/querycoordv2/services.go             |  2 +-
 internal/querynode/flow_graph_delete_node.go  |  2 +-
 internal/querynode/flow_graph_insert_node.go  |  6 +++---
 internal/querynode/impl.go                    |  9 +++++----
 internal/querynode/segment.go                 |  4 ++--
 internal/querynode/segment_loader.go          | 12 +++++------
 internal/querynode/shard_cluster.go           |  2 +-
 internal/querynode/task_read.go               |  2 +-
 internal/querynode/task_scheduler.go          |  4 +---
 12 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go
index 34c4b6eaa3..60f7975bc9 100644
--- a/internal/querycoordv2/job/job.go
+++ b/internal/querycoordv2/job/job.go
@@ -513,7 +513,7 @@ func (job *ReleasePartitionJob) Execute() error {
 	}
 
 	if len(toRelease) == len(loadedPartitions) { // All partitions are released, clear all
-		log.Debug("release partitions covers all partitions, will remove the whole collection")
+		log.Info("release partitions covers all partitions, will remove the whole collection")
 		err := job.meta.CollectionManager.RemoveCollection(req.GetCollectionID())
 		if err != nil {
 			msg := "failed to release partitions from store"
diff --git a/internal/querycoordv2/observers/handoff_observer.go b/internal/querycoordv2/observers/handoff_observer.go
index 7baad540cd..51941e9eb8 100644
--- a/internal/querycoordv2/observers/handoff_observer.go
+++ b/internal/querycoordv2/observers/handoff_observer.go
@@ -252,7 +252,7 @@ func (ob *HandoffObserver) tryHandoff(ctx context.Context, segment *querypb.Segm
 		ob.handoffSubmitOrders[segment.GetPartitionID()] = append(ob.handoffSubmitOrders[segment.GetPartitionID()], segment.GetSegmentID())
 	} else {
 		// ignore handoff task
-		log.Debug("handoff event trigger failed due to collection/partition is not loaded!")
+		log.Info("handoff event trigger failed due to collection/partition is not loaded!")
 		ob.cleanEvent(ctx, segment)
 	}
 }
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 456d64b5fe..baa0ba1324 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -164,7 +164,7 @@ func (s *Server) Init() error {
 	// Init KV
 	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
 	s.kv = etcdKV
-	log.Debug("query coordinator try to connect etcd success")
+	log.Info("query coordinator try to connect etcd success")
 
 	// Init ID allocator
 	idAllocatorKV := tsoutil.NewTSOKVBase(s.etcdCli, Params.EtcdCfg.KvRootPath, "querycoord-id-allocator")
@@ -187,12 +187,12 @@ func (s *Server) Init() error {
 		return err
 	}
 	// Init session
-	log.Debug("init session")
+	log.Info("init session")
 	s.nodeMgr = session.NewNodeManager()
 	s.cluster = session.NewCluster(s.nodeMgr)
 
 	// Init schedulers
-	log.Debug("init schedulers")
+	log.Info("init schedulers")
 	s.jobScheduler = job.NewScheduler()
 	s.taskScheduler = task.NewScheduler(
 		s.ctx,
@@ -205,7 +205,7 @@ func (s *Server) Init() error {
 	)
 
 	// Init heartbeat
-	log.Debug("init dist controller")
+	log.Info("init dist controller")
 	s.distController = dist.NewDistController(
 		s.cluster,
 		s.nodeMgr,
@@ -215,7 +215,7 @@ func (s *Server) Init() error {
 	)
 
 	// Init balancer
-	log.Debug("init balancer")
+	log.Info("init balancer")
 	s.balancer = balance.NewRowCountBasedBalancer(
 		s.taskScheduler,
 		s.nodeMgr,
@@ -224,7 +224,7 @@ func (s *Server) Init() error {
 	)
 
 	// Init checker controller
-	log.Debug("init checker controller")
+	log.Info("init checker controller")
 	s.checkerController = checkers.NewCheckerController(
 		s.meta,
 		s.dist,
@@ -241,11 +241,11 @@
 }
 
 func (s *Server) initMeta() error {
-	log.Debug("init meta")
+	log.Info("init meta")
 	s.store = meta.NewMetaStore(s.kv)
 	s.meta = meta.NewMeta(s.idAllocator, s.store)
 
-	log.Debug("recover meta...")
+	log.Info("recover meta...")
 	err := s.meta.CollectionManager.Recover()
 	if err != nil {
 		log.Error("failed to recover collections")
@@ -272,7 +272,7 @@
 }
 
 func (s *Server) initObserver() {
-	log.Debug("init observers")
+	log.Info("init observers")
 	s.collectionObserver = observers.NewCollectionObserver(
 		s.dist,
 		s.meta,
@@ -658,7 +658,7 @@ func (s *Server) checkReplicas() {
 				zap.Int64("replicaID", replica.GetID()),
 				zap.Int64s("offlineNodes", toRemove),
 			)
-			log.Debug("some nodes are offline, remove them from replica")
+			log.Info("some nodes are offline, remove them from replica", zap.Any("toRemove", toRemove))
 			replica.RemoveNode(toRemove...)
 			err := s.meta.ReplicaManager.Put(replica)
 			if err != nil {
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index ed7b1b6348..d47d8aca22 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -514,7 +514,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
 		zap.Int64("msgID", req.GetBase().GetMsgID()),
 	)
 
-	log.Debug("show configurations request received", zap.String("pattern", req.GetPattern()))
+	log.Info("show configurations request received", zap.String("pattern", req.GetPattern()))
 
 	if s.status.Load() != commonpb.StateCode_Healthy {
 		msg := "failed to show configurations"
diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go
index 6e246a4ae5..9d7fd2e3ce 100644
--- a/internal/querynode/flow_graph_delete_node.go
+++ b/internal/querynode/flow_graph_delete_node.go
@@ -144,7 +144,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		err := dNode.delete(delData, segmentID, &wg)
 		if err != nil {
 			// error occurs when segment cannot be found, calling cgo function delete failed and etc...
-			log.Error("failed to apply deletions to segment",
+			log.Warn("failed to apply deletions to segment",
 				zap.Int64("segmentID", segmentID),
 				zap.Error(err),
 			)
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index 66ff9dfac1..e426f4060b 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -187,7 +187,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		offset, err := targetSegment.segmentPreInsert(numOfRecords)
 		if err != nil {
 			if errors.Is(err, ErrSegmentUnhealthy) {
-				log.Debug("segment removed before preInsert")
+				log.Warn("segment removed before preInsert")
 				continue
 			}
 			// error occurs when cgo function `PreInsert` failed
@@ -388,7 +388,7 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID) error {
 	err = targetSegment.segmentInsert(offsets, ids, timestamps, insertRecord)
 	if err != nil {
 		if errors.Is(err, ErrSegmentUnhealthy) {
-			log.Debug("segment removed before insert")
+			log.Warn("segment removed before insert")
 			return nil
 		}
 		return fmt.Errorf("segmentInsert failed, segmentID = %d, err = %s", segmentID, err)
@@ -426,7 +426,7 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID) erro
 	err = targetSegment.segmentDelete(offset, ids, timestamps)
 	if err != nil {
 		if errors.Is(err, ErrSegmentUnhealthy) {
-			log.Debug("segment removed before delete")
+			log.Warn("segment removed before delete")
 			return nil
 		}
 		return fmt.Errorf("segmentDelete failed, err = %s", err)
diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 941311a83e..2eb85aabd6 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -1085,7 +1085,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
 		}, nil
 	}
 
-	log.Debug("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))
+	log.Info("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))
 
 	err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
 	if err != nil {
@@ -1096,7 +1096,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
 		}, nil
 	}
 
-	log.Debug("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))
+	log.Info("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))
 	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
 }
 
@@ -1282,22 +1282,23 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
 	}
 	// check target matches
 	if req.GetBase().GetTargetID() != node.session.ServerID {
+		log.Warn("failed to do match target id when sync ", zap.Int64("expect", req.GetBase().GetTargetID()), zap.Int64("actual", node.session.ServerID))
 		status := &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
 			Reason:    common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
 		}
 		return status, nil
 	}
-	log.Debug("SyncDistribution received")
 	shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetChannel())
 	if !ok {
+		log.Warn("failed to find shard cluster when sync ", zap.String("channel", req.GetChannel()))
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    "shard not exist",
 		}, nil
 	}
 	for _, action := range req.GetActions() {
-		log.Debug("sync action", zap.String("Action", action.GetType().String()), zap.Int64("segmentID", action.SegmentID))
+		log.Info("sync action", zap.String("Action", action.GetType().String()), zap.Int64("segmentID", action.SegmentID))
 		switch action.GetType() {
 		case querypb.SyncType_Remove:
 			shardCluster.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index d22409a0c1..19dbb6f149 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -193,7 +193,7 @@ func newSegment(collection *Collection,
 		}).Await()
 	default:
 		err := fmt.Errorf("illegal segment type %d when create segment %d", segType, segmentID)
-		log.Error("create new segment error",
+		log.Warn("create new segment error",
 			zap.Int64("collectionID", collectionID),
 			zap.Int64("partitionID", partitionID),
 			zap.Int64("segmentID", segmentID),
@@ -927,7 +927,7 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F
 	err = loadIndexInfo.appendLoadIndexInfo(bytesIndex, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
 	if err != nil {
 		if loadIndexInfo.cleanLocalData() != nil {
-			log.Error("failed to clean cached data on disk after append index failed",
+			log.Warn("failed to clean cached data on disk after append index failed",
 				zap.Int64("buildID", indexInfo.BuildID),
 				zap.Int64("index version", indexInfo.IndexVersion))
 		}
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 942bb21eca..8cb3f7c83f 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -184,7 +184,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, req *querypb.LoadS
 
 	// start to load
 	// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
-	log.Debug("start to load segments in parallel",
+	log.Info("start to load segments in parallel",
 		zap.Int("segmentNum", segmentNum),
 		zap.Int("concurrencyLevel", concurrencyLevel))
 	err = funcutil.ProcessFuncParallel(segmentNum,
@@ -285,7 +285,7 @@ func (loader *segmentLoader) loadFiles(ctx context.Context, segment *Segment,
 	if pkFieldID == common.InvalidFieldID {
 		log.Warn("segment primary key field doesn't exist when load segment")
 	} else {
-		log.Debug("loading bloom filter...", zap.Int64("segmentID", segmentID))
+		log.Info("loading bloom filter...", zap.Int64("segmentID", segmentID))
 		pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkFieldID)
 		err = loader.loadSegmentBloomFilter(ctx, segment, pkStatsBinlogs)
 		if err != nil {
@@ -293,7 +293,7 @@ func (loader *segmentLoader) loadFiles(ctx context.Context, segment *Segment,
 		}
 	}
 
-	log.Debug("loading delta...", zap.Int64("segmentID", segmentID))
+	log.Info("loading delta...", zap.Int64("segmentID", segmentID))
 	err = loader.loadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
 	return err
 }
@@ -479,7 +479,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
 		// get index params when detecting indexParamPrefix
 		if path.Base(indexPath) == storage.IndexParamsKey {
 			indexParamsFuture := loader.ioPool.Submit(func() (interface{}, error) {
-				log.Debug("load index params file", zap.String("path", indexPath))
+				log.Info("load index params file", zap.String("path", indexPath))
 				return loader.cm.Read(ctx, indexPath)
 			})
 
@@ -521,7 +521,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
 		indexPath := p
 		indexFuture := loader.cpuPool.Submit(func() (interface{}, error) {
 			indexBlobFuture := loader.ioPool.Submit(func() (interface{}, error) {
-				log.Debug("load index file", zap.String("path", indexPath))
+				log.Info("load index file", zap.String("path", indexPath))
 				data, err := loader.cm.Read(ctx, indexPath)
 				if err != nil {
 					log.Warn("failed to load index file", zap.String("path", indexPath), zap.Error(err))
@@ -917,7 +917,7 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
 	// when load segment, data will be copied from go memory to c++ memory
 	memLoadingUsage := usedMemAfterLoad + uint64(
 		float64(maxSegmentSize)*float64(concurrency)*Params.QueryNodeCfg.LoadMemoryUsageFactor)
-	log.Debug("predict memory and disk usage while loading (in MiB)",
+	log.Info("predict memory and disk usage while loading (in MiB)",
 		zap.Int64("collectionID", collectionID),
 		zap.Int("concurrency", concurrency),
 		zap.Uint64("memUsage", toMB(memLoadingUsage)),
diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go
index f63c58f7de..7eb0e5c6a4 100644
--- a/internal/querynode/shard_cluster.go
+++ b/internal/querynode/shard_cluster.go
@@ -904,7 +904,7 @@ func (sc *ShardCluster) GetStatistics(ctx context.Context, req *querypb.GetStati
 func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest, withStreaming withStreaming) ([]*internalpb.SearchResults, error) {
 	if !sc.serviceable() {
 		err := WrapErrShardNotAvailable(sc.replicaID, sc.vchannelName)
-		log.Debug("failed to search on shard",
+		log.Warn("failed to search on shard",
 			zap.Int64("replicaID", sc.replicaID),
 			zap.String("channel", sc.vchannelName),
 			zap.Int32("state", sc.state.Load()),
diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go
index 1f399d4fea..f9b2653225 100644
--- a/internal/querynode/task_read.go
+++ b/internal/querynode/task_read.go
@@ -154,7 +154,7 @@ func (b *baseReadTask) Ready() (bool, error) {
 	}
 
 	if _, released := b.QS.collection.getReleaseTime(); released {
-		log.Debug("collection release before search", zap.Int64("collectionID", b.CollectionID))
+		log.Info("collection release before search", zap.Int64("collectionID", b.CollectionID))
 		return false, fmt.Errorf("collection has been released, taskID = %d, collectionID = %d", b.ID(), b.CollectionID)
 	}
 
diff --git a/internal/querynode/task_scheduler.go b/internal/querynode/task_scheduler.go
index 17fbc4ec40..5acee0946f 100644
--- a/internal/querynode/task_scheduler.go
+++ b/internal/querynode/task_scheduler.go
@@ -298,7 +298,7 @@ func (s *taskScheduler) executeReadTasks() {
 	for {
 		select {
 		case <-s.ctx.Done():
-			log.Debug("QueryNode stop executeReadTasks", zap.Int64("NodeID", Params.QueryNodeCfg.GetNodeID()))
+			log.Info("QueryNode stop executeReadTasks", zap.Int64("NodeID", Params.QueryNodeCfg.GetNodeID()))
 			return
 		case t, ok := <-s.executeReadTaskChan:
 			if ok {
@@ -306,7 +306,6 @@
 				pendingTaskLen := len(s.executeReadTaskChan)
 				taskWg.Add(1)
 				atomic.AddInt32(&s.readConcurrency, int32(pendingTaskLen+1))
-				log.Debug("begin to execute task")
 				go executeFunc(t)
 
 				for i := 0; i < pendingTaskLen; i++ {
@@ -315,7 +314,6 @@
 					rateCol.rtCounter.sub(t, receiveQueueType)
 					go executeFunc(t)
 				}
-				//log.Debug("QueryNode taskScheduler executeReadTasks process tasks done", zap.Int("numOfTasks", pendingTaskLen+1))
 			} else {
 				errMsg := "taskScheduler executeReadTaskChan has been closed"
 				log.Warn(errMsg)