diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 543c317022..b90fc98722 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -74,8 +74,6 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { serverID := node.GetNodeID() - metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.TotalLabel).Inc() - log := log.Ctx(ctx).With( zap.Int64("nodeID", serverID), zap.Int64("collectionID", req.GetCollectionID()), @@ -83,7 +81,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen zap.Int64s("segmentIDs", req.GetSegmentIDs()), ) log.Info("receive FlushSegments request") - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { log.Warn("failed to FlushSegments", zap.Error(err)) return merr.Status(err), nil @@ -101,8 +98,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen } log.Info("success to FlushSegments") - - metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.SuccessLabel).Inc() return merr.Success(), nil } @@ -371,12 +366,13 @@ func (node *DataNode) CheckChannelOperationProgress(ctx context.Context, req *da } func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChannelsRequest) (*commonpb.Status, error) { + metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.TotalLabel).Inc() log := log.Ctx(ctx).With(zap.Int64("nodeId", node.GetNodeID()), - zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs())), + zap.Uint64("flushTs", req.GetFlushTs()), + zap.Time("flushTs in Time", tsoutil.PhysicalTime(req.GetFlushTs())), zap.Strings("channels", req.GetChannels())) log.Info("DataNode receives FlushChannels request") - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { log.Warn("DataNode.FlushChannels failed", zap.Error(err)) return merr.Status(err), nil @@ -385,11 +381,13 @@ func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChanne for _, channel := range req.GetChannels() { err := node.writeBufferManager.FlushChannel(ctx, channel, req.GetFlushTs()) if err != nil { - log.Warn("failed to flush channel", zap.String("channel", channel), zap.Error(err)) + log.Warn("WriteBufferManager failed to flush channel", zap.String("channel", channel), zap.Error(err)) return merr.Status(err), nil } } + metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SuccessLabel).Inc() + log.Info("success to FlushChannels") return merr.Success(), nil }