From 1c1f2a137163a8a920746cf4e382a5db56c47fca Mon Sep 17 00:00:00 2001
From: smellthemoon <64083300+smellthemoon@users.noreply.github.com>
Date: Fri, 5 Jan 2024 16:12:48 +0800
Subject: [PATCH] enhance:change some logs (#29579)

related #29588

Signed-off-by: lixinguo
Co-authored-by: lixinguo
---
 internal/allocator/cached_allocator.go        | 4 +--
 internal/datacoord/channel_manager.go         | 2 +-
 internal/datacoord/compaction_l0_view_test.go | 2 +-
 internal/datacoord/compaction_trigger.go      | 2 +-
 internal/datacoord/segment_manager.go         | 2 +-
 internal/datacoord/server.go                  | 4 +--
 internal/datacoord/session_manager.go         | 2 +-
 internal/datanode/binlog_io_test.go           | 2 +-
 internal/datanode/event_manager.go            | 2 +-
 internal/distributed/connection_manager.go    | 18 ++++++------
 internal/distributed/indexnode/service.go     | 2 +-
 internal/indexnode/indexnode.go               | 2 +-
 internal/indexnode/task_scheduler.go          | 2 +-
 .../mq/mqimpl/rocksmq/client/client_impl.go   | 2 +-
 .../mq/mqimpl/rocksmq/client/consumer_impl.go | 2 +-
 .../mq/mqimpl/rocksmq/client/producer_impl.go | 2 +-
 .../mq/mqimpl/rocksmq/client/test_helper.go   | 4 +--
 .../mq/mqimpl/rocksmq/server/rocksmq_impl.go  | 2 +-
 .../rocksmq/server/rocksmq_retention.go       | 12 ++++----
 internal/proxy/accesslog/minio_handler.go     | 2 +-
 internal/proxy/accesslog/writer.go            | 4 +--
 internal/proxy/meta_cache.go                  | 2 +-
 internal/proxy/segment.go                     | 16 +++++------
 internal/proxy/task.go                        | 28 +++++++++----------
 internal/proxy/task_scheduler.go              | 4 +--
 internal/proxy/task_search.go                 | 2 +-
 internal/querynodev2/pipeline/manager.go      | 2 +-
 internal/rootcoord/root_coord.go              | 6 ++--
 internal/rootcoord/timeticksync.go            | 2 +-
 internal/storage/minio_object_storage.go      | 2 +-
 internal/util/flowgraph/input_node.go         | 2 +-
 internal/util/importutil/binlog_adapter.go    | 2 +-
 internal/util/sessionutil/session_util.go     | 7 +++--
 pkg/mq/msgstream/mq_factory.go                | 2 +-
 .../msgstream/mqwrapper/kafka/kafka_client.go | 2 +-
 .../mqwrapper/kafka/kafka_consumer.go         | 6 ++--
 .../mqwrapper/kafka/kafka_producer.go         | 4 +--
 pkg/util/funcutil/parallel.go                 | 12 ++++----
 38 files changed, 89 insertions(+), 88 deletions(-)

diff --git a/internal/allocator/cached_allocator.go b/internal/allocator/cached_allocator.go
index fe63b2f3c3..f6e8da7f59 100644
--- a/internal/allocator/cached_allocator.go
+++ b/internal/allocator/cached_allocator.go
@@ -256,8 +256,8 @@ func (ta *CachedAllocator) failRemainRequest() {
 	}
 	if len(ta.ToDoReqs) > 0 {
 		log.Warn("Allocator has some reqs to fail",
-			zap.Any("Role", ta.Role),
-			zap.Any("reqLen", len(ta.ToDoReqs)))
+			zap.String("Role", ta.Role),
+			zap.Int("reqLen", len(ta.ToDoReqs)))
 	}
 	for _, req := range ta.ToDoReqs {
 		if req != nil {
diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index c1aaf95fe9..4dadfc213d 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -719,7 +719,7 @@ func (c *ChannelManagerImpl) watchChannelStatesLoop(ctx context.Context, revisio
 			return
 		case ackEvent := <-timeoutWatcher:
 			log.Info("receive timeout acks from state watcher",
-				zap.Any("state", ackEvent.ackType),
+				zap.Int("state", ackEvent.ackType),
 				zap.Int64("nodeID", ackEvent.nodeID), zap.String("channelName", ackEvent.channelName))
 			c.processAck(ackEvent)
 		case event, ok := <-etcdWatcher:
diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go
index ff4b7bf812..eeea705ead 100644
--- a/internal/datacoord/compaction_l0_view_test.go
+++ b/internal/datacoord/compaction_l0_view_test.go
@@ -152,7 +152,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
 				return v.ID
 			})
 			s.ElementsMatch(gotSegIDs, test.expectedSegs)
-			log.Info("trigger reason", zap.Any("trigger reason", reason))
+			log.Info("trigger reason", zap.String("trigger reason", reason))
 		}
 	})
 }
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 661d868761..700cac5e1f 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -553,7 +553,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
 		}
 		log.Info("time cost of generating compaction",
 			zap.Int64("plan ID", plan.PlanID),
-			zap.Any("time cost", time.Since(start).Milliseconds()),
+			zap.Int64("time cost", time.Since(start).Milliseconds()),
 			zap.Int64("collectionID", signal.collectionID),
 			zap.String("channel", channel),
 			zap.Int64("partitionID", partitionID),
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 8298362892..526680bbc9 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -553,7 +553,7 @@ func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
 		}

 		if isEmptySealedSegment(segment, ts) {
-			log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Any("segment", id))
+			log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
 			s.meta.SetState(id, commonpb.SegmentState_Dropped)
 			continue
 		}
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index fe45a04552..2c57f33c99 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -784,7 +784,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
 	}
 	err = s.cluster.Flush(s.ctx, ttMsg.GetBase().GetSourceID(), ch, finfo)
 	if err != nil {
-		log.Warn("failed to handle flush", zap.Any("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
+		log.Warn("failed to handle flush", zap.Int64("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
 		return err
 	}

@@ -1011,7 +1011,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
 			log.Info("flush successfully", zap.Any("segmentID", segmentID))
 			err := s.postFlush(ctx, segmentID)
 			if err != nil {
-				log.Warn("failed to do post flush", zap.Any("segmentID", segmentID), zap.Error(err))
+				log.Warn("failed to do post flush", zap.Int64("segmentID", segmentID), zap.Error(err))
 			}
 		}
 	}
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 689ea59b95..34a80fe7e6 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -200,7 +200,7 @@ func (c *SessionManagerImpl) Compaction(nodeID int64, plan *datapb.CompactionPla
 		return err
 	}

-	log.Info("success to execute compaction", zap.Int64("node", nodeID), zap.Any("planID", plan.GetPlanID()))
+	log.Info("success to execute compaction", zap.Int64("node", nodeID), zap.Int64("planID", plan.GetPlanID()))
 	return nil
 }
diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index 8b4c8323d9..b71da427b7 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -267,7 +267,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		assert.Equal(t, 12, len(kvs))

 		log.Debug("test paths",
-			zap.Any("kvs no.", len(kvs)),
+			zap.Int("kvs no.", len(kvs)),
 			zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
 	})
 }
diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go
index 21542c1cb8..01b6d215b9 100644
--- a/internal/datanode/event_manager.go
+++ b/internal/datanode/event_manager.go
@@ -120,7 +120,7 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
 		e.info = watchInfo
 		e.vChanName = watchInfo.GetVchan().GetChannelName()
-		log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
+		log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.String("watch state", watchInfo.GetState().String()))
 	case deleteEventType:
 		e.vChanName = parseDeleteEventKey(key)
 		log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
diff --git a/internal/distributed/connection_manager.go b/internal/distributed/connection_manager.go
index 682a751dae..bb82a1d2bc 100644
--- a/internal/distributed/connection_manager.go
+++ b/internal/distributed/connection_manager.go
@@ -98,19 +98,19 @@ func (cm *ConnectionManager) AddDependency(roleName string) error {
 	_, ok := cm.dependencies[roleName]
 	if ok {
-		log.Warn("Dependency is already added", zap.Any("roleName", roleName))
+		log.Warn("Dependency is already added", zap.String("roleName", roleName))
 		return nil
 	}
 	cm.dependencies[roleName] = struct{}{}

 	msess, rev, err := cm.session.GetSessions(roleName)
 	if err != nil {
-		log.Debug("ClientManager GetSessions failed", zap.Any("roleName", roleName))
+		log.Debug("ClientManager GetSessions failed", zap.String("roleName", roleName))
 		return err
 	}

 	if len(msess) == 0 {
-		log.Debug("No nodes are currently alive", zap.Any("roleName", roleName))
+		log.Debug("No nodes are currently alive", zap.String("roleName", roleName))
 	} else {
 		for _, value := range msess {
 			cm.buildConnections(value)
@@ -254,12 +254,12 @@ func (cm *ConnectionManager) receiveFinishTask() {
 		case serverID := <-cm.notify:
 			cm.taskMu.Lock()
 			task, ok := cm.buildTasks[serverID]
-			log.Debug("ConnectionManager", zap.Any("receive finish", serverID))
+			log.Debug("ConnectionManager", zap.Int64("receive finish", serverID))
 			if ok {
-				log.Debug("ConnectionManager", zap.Any("get task ok", serverID))
+				log.Debug("ConnectionManager", zap.Int64("get task ok", serverID))
 				log.Debug("ConnectionManager", zap.Any("task state", task.state))
 				if task.state == buildClientSuccess {
-					log.Debug("ConnectionManager", zap.Any("build success", serverID))
+					log.Debug("ConnectionManager", zap.Int64("build success", serverID))
 					cm.addConnection(task.sess.ServerID, task.result)
 					cm.buildClients(task.sess, task.result)
 				}
@@ -410,10 +410,10 @@ func (bct *buildClientTask) Run() {
 		}
 		err := retry.Do(bct.ctx, connectGrpcFunc, bct.retryOptions...)
- log.Debug("ConnectionManager", zap.Any("build connection finish", bct.sess.ServerID)) + log.Debug("ConnectionManager", zap.Int64("build connection finish", bct.sess.ServerID)) if err != nil { log.Debug("BuildClientTask try connect failed", - zap.Any("roleName", bct.sess.ServerName), zap.Error(err)) + zap.String("roleName", bct.sess.ServerName), zap.Error(err)) bct.state = buildClientFailed return } @@ -425,7 +425,7 @@ func (bct *buildClientTask) Stop() { } func (bct *buildClientTask) finish() { - log.Debug("ConnectionManager", zap.Any("notify connection finish", bct.sess.ServerID)) + log.Debug("ConnectionManager", zap.Int64("notify connection finish", bct.sess.ServerID)) bct.notify <- bct.sess.ServerID } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 10b3b8ac02..774f0f8e1c 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -164,7 +164,7 @@ func (s *Server) init() error { // wait for grpc server loop start err = <-s.grpcErrChan if err != nil { - log.Error("IndexNode", zap.Any("grpc error", err)) + log.Error("IndexNode", zap.Error(err)) return err } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index d33adcfde8..44c4971cc1 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -230,7 +230,7 @@ func (i *IndexNode) Start() error { startErr = i.sched.Start() i.UpdateStateCode(commonpb.StateCode_Healthy) - log.Info("IndexNode", zap.Any("State", i.lifetime.GetState().String())) + log.Info("IndexNode", zap.String("State", i.lifetime.GetState().String())) }) log.Info("IndexNode start finished", zap.Error(startErr)) diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 0e4563ccae..a58011b421 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -108,7 +108,7 @@ func (queue *IndexTaskQueue) AddActiveTask(t task) { tName := t.Name() _, ok := queue.activeTasks[tName] if ok { - log.Debug("IndexNode task already in active task list", zap.Any("TaskID", tName)) + log.Debug("IndexNode task already in active task list", zap.String("TaskID", tName)) } queue.activeTasks[tName] = t diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl.go b/internal/mq/mqimpl/rocksmq/client/client_impl.go index 8680f77e97..f0cce86530 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl.go @@ -78,7 +78,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { return nil, err } if exist { - log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.Any("SubscriptionName", options.SubscriptionName)) + log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.String("SubscriptionName", options.SubscriptionName)) consumer, err := getExistedConsumer(c, options, con.MsgMutex) if err != nil { return nil, err diff --git a/internal/mq/mqimpl/rocksmq/client/consumer_impl.go b/internal/mq/mqimpl/rocksmq/client/consumer_impl.go index 1f95087ef3..2957300b65 100644 --- a/internal/mq/mqimpl/rocksmq/client/consumer_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/consumer_impl.go @@ -132,7 +132,7 @@ func (c *consumer) Close() { // TODO should panic? 
 	err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName)
 	if err != nil {
-		log.Warn("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
+		log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err))
 	}
 }
diff --git a/internal/mq/mqimpl/rocksmq/client/producer_impl.go b/internal/mq/mqimpl/rocksmq/client/producer_impl.go
index d401f7ed2a..ad27f449b0 100644
--- a/internal/mq/mqimpl/rocksmq/client/producer_impl.go
+++ b/internal/mq/mqimpl/rocksmq/client/producer_impl.go
@@ -80,6 +80,6 @@ func (p *producer) Send(message *mqwrapper.ProducerMessage) (UniqueID, error) {
 func (p *producer) Close() {
 	err := p.c.server.DestroyTopic(p.topic)
 	if err != nil {
-		log.Warn("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err))
+		log.Warn("Producer close failed", zap.String("topicName", p.topic), zap.Error(err))
 	}
 }
diff --git a/internal/mq/mqimpl/rocksmq/client/test_helper.go b/internal/mq/mqimpl/rocksmq/client/test_helper.go
index d99ade29e8..cdff81465c 100644
--- a/internal/mq/mqimpl/rocksmq/client/test_helper.go
+++ b/internal/mq/mqimpl/rocksmq/client/test_helper.go
@@ -58,11 +58,11 @@ func removePath(rmqPath string) {
 	rocksdbPath := rmqPath
 	err := os.RemoveAll(rocksdbPath)
 	if err != nil {
-		log.Error("Failed to call os.removeAll.", zap.Any("path", rocksdbPath))
+		log.Error("Failed to call os.removeAll.", zap.String("path", rocksdbPath))
 	}
 	metaPath := rmqPath + "_meta_kv"
 	err = os.RemoveAll(metaPath)
 	if err != nil {
-		log.Error("Failed to call os.removeAll.", zap.Any("path", metaPath))
+		log.Error("Failed to call os.removeAll.", zap.String("path", metaPath))
 	}
 }
diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go
index ec817fb1a8..9198613dbc 100644
--- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go
+++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go
@@ -307,7 +307,7 @@ func (rmq *rocksmq) Close() {
 		for _, consumer := range v.([]*Consumer) {
 			err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
 			if err != nil {
-				log.Warn("Failed to destroy consumer group in rocksmq!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName), zap.Any("error", err))
+				log.Warn("Failed to destroy consumer group in rocksmq!", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName), zap.Error(err))
 			}
 		}
 		return true
diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go
index 80ebec395d..21e847702d 100644
--- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go
+++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go
@@ -143,8 +143,8 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
 	}
 	// Quick Path, No page to check
 	if totalAckedSize == 0 {
-		log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic),
-			zap.Any("time taken", time.Since(start).Milliseconds()))
+		log.Debug("All messages are not expired, skip retention because no ack", zap.String("topic", topic),
+			zap.Int64("time taken", time.Since(start).Milliseconds()))
 		return nil
 	}
 	pageReadOpts := gorocksdb.NewDefaultReadOptions()
@@ -232,13 +232,13 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
 	}

 	if pageEndID == 0 {
-		log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic), zap.Any("time taken", time.Since(start).Milliseconds()))
+		log.Debug("All messages are not expired, skip retention", zap.String("topic", topic), zap.Int64("time taken", time.Since(start).Milliseconds()))
 		return nil
 	}
 	expireTime := time.Since(start).Milliseconds()
-	log.Debug("Expired check by message size: ", zap.Any("topic", topic),
-		zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
-		zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", expireTime))
+	log.Debug("Expired check by message size: ", zap.String("topic", topic),
+		zap.Int64("pageEndID", pageEndID), zap.Int64("deletedAckedSize", deletedAckedSize),
+		zap.Int64("pageCleaned", pageCleaned), zap.Int64("time taken", expireTime))
 	return ri.cleanData(topic, pageEndID)
 }
diff --git a/internal/proxy/accesslog/minio_handler.go b/internal/proxy/accesslog/minio_handler.go
index 0df84120f3..852620a3ed 100644
--- a/internal/proxy/accesslog/minio_handler.go
+++ b/internal/proxy/accesslog/minio_handler.go
@@ -122,7 +122,7 @@ func newMinioClient(ctx context.Context, cfg config) (*minio.Client, error) {
 	}
 	if !bucketExists {
 		if cfg.createBucket {
-			log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", cfg.bucketName))
+			log.Info("blob bucket not exist, create bucket.", zap.String("bucket name", cfg.bucketName))
 			err := minioClient.MakeBucket(ctx, cfg.bucketName, minio.MakeBucketOptions{})
 			if err != nil {
 				log.Warn("failed to create blob bucket", zap.String("bucket", cfg.bucketName), zap.Error(err))
diff --git a/internal/proxy/accesslog/writer.go b/internal/proxy/accesslog/writer.go
index 56dfc53a55..18372790ec 100644
--- a/internal/proxy/accesslog/writer.go
+++ b/internal/proxy/accesslog/writer.go
@@ -100,8 +100,8 @@ func NewRotateLogger(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.Mi
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()

-		log.Debug("remtepath", zap.Any("remote", logCfg.RemotePath.GetValue()))
-		log.Debug("maxBackups", zap.Any("maxBackups", logCfg.MaxBackups.GetValue()))
+		log.Debug("remtepath", zap.String("remote", logCfg.RemotePath.GetValue()))
+		log.Debug("maxBackups", zap.String("maxBackups", logCfg.MaxBackups.GetValue()))
 		handler, err := NewMinioHandler(ctx, minioCfg, logCfg.RemotePath.GetValue(), logCfg.MaxBackups.GetAsInt())
 		if err != nil {
 			return nil, err
diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index c372b2ee7a..3f582baf40 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -470,7 +470,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectio
 		metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
 		log.Debug("Reload collection from root coordinator ",
 			zap.String("collectionName", collectionName),
-			zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
+			zap.Int64("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
 		return collInfo.schema, nil
 	}
 	defer m.mu.RUnlock()
diff --git a/internal/proxy/segment.go b/internal/proxy/segment.go
index f9a6b89682..3c1d8321d2 100644
--- a/internal/proxy/segment.go
+++ b/internal/proxy/segment.go
@@ -82,8 +82,8 @@ func (info *segInfo) Capacity(ts Timestamp) uint32 {

 func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
 	if info.IsExpired(ts) {
-		log.Debug("segInfo Assign IsExpired", zap.Any("ts", ts),
-			zap.Any("count", count))
+		log.Debug("segInfo Assign IsExpired", zap.Uint64("ts", ts),
+			zap.Uint32("count", count))
 		return 0
 	}
 	ret := uint32(0)
@@ -229,8 +229,8 @@ func (sa *segIDAssigner) pickCanDoFunc() {
 		}
 	}
 	log.Debug("Proxy segIDAssigner pickCanDoFunc", zap.Any("records", records),
-		zap.Any("len(newTodoReqs)", len(newTodoReqs)),
-		zap.Any("len(CanDoReqs)", len(sa.CanDoReqs)))
+		zap.Int("len(newTodoReqs)", len(newTodoReqs)),
+		zap.Int("len(CanDoReqs)", len(sa.CanDoReqs)))
 	sa.ToDoReqs = newTodoReqs
 }
@@ -268,7 +268,7 @@ func (sa *segIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b
 }

 func (sa *segIDAssigner) reduceSegReqs() {
-	log.Debug("Proxy segIDAssigner reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
+	log.Debug("Proxy segIDAssigner reduceSegReqs", zap.Int("len(segReqs)", len(sa.segReqs)))
 	if len(sa.segReqs) == 0 {
 		return
 	}
@@ -298,9 +298,9 @@ func (sa *segIDAssigner) reduceSegReqs() {
 		afterCnt += req.Count
 	}
 	sa.segReqs = newSegReqs
-	log.Debug("Proxy segIDAssigner reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)),
-		zap.Any("BeforeCnt", beforeCnt),
-		zap.Any("AfterCnt", afterCnt))
+	log.Debug("Proxy segIDAssigner reduceSegReqs after reduce", zap.Int("len(segReqs)", len(sa.segReqs)),
+		zap.Uint32("BeforeCnt", beforeCnt),
+		zap.Uint32("AfterCnt", afterCnt))
 }

 func (sa *segIDAssigner) syncSegments() (bool, error) {
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index fa25102f68..a3f9bf16e3 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -679,8 +679,8 @@ func (t *showCollectionsTask) Execute(ctx context.Context) error {
 	for _, collectionName := range t.CollectionNames {
 		collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName)
 		if err != nil {
-			log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
-				zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections"))
+			log.Debug("Failed to get collection id.", zap.String("collectionName", collectionName),
+				zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections"))
 			return err
 		}
 		collectionIDs = append(collectionIDs, collectionID)
@@ -726,14 +726,14 @@ func (t *showCollectionsTask) Execute(ctx context.Context) error {
 		collectionName, ok := IDs2Names[id]
 		if !ok {
 			log.Debug("Failed to get collection info. This collection may be not released",
This collection may be not released", - zap.Any("collectionID", id), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections")) + zap.Int64("collectionID", id), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections")) continue } collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, t.GetDbName(), collectionName, id) if err != nil { - log.Debug("Failed to get collection info.", zap.Any("collectionName", collectionName), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections")) + log.Debug("Failed to get collection info.", zap.String("collectionName", collectionName), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections")) return err } t.result.CollectionIds = append(t.result.CollectionIds, id) @@ -1178,8 +1178,8 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error { collectionName := t.CollectionName collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName) if err != nil { - log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) + log.Debug("Failed to get collection id.", zap.String("collectionName", collectionName), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions")) return err } IDs2Names := make(map[UniqueID]string) @@ -1191,8 +1191,8 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error { for _, partitionName := range t.PartitionNames { partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), collectionName, partitionName) if err != nil { - log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) + log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions")) return err } partitionIDs = append(partitionIDs, partitionID) @@ -1230,14 +1230,14 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error { for offset, id := range resp.PartitionIDs { partitionName, ok := IDs2Names[id] if !ok { - log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) + log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions")) return errors.New("failed to show partitions") } partitionInfo, err := globalMetaCache.GetPartitionInfo(ctx, t.GetDbName(), collectionName, partitionName) if err != nil { - log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) + log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName), + zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions")) return err } t.result.PartitionIDs = append(t.result.PartitionIDs, id) diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index f28a3a7e81..be4741e6d1 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -263,10 +263,10 @@ func (queue *dmTaskQueue) PopActiveTask(taskID UniqueID) task { defer queue.statsLock.Unlock() delete(queue.activeTasks, taskID) - log.Debug("Proxy 
dmTaskQueue popPChanStats", zap.Any("taskID", t.ID())) + log.Debug("Proxy dmTaskQueue popPChanStats", zap.Int64("taskID", t.ID())) queue.popPChanStats(t) } else { - log.Warn("Proxy task not in active task list!", zap.Any("taskID", taskID)) + log.Warn("Proxy task not in active task list!", zap.Int64("taskID", taskID)) } return t } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 15650b94e6..bf34500516 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -854,7 +854,7 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb zap.Int64("nq", sData.NumQueries), zap.Int64("topk", sData.TopK), zap.Int("length of pks", pkLength), - zap.Any("length of FieldsData", len(sData.FieldsData))) + zap.Int("length of FieldsData", len(sData.FieldsData))) if err := checkSearchResultData(sData, nq, topk); err != nil { log.Ctx(ctx).Warn("invalid search results", zap.Error(err)) return ret, err diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index cf4a746d7d..453c963843 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -120,7 +120,7 @@ func (m *manager) Remove(channels ...string) { pipeline.Close() delete(m.channel2Pipeline, channel) } else { - log.Warn("pipeline to be removed doesn't existed", zap.Any("channel", channel)) + log.Warn("pipeline to be removed doesn't existed", zap.String("channel", channel)) } } metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5c179d2f24..928f8444fe 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -626,7 +626,7 @@ func (c *Core) initBuiltinRoles() error { for role, privilegesJSON := range rolePrivilegesMap { err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role}) if err != nil && !common.IsIgnorableError(err) { - log.Error("create a builtin role fail", zap.Any("error", err), zap.String("roleName", role)) + log.Error("create a builtin role fail", zap.String("roleName", role), zap.Error(err)) return errors.Wrapf(err, "failed to create a builtin role: %s", role) } for _, privilege := range privilegesJSON[util.RoleConfigPrivileges] { @@ -645,7 +645,7 @@ func (c *Core) initBuiltinRoles() error { }, }, milvuspb.OperatePrivilegeType_Grant) if err != nil && !common.IsIgnorableError(err) { - log.Error("grant privilege to builtin role fail", zap.Any("error", err), zap.String("roleName", role), zap.Any("privilege", privilege)) + log.Error("grant privilege to builtin role fail", zap.String("roleName", role), zap.Any("privilege", privilege), zap.Error(err)) return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s", privilege[util.RoleConfigObjectType], privilege[util.RoleConfigObjectName], privilege[util.RoleConfigPrivilege], privilege[util.RoleConfigDBName], role) } } @@ -2039,7 +2039,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( // Here ir.GetState() == commonpb.ImportState_ImportPersisted // Seal these import segments, so they can be auto-flushed later. 
log.Info("an import task turns to persisted state, flush segments to be sealed", - zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments())) + zap.Int64("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments())) if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index bc906519cc..4ad56bc059 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -166,7 +166,7 @@ func (t *timetickSync) sendToChannel() bool { // give warning every 2 second if not get ttMsg from source sessions if maxCnt%10 == 0 { log.Warn("session idle for long time", zap.Any("idle list", idleSessionList), - zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.GetAsInt64()*time.Millisecond.Milliseconds()*maxCnt)) + zap.Int64("idle time", Params.ProxyCfg.TimeTickInterval.GetAsInt64()*time.Millisecond.Milliseconds()*maxCnt)) } return false } diff --git a/internal/storage/minio_object_storage.go b/internal/storage/minio_object_storage.go index 639d7bbce0..bf273ec99d 100644 --- a/internal/storage/minio_object_storage.go +++ b/internal/storage/minio_object_storage.go @@ -118,7 +118,7 @@ func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) { } if !bucketExists { if c.createBucket { - log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName)) + log.Info("blob bucket not exist, create bucket.", zap.String("bucket name", c.bucketName)) err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{}) if err != nil { log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err)) diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 65337f2b21..24eeff9b4e 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -77,7 +77,7 @@ func (inNode *InputNode) SetCloseMethod(gracefully bool) { log.Info("input node close method set", zap.String("node", inNode.Name()), zap.Int64("collection", inNode.collectionID), - zap.Any("gracefully", gracefully)) + zap.Bool("gracefully", gracefully)) } // Operate consume a message pack from msgstream and return diff --git a/internal/util/importutil/binlog_adapter.go b/internal/util/importutil/binlog_adapter.go index 29b2ef8b44..554e659ff5 100644 --- a/internal/util/importutil/binlog_adapter.go +++ b/internal/util/importutil/binlog_adapter.go @@ -180,7 +180,7 @@ func (p *BinlogAdapter) Read(segmentHolder *SegmentFilesHolder) error { // read timestamps list timestampLog := segmentHolder.fieldFiles[common.TimeStampField][i] // no need to check existence, already verified - log.Info("Binlog adapter: prepare to read timestamp binglog", zap.Any("logPath", timestampLog)) + log.Info("Binlog adapter: prepare to read timestamp binglog", zap.String("logPath", timestampLog)) timestampList, err := p.readTimestamp(timestampLog) if err != nil { return err diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 18de6f4a0e..3e76d1c5d3 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -485,15 +485,16 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er // If we find previous session have same address as current , simply purge the old one so the 
 func (s *Session) handleRestart(key string) {
 	resp, err := s.etcdCli.Get(s.ctx, key)
+	log := log.With(zap.String("key", key))
 	if err != nil {
-		log.Warn("failed to read old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
+		log.Warn("failed to read old session from etcd, ignore", zap.Error(err))
 		return
 	}
 	for _, kv := range resp.Kvs {
 		session := &Session{}
 		err = json.Unmarshal(kv.Value, session)
 		if err != nil {
-			log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
+			log.Warn("failed to unmarshal old session from etcd, ignore", zap.Error(err))
 			return
 		}
@@ -502,7 +503,7 @@ func (s *Session) handleRestart(key string) {
 			zap.String("address", session.Address))
 		_, err := s.etcdCli.Delete(s.ctx, key)
 		if err != nil {
-			log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
+			log.Warn("failed to unmarshal old session from etcd, ignore", zap.Error(err))
 			return
 		}
 	}
diff --git a/pkg/mq/msgstream/mq_factory.go b/pkg/mq/msgstream/mq_factory.go
index cf9b23e25e..74e324e0ce 100644
--- a/pkg/mq/msgstream/mq_factory.go
+++ b/pkg/mq/msgstream/mq_factory.go
@@ -169,7 +169,7 @@ func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st
 				}
 			}
 			log.Warn("failed to clean up subscriptions", zap.String("pulsar web", f.PulsarWebAddress),
-				zap.String("topic", channel), zap.Any("subname", subname), zap.Error(err))
+				zap.String("topic", channel), zap.String("subname", subname), zap.Error(err))
 		}
 	}
 	return nil
diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
index 060f502ac4..d4d03b6e2c 100644
--- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
+++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
@@ -124,7 +124,7 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
 					// authentication issues, etc.
 					// After a fatal error has been raised, any subsequent Produce*() calls will fail with
 					// the original error code.
- log.Error("kafka error", zap.Any("error msg", ev.Error())) + log.Error("kafka error", zap.String("error msg", ev.Error())) if ev.IsFatal() { panic(ev) } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index f1b0b9d125..c86aecf6ec 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -135,7 +135,7 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message { e, err := kc.c.ReadMessage(readTimeout) if err != nil { // if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt - log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err)) + log.Warn("consume msg failed", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err)) } else { if kc.skipMsg { kc.skipMsg = false @@ -217,7 +217,7 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) { high = high - 1 } - log.Info("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high)) + log.Info("get latest msg ID ", zap.String("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high)) return &kafkaID{messageID: high}, nil } @@ -249,7 +249,7 @@ func (kc *Consumer) closeInternal() { } cost := time.Since(start).Milliseconds() if cost > 200 { - log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost)) + log.Warn("close consumer costs too long time", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost)) } } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go index f2f0ec4e43..71fdcf0bcf 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -81,14 +81,14 @@ func (kp *kafkaProducer) Close() { // flush in-flight msg within queue. 
 		i := kp.p.Flush(10000)
 		if i > 0 {
-			log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.Any("topic", kp.topic))
+			log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.String("topic", kp.topic))
 		}

 		close(kp.deliveryChan)

 		cost := time.Since(start).Milliseconds()
 		if cost > 500 {
-			log.Debug("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost))
+			log.Debug("kafka producer is closed", zap.String("topic", kp.topic), zap.Int64("time cost(ms)", cost))
 		}
 	})
 }
diff --git a/pkg/util/funcutil/parallel.go b/pkg/util/funcutil/parallel.go
index 258d603ccf..6e679c7cbb 100644
--- a/pkg/util/funcutil/parallel.go
+++ b/pkg/util/funcutil/parallel.go
@@ -51,7 +51,7 @@ func ProcessFuncParallel(total, maxParallel int, f ProcessFunc, fname string) er
 	t := time.Now()
 	defer func() {
-		log.Debug(fname, zap.Any("total", total), zap.Any("time cost", time.Since(t)))
+		log.Debug(fname, zap.Int("total", total), zap.Any("time cost", time.Since(t)))
 	}()

 	nPerBatch := (total + maxParallel - 1) / maxParallel
@@ -85,7 +85,7 @@ func ProcessFuncParallel(total, maxParallel int, f ProcessFunc, fname string) er
 			for idx := begin; idx < end; idx++ {
 				err = f(idx)
 				if err != nil {
-					log.Error(fname, zap.Error(err), zap.Any("idx", idx))
+					log.Error(fname, zap.Error(err), zap.Int("idx", idx))
 					break
 				}
 			}
@@ -146,8 +146,8 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
 	total := len(tasks)
 	nPerBatch := (total + maxParallel - 1) / maxParallel
-	log.Debug(fname, zap.Any("total", total))
-	log.Debug(fname, zap.Any("nPerBatch", nPerBatch))
+	log.Debug(fname, zap.Int("total", total))
+	log.Debug(fname, zap.Int("nPerBatch", nPerBatch))

 	quit := make(chan bool)
 	errc := make(chan error)
@@ -188,7 +188,7 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
 			for idx := begin; idx < end; idx++ {
 				err = tasks[idx]()
 				if err != nil {
-					log.Error(fname, zap.Error(err), zap.Any("idx", idx))
+					log.Error(fname, zap.Error(err), zap.Int("idx", idx))
 					break
 				}
 			}
@@ -212,7 +212,7 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
 		routineNum++
 	}

-	log.Debug(fname, zap.Any("NumOfGoRoutines", routineNum))
+	log.Debug(fname, zap.Int("NumOfGoRoutines", routineNum))

 	if routineNum <= 0 {
 		return nil
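
The change applied throughout this patch is mechanical: zap.Any(...) fields are replaced with concretely typed constructors (zap.String, zap.Int, zap.Int64, zap.Uint32, zap.Bool) and error values are logged with zap.Error. zap.Any picks the field encoding at run time and falls back to reflection for types it does not recognize, while typed constructors keep the field type explicit at the call site. The sketch below only illustrates the before/after shape; it uses go.uber.org/zap directly rather than the Milvus log wrapper, and the message and field values are hypothetical examples, not code from this repository.

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	topic := "demo-topic"
	planID := int64(42)
	err := errors.New("connection refused")

	// Before: zap.Any decides the encoding at run time (reflection for unrecognized types).
	logger.Warn("compaction failed", zap.Any("topic", topic), zap.Any("planID", planID), zap.Any("error", err))

	// After: typed fields keep the log schema explicit; zap.Error standardizes the error key.
	logger.Warn("compaction failed", zap.String("topic", topic), zap.Int64("planID", planID), zap.Error(err))
}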