diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go
index e14f94c4f2..be7c4be1be 100644
--- a/internal/allocator/allocator.go
+++ b/internal/allocator/allocator.go
@@ -254,7 +254,7 @@ func (ta *Allocator) failRemainRequest() {
 err = errors.New(errMsg)
 }
 if len(ta.ToDoReqs) > 0 {
- log.Debug("Allocator has some reqs to fail",
+ log.Warn("Allocator has some reqs to fail",
 zap.Any("Role", ta.Role),
 zap.Any("reqLen", len(ta.ToDoReqs)))
 }
diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 6a1690e6b4..00408318eb 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -125,7 +125,7 @@ func (c *ChannelManager) Startup(nodes []int64) error {
 // Unwatch and drop channel with drop flag.
 c.unwatchDroppedChannels()
- log.Debug("cluster start up",
+ log.Info("cluster start up",
 zap.Any("nodes", nodes),
 zap.Any("oNodes", oNodes),
 zap.Int64s("new onlines", newOnLines),
@@ -171,7 +171,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
 }
 updates := c.reassignPolicy(c.store, reallocates)
- log.Debug("channel manager bg check reassign", zap.Array("updates", updates))
+ log.Info("channel manager bg check reassign", zap.Array("updates", updates))
 for _, update := range updates {
 if update.Type == Add {
 c.fillChannelWatchInfo(update)
@@ -225,7 +225,7 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
 c.store.Add(nodeID)
 updates := c.registerPolicy(c.store, nodeID)
- log.Debug("register node",
+ log.Info("register node",
 zap.Int64("registered node", nodeID),
 zap.Array("updates", updates))
@@ -250,7 +250,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
 c.unsubAttempt(nodeChannelInfo)
 updates := c.deregisterPolicy(c.store, nodeID)
- log.Debug("deregister node",
+ log.Warn("deregister node",
 zap.Int64("unregistered node", nodeID),
 zap.Array("updates", updates))
@@ -312,7 +312,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
 if len(updates) == 0 {
 return nil
 }
- log.Debug("watch channel",
+ log.Info("watch channel",
 zap.Any("channel", ch),
 zap.Array("updates", updates))
@@ -327,7 +327,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
 zap.String("channelName", ch.Name), zap.Error(err))
 return err
 }
- log.Debug("ChannelManager RWChannelStore update success", zap.Int64("collectionID", ch.CollectionID),
+ log.Info("ChannelManager RWChannelStore update success", zap.Int64("collectionID", ch.CollectionID),
 zap.String("channelName", ch.Name))
 return nil
 }
diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index 9d8030d1da..366565ab03 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -122,7 +122,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
 SegmentIDs: segments,
 MarkSegmentIDs: marks,
 }
- log.Warn("Plan to flush", zap.Int64("node_id", nodeID), zap.Int64s("segments", segments), zap.Int64s("marks", marks))
+ log.Info("Plan to flush", zap.Int64("node_id", nodeID), zap.Int64s("segments", segments), zap.Int64s("marks", marks))
 c.sessionManager.Flush(ctx, nodeID, req)
 }
 }
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 9d4733ccca..444c214de6 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -103,7 +103,7 @@ func (t *compactionTrigger) start() {
 for {
 select {
 case <-t.quit:
- log.Debug("compaction trigger quit")
+ log.Info("compaction trigger quit")
 return
 case signal := <-t.signals:
 switch {
diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go
index 3771c7de86..d8621ef1d4 100644
--- a/internal/datacoord/garbage_collector.go
+++ b/internal/datacoord/garbage_collector.go
@@ -143,7 +143,7 @@ func (gc *garbageCollector) scan() {
 }
 }
 }
- log.Warn("scan result", zap.Int("valid", v), zap.Int("missing", m), zap.Strings("removed keys", removedKeys))
+ log.Info("scan result", zap.Int("valid", v), zap.Int("missing", m), zap.Strings("removed keys", removedKeys))
 }
 func (gc *garbageCollector) clearEtcd() {
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index ee196db153..ee71e64c01 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -51,7 +51,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
 segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
 return s.InsertChannel == channel
 })
- log.Debug("GetSegmentsByChannel",
+ log.Info("GetSegmentsByChannel",
 zap.Any("collectionID", collectionID),
 zap.Any("channel", channel),
 zap.Any("numOfSegments", len(segments)),
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 2c75e6bd8c..0f586a0425 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -320,7 +320,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
 return nil, err
 }
 s.segments = append(s.segments, id)
- log.Debug("datacoord: estimateTotalRows: ",
+ log.Info("datacoord: estimateTotalRows: ",
 zap.Int64("CollectionID", segmentInfo.CollectionID),
 zap.Int64("SegmentID", segmentInfo.ID),
 zap.Int("Rows", maxNumOfRows),
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 04f1a2db63..b1a3614665 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -390,10 +390,10 @@ func (s *Server) initGarbageCollection() error {
 func (s *Server) initServiceDiscovery() error {
 sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
 if err != nil {
- log.Debug("DataCoord failed to init service discovery", zap.Error(err))
+ log.Warn("DataCoord failed to init service discovery", zap.Error(err))
 return err
 }
- log.Debug("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))
+ log.Info("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))
 datanodes := make([]*NodeInfo, 0, len(sessions))
 for _, session := range sessions {
@@ -450,7 +450,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 }
 ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick}, Params.MsgChannelCfg.DataCoordSubName, mqclient.SubscriptionPositionLatest)
- log.Debug("DataCoord creates the timetick channel consumer",
+ log.Info("DataCoord creates the timetick channel consumer",
 zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick),
 zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
 ttMsgStream.Start()
@@ -481,13 +481,13 @@ func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStrea
 for {
 select {
 case <-ctx.Done():
- log.Debug("DataNode timetick loop shutdown")
+ log.Info("DataNode timetick loop shutdown")
 return
 default:
 }
 msgPack := ttMsgStream.Consume()
 if msgPack == nil {
- log.Debug("receive nil timetick msg and shutdown timetick channel")
+ log.Info("receive nil timetick msg and shutdown timetick channel")
 return
 }
 for _, msg := range msgPack.Msgs {
@@ -537,7 +537,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
 return nil
 }
- log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))
+ log.Info("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))
 s.setLastFlushTime(flushableSegments)
 s.setLastFlushTime(staleSegments)
@@ -615,7 +615,7 @@ func (s *Server) watchService(ctx context.Context) {
 for {
 select {
 case <-ctx.Done():
- log.Debug("watch service shutdown")
+ log.Info("watch service shutdown")
 return
 case event, ok := <-s.eventCh:
 if !ok {
@@ -730,7 +730,7 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
 log.Error("flush segment complete failed", zap.Error(err))
 return err
 }
- log.Debug("flush segment complete", zap.Int64("id", segmentID))
+ log.Info("flush segment complete", zap.Int64("id", segmentID))
 return nil
 }
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 2ae48e7b93..42fbbc8fd0 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -66,7 +66,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
 // this api only guarantees all the segments requested is sealed
 // these segments will be flushed only after the Flush policy is fulfilled
 func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
- log.Debug("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
+ log.Info("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
 sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "DataCoord-Flush")
 defer sp.Finish()
 resp := &datapb.FlushResponse{
@@ -87,7 +87,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
 resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err)
 return resp, nil
 }
- log.Debug("flush response with segments",
+ log.Info("flush response with segments",
 zap.Int64("collectionID", req.GetCollectionID()),
 zap.Any("segments", sealedSegments))
 resp.Status.ErrorCode = commonpb.ErrorCode_Success
@@ -111,7 +111,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
 for _, r := range req.SegmentIDRequests {
- log.Debug("handle assign segment request",
+ log.Info("handle assign segment request",
 zap.Int64("collectionID", r.GetCollectionID()),
 zap.Int64("partitionID", r.GetPartitionID()),
 zap.String("channelName", r.GetChannelName()),
@@ -136,7 +136,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
 continue
 }
- log.Debug("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segAlloc))
+ log.Info("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segAlloc))
 for _, allocation := range segAlloc {
 result := &datapb.SegmentIDAssignment{
@@ -310,7 +310,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 return resp, nil
 }
- log.Debug("receive SaveBinlogPaths request",
+ log.Info("receive SaveBinlogPaths request",
 zap.Int64("nodeID", req.GetBase().GetSourceID()),
 zap.Int64("collectionID", req.GetCollectionID()),
 zap.Int64("segmentID", req.GetSegmentID()),
@@ -359,7 +359,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 return resp, nil
 }
- log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
+ log.Info("flush segment with meta", zap.Int64("id", req.SegmentID),
 zap.Any("meta", req.GetField2BinlogPaths()))
 if req.GetFlushed() {
@@ -398,7 +398,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
 }
 channel := req.GetChannelName()
- log.Debug("receive DropVirtualChannel request",
+ log.Info("receive DropVirtualChannel request",
 zap.String("channel name", channel))
 // validate
@@ -433,7 +433,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
 return resp, nil
 }
- log.Debug("DropVChannel plan to remove", zap.String("channel", channel))
+ log.Info("DropVChannel plan to remove", zap.String("channel", channel))
 err = s.channelManager.RemoveChannel(channel)
 if err != nil {
 log.Warn("DropVChannel failed to RemoveChannel", zap.String("channel", channel), zap.Error(err))
@@ -703,7 +703,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
 return metrics, nil
 }
- log.Debug("DataCoord.GetMetrics failed, request metric type is not implemented yet",
+ log.RatedWarn(60.0, "DataCoord.GetMetrics failed, request metric type is not implemented yet",
 zap.Int64("node_id", Params.DataCoordCfg.NodeID),
 zap.String("req", req.Request),
 zap.String("metric_type", metricType))
@@ -719,7 +719,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
 // CompleteCompaction completes a compaction with the result
 func (s *Server) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
- log.Debug("receive complete compaction request", zap.Int64("planID", req.PlanID), zap.Int64("segmentID", req.GetSegmentID()))
+ log.Info("receive complete compaction request", zap.Int64("planID", req.PlanID), zap.Int64("segmentID", req.GetSegmentID()))
 resp := &commonpb.Status{
 ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -744,14 +744,14 @@ func (s *Server) CompleteCompaction(ctx context.Context, req *datapb.CompactionR
 return resp, nil
 }
- log.Debug("success to complete compaction", zap.Int64("planID", req.PlanID))
+ log.Info("success to complete compaction", zap.Int64("planID", req.PlanID))
 resp.ErrorCode = commonpb.ErrorCode_Success
 return resp, nil
 }
 // ManualCompaction triggers a compaction for a collection
 func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
- log.Debug("received manual compaction", zap.Int64("collectionID", req.GetCollectionID()))
+ log.Info("received manual compaction", zap.Int64("collectionID", req.GetCollectionID()))
 resp := &milvuspb.ManualCompactionResponse{
 Status: &commonpb.Status{
@@ -785,7 +785,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
 return resp, nil
 }
- log.Debug("success to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("compactionID", id))
+ log.Info("success to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("compactionID", id))
 resp.Status.ErrorCode = commonpb.ErrorCode_Success
 resp.CompactionID = id
 return resp, nil
@@ -793,7 +793,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
 // GetCompactionState gets the state of a compaction
 func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
- log.Debug("received get compaction state request", zap.Int64("compactionID", req.GetCompactionID()))
+ log.Info("received get compaction state request", zap.Int64("compactionID", req.GetCompactionID()))
 resp := &milvuspb.GetCompactionStateResponse{
 Status: &commonpb.Status{
 ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -820,14 +820,14 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
 resp.CompletedPlanNo = int64(completedCnt)
 resp.TimeoutPlanNo = int64(timeoutCnt)
 resp.Status.ErrorCode = commonpb.ErrorCode_Success
- log.Debug("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt),
+ log.Info("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt),
 zap.Int("completed", completedCnt), zap.Int("timeout", timeoutCnt))
 return resp, nil
 }
 // GetCompactionStateWithPlans returns the compaction state of given plan
 func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
- log.Debug("received the request to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()))
+ log.Info("received the request to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()))
 resp := &milvuspb.GetCompactionPlansResponse{
 Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
@@ -853,7 +853,7 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
 resp.Status.ErrorCode = commonpb.ErrorCode_Success
 resp.State = state
- log.Debug("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos))
+ log.Info("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos))
 return resp, nil
 }
@@ -896,7 +896,7 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState
 // WatchChannels notifies DataCoord to watch vchannels of a collection.
 func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
- log.Debug("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
+ log.Info("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
 resp := &datapb.WatchChannelsResponse{
 Status: &commonpb.Status{
 ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -928,7 +928,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
 // GetFlushState gets the flush state of multiple segments
 func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
- log.Debug("received get flush state request", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
+ log.Info("received get flush state request", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
 resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
 if s.isClosed() {
@@ -951,10 +951,10 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
 }
 if len(unflushed) != 0 {
- log.Debug("[flush state] unflushed segment ids", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
+ log.Info("[flush state] unflushed segment ids", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
 resp.Flushed = false
 } else {
- log.Debug("[flush state] all segment is flushed", zap.Int64s("segment ids", req.GetSegmentIDs()))
+ log.Info("[flush state] all segment is flushed", zap.Int64s("segment ids", req.GetSegmentIDs()))
 resp.Flushed = true
 }
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 695e88fc39..d8d0894099 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -122,7 +122,7 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap
 return
 }
- log.Debug("success to flush", zap.Int64("node", nodeID), zap.Any("segments", req))
+ log.Info("success to flush", zap.Int64("node", nodeID), zap.Any("segments", req))
 }
 // Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` asynchronously.
@@ -145,7 +145,7 @@ func (c *SessionManager) execCompaction(nodeID int64, plan *datapb.CompactionPla
 return
 }
- log.Debug("success to execute compaction", zap.Int64("node", nodeID), zap.Any("planID", plan.GetPlanID()))
+ log.Info("success to execute compaction", zap.Int64("node", nodeID), zap.Any("planID", plan.GetPlanID()))
 }
 func (c *SessionManager) getClient(ctx context.Context, nodeID int64) (types.DataNode, error) {
diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index 40a3cc0405..da8aba33bc 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -205,7 +205,7 @@ func (b *binlogIO) upload(
 return nil, errUploadToBlobStorage
 default:
 if err != errStart {
- log.Info("save binlog failed, retry in 50ms",
+ log.Warn("save binlog failed, retry in 50ms",
 zap.Int64("collectionID", meta.GetID()),
 zap.Int64("segmentID", segID))
 <-time.After(50 * time.Millisecond)
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 916d20998a..a6ed6bf5a0 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -206,7 +206,7 @@ func (node *DataNode) initSession() error {
 // Init function does nothing now.
 func (node *DataNode) Init() error {
- log.Debug("DataNode Init",
+ log.Info("DataNode Init",
 zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick),
 )
 if err := node.initSession(); err != nil {
@@ -225,7 +225,7 @@ func (node *DataNode) Init() error {
 zap.Error(err))
 return err
 }
- log.Debug("DataNode Init",
+ log.Info("DataNode Init successfully",
 zap.String("MsgChannelSubName", Params.MsgChannelCfg.DataNodeSubName))
 return nil
@@ -246,7 +246,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
 for {
 select {
 case <-ctx.Done():
- log.Debug("watch etcd loop quit")
+ log.Info("watch etcd loop quit")
 return
 case event := <-evtChan:
 if event.Canceled { // event canceled
@@ -373,7 +373,7 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error {
 if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil {
 return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
 }
- log.Debug("handle put event: new data sync service success", zap.String("vChanName", vChanName))
+ log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
 watchInfo.State = datapb.ChannelWatchState_Complete
 v, err := proto.Marshal(watchInfo)
@@ -382,7 +382,7 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error {
 }
 k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName)
- log.Debug("handle put event: try to save completed state", zap.String("key", k))
+ log.Info("handle put event: try to save completed state", zap.String("key", k))
 err = node.watchKv.Save(k, string(v))
 // TODO DataNode unable to save into etcd, may need to panic
@@ -552,7 +552,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 return status, nil
 }
- log.Debug("Receive FlushSegments req",
+ log.Info("Receive FlushSegments req",
 zap.Int64("collectionID", req.GetCollectionID()), zap.Int("num", len(req.SegmentIDs)),
 zap.Int64s("segments", req.SegmentIDs),
 )
@@ -586,7 +586,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
 flushed: flushed,
 }
 }
- log.Debug("Flowgraph flushSegment tasks triggered", zap.Bool("flushed", flushed),
+ log.Info("Flowgraph flushSegment tasks triggered", zap.Bool("flushed", flushed),
 zap.Int64("collectionID", req.GetCollectionID()), zap.Int64s("segments", segmentIDs))
 return noErr
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 5a5b839989..a96ab3c202 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -116,7 +116,7 @@ func newParallelConfig() parallelConfig {
 // start starts the flowgraph in datasyncservice
 func (dsService *dataSyncService) start() {
 if dsService.fg != nil {
- log.Debug("dataSyncService starting flowgraph", zap.Int64("collectionID", dsService.collectionID),
+ log.Info("dataSyncService starting flowgraph", zap.Int64("collectionID", dsService.collectionID),
 zap.String("vChanName", dsService.vchannelName))
 dsService.fg.Start()
 } else {
@@ -127,7 +127,7 @@ func (dsService *dataSyncService) start() {
 func (dsService *dataSyncService) close() {
 if dsService.fg != nil {
- log.Debug("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
+ log.Info("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
 zap.String("vChanName", dsService.vchannelName))
 dsService.fg.Close()
 metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(dsService.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go
index 31c553d791..debfea8652 100644
--- a/internal/datanode/event_manager.go
+++ b/internal/datanode/event_manager.go
@@ -76,7 +76,7 @@ func (e *channelEventManager) Run() {
 err = e.handlePutEvent(event.info)
 if err == nil {
- log.Debug("retry to handle put event successfully",
+ log.Info("retry to handle put event successfully",
 zap.String("vChanName", event.vChanName))
 return
 }
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index e039859be6..b0dbe4980b 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -74,8 +74,6 @@ func (ddn *ddNode) Name() string {
 // Operate handles input messages, implementing flowgrpah.Node
 func (ddn *ddNode) Operate(in []Msg) []Msg {
- // log.Debug("DDNode Operating")
-
 if len(in) != 1 {
 log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
 return []Msg{}
@@ -122,7 +120,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 zap.String("vChannelName", ddn.vchannelName))
 ddn.dropMode.Store(true)
- log.Debug("Stop compaction of vChannel", zap.String("vChannelName", ddn.vchannelName))
+ log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vchannelName))
 ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vchannelName)
 fgMsg.dropCollection = true
 }
@@ -274,7 +272,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
 fs := make([]*datapb.SegmentInfo, 0, len(vchanInfo.GetFlushedSegments()))
 fs = append(fs, vchanInfo.GetFlushedSegments()...)
- log.Debug("ddNode add flushed segment",
+ log.Info("ddNode add flushed segment",
 zap.Int64("collectionID", vchanInfo.GetCollectionID()),
 zap.Int("No. Segment", len(vchanInfo.GetFlushedSegments())),
 )
@@ -314,7 +312,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
 dd.segID2SegInfo.Store(us.GetID(), us)
 }
- log.Debug("ddNode add unflushed segment",
+ log.Info("ddNode add unflushed segment",
 zap.Int64("collectionID", collID),
 zap.Int("No. Segment", len(vchanInfo.GetUnflushedSegments())),
 )
diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go
index 26378b47f3..49a34bed2c 100644
--- a/internal/datanode/flow_graph_dmstream_input_node.go
+++ b/internal/datanode/flow_graph_dmstream_input_node.go
@@ -46,17 +46,17 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
 pchannelName := rootcoord.ToPhysicalChannel(dmNodeConfig.vChannelName)
 insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
 metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(dmNodeConfig.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
- log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
+ log.Info("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
 if seekPos != nil {
 seekPos.ChannelName = pchannelName
 start := time.Now()
- log.Debug("datanode begin to seek", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID))
+ log.Info("datanode begin to seek", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID))
 err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
 if err != nil {
 return nil, err
 }
- log.Debug("datanode seek successfully", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID), zap.Duration("elapse", time.Since(start)))
+ log.Info("datanode seek successfully", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID), zap.Duration("elapse", time.Since(start)))
 }
 name := fmt.Sprintf("dmInputNode-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index d74c912136..20dd757c46 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -237,7 +237,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 continue
 }
- log.Debug("insert seg buffer status", zap.Int("No.", k),
+ log.Info("insert seg buffer status", zap.Int("No.", k),
 zap.Int64("segmentID", segID),
 zap.String("vchannel name", ibNode.channelName),
 zap.Int64("buffer size", bd.(*BufferData).size),
@@ -259,7 +259,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 if fgMsg.dropCollection {
 segmentsToFlush := ibNode.replica.listAllSegmentIDs()
- log.Debug("Recive drop collection req and flushing all segments",
+ log.Info("Receive drop collection req and flushing all segments",
 zap.Any("segments", segmentsToFlush),
 zap.String("vchannel name", ibNode.channelName),
 )
@@ -309,7 +309,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 select {
 case fmsg := <-ibNode.flushChan:
- log.Debug(". Receiving flush message",
+ log.Info(". Receiving flush message",
 zap.Int64("segmentID", fmsg.segmentID),
 zap.Int64("collectionID", fmsg.collectionID),
 zap.String("vchannel name", ibNode.channelName),
diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go
index 4a3a988995..2c1caa5e54 100644
--- a/internal/datanode/flow_graph_manager.go
+++ b/internal/datanode/flow_graph_manager.go
@@ -36,7 +36,7 @@ func newFlowgraphManager() *flowgraphManager {
 }
 func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo) error {
- log.Debug("received Vchannel Info",
+ log.Info("received Vchannel Info",
 zap.String("vChannelName", vchan.GetChannelName()),
 zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
 zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
@@ -74,7 +74,7 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo
 }
 func (fm *flowgraphManager) release(vchanName string) {
- log.Debug("release flowgraph resources begin", zap.String("vChannelName", vchanName))
+ log.Info("release flowgraph resources begin", zap.String("vChannelName", vchanName))
 if fg, loaded := fm.flowgraphs.LoadAndDelete(vchanName); loaded {
 collectionID := fg.(*dataSyncService).collectionID
@@ -83,7 +83,7 @@ func (fm *flowgraphManager) release(vchanName string) {
 metrics.DataNodeNumDmlChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
 metrics.DataNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
 }
- log.Debug("release flowgraph resources end", zap.String("Vchannel", vchanName))
+ log.Info("release flowgraph resources end", zap.String("Vchannel", vchanName))
 }
 func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) {
@@ -124,13 +124,12 @@ func (fm *flowgraphManager) exist(vchan string) bool {
 }
 func (fm *flowgraphManager) dropAll() {
- log.Debug("start drop all flowgraph resources in DataNode")
+ log.Info("start drop all flowgraph resources in DataNode")
 fm.flowgraphs.Range(func(key, value interface{}) bool {
 value.(*dataSyncService).close()
 fm.flowgraphs.Delete(key.(string))
- log.Debug("successfully dropped flowgraph", zap.String("vChannelName", key.(string)))
+ log.Info("successfully dropped flowgraph", zap.String("vChannelName", key.(string)))
 return true
 })
- log.Debug("end drop all flowgraph resources in DataNode")
 }
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index 5aaf89b0ab..95fd340c97 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -450,7 +450,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
 kvs := map[string]string{blobPath: string(blob.Value[:])}
 data.LogSize = int64(len(blob.Value))
 data.LogPath = blobPath
- log.Debug("delete blob path", zap.String("path", blobPath))
+ log.Info("delete blob path", zap.String("path", blobPath))
 m.handleDeleteTask(segmentID, &flushBufferDeleteTask{
 BaseKV: m.BaseKV,
 data: kvs,
@@ -737,7 +737,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
 startPos := dsService.replica.listNewSegmentsStartPositions()
- log.Debug("SaveBinlogPath",
+ log.Info("SaveBinlogPath",
 zap.Int64("SegmentID", pack.segmentID),
 zap.Int64("CollectionID", dsService.collectionID),
 zap.Any("startPos", startPos),
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index 4c261111ae..6a1953b128 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -240,7 +240,7 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
 return fmt.Errorf("mismatch collection, ID=%d", collID)
 }
- log.Debug("Add new segment",
+ log.Info("Add new segment",
 zap.Int64("segment ID", segID),
 zap.Int64("collection ID", collID),
 zap.Int64("partition ID", partitionID),
@@ -330,7 +330,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
 return fmt.Errorf("mismatch collection, ID=%d", collID)
 }
- log.Debug("Add Normal segment",
+ log.Info("Add Normal segment",
 zap.Int64("segment ID", segID),
 zap.Int64("collection ID", collID),
 zap.Int64("partition ID", partitionID),
@@ -379,7 +379,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 return fmt.Errorf("mismatch collection, ID=%d", collID)
 }
- log.Debug("Add Flushed segment",
+ log.Info("Add Flushed segment",
 zap.Int64("segment ID", segID),
 zap.Int64("collection ID", collID),
 zap.Int64("partition ID", partitionID),
@@ -415,9 +415,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 }
 func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog) error {
- if len(statsBinlogs) == 0 {
- log.Info("statsBinlogs is empty")
- }
+ log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
 schema, err := replica.getCollectionSchema(s.collectionID, 0)
 if err != nil {
 return err
 }
@@ -445,6 +443,7 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
 values, err := replica.minIOKV.MultiLoad(bloomFilterFiles)
 if err != nil {
+ log.Warn("failed to load bloom filter files", zap.Error(err))
 return err
 }
 blobs := make([]*Blob, 0)
@@ -454,6 +453,7 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
 stats, err := storage.DeserializeStats(blobs)
 if err != nil {
+ log.Warn("failed to deserialize bloom filter files", zap.Error(err))
 return err
 }
 for _, stat := range stats {
@@ -559,7 +559,7 @@ func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) {
 replica.segMu.Lock()
 defer replica.segMu.Unlock()
- log.Debug("remove segments if exist", zap.Int64s("segmentIDs", segIDs))
+ log.Info("remove segments if exist", zap.Int64s("segmentIDs", segIDs))
 for _, segID := range segIDs {
 if seg, ok := replica.newSegments[segID]; ok {
@@ -611,7 +611,7 @@ func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) {
 replica.segMu.Lock()
 defer replica.segMu.Unlock()
- log.Debug("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
+ log.Info("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
 if seg, ok := replica.newSegments[segID]; ok {
 seg.memorySize = 0
 seg.numRows += numRows
@@ -709,7 +709,7 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planI
 return
 }
- log.Debug("merge flushed segments",
+ log.Info("merge flushed segments",
 zap.Int64("planID", planID),
 zap.Int64("compacted To segmentID", segID),
 zap.Int64s("compacted From segmentIDs", compactedFrom),
@@ -763,7 +763,7 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un
 return
 }
- log.Debug("Add Flushed segment",
+ log.Info("Add Flushed segment",
 zap.Int64("segment ID", segID),
 zap.Int64("collection ID", collID),
 zap.Int64("partition ID", partID),