diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index edec10d503..f53dfeee87 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -56,6 +56,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t if err != nil { log.Warn("create segment task from plan failed", zap.Int64("collection", p.Segment.GetCollectionID()), + zap.Int64("segmentID", p.Segment.GetID()), zap.Int64("replica", p.ReplicaID), zap.String("channel", p.Segment.GetInsertChannel()), zap.Int64("from", p.From), @@ -67,6 +68,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t log.Info("create segment task", zap.Int64("collection", p.Segment.GetCollectionID()), + zap.Int64("segmentID", p.Segment.GetID()), zap.Int64("replica", p.ReplicaID), zap.String("channel", p.Segment.GetInsertChannel()), zap.Int64("from", p.From), diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go index 017ca3c1d2..dd43e01bf3 100644 --- a/internal/querycoordv2/dist/dist_controller.go +++ b/internal/querycoordv2/dist/dist_controller.go @@ -74,7 +74,12 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) { wg.Add(1) go func(handler *distHandler) { defer wg.Done() - handler.getDistribution(ctx) + resp, err := handler.getDistribution(ctx) + if err != nil { + log.Error("SyncAll come across err when getting data distribution", zap.Error(err)) + } else { + handler.handleDistResp(resp) + } }(h) } wg.Wait() diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 21bfcd3683..e38b97a63c 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -38,8 +38,9 @@ import ( ) const ( - distReqTimeout = 3 * time.Second - maxFailureTimes = 3 + distReqTimeout = 3 * time.Second + heartBeatLagBehindWarn = 3 * time.Second + maxFailureTimes = 3 ) type distHandler struct { @@ -71,16 +72,18 @@ func (dh *distHandler) start(ctx context.Context) { log.Info("close dist handler") return case <-ticker.C: - err := dh.getDistribution(ctx) + resp, err := dh.getDistribution(ctx) if err != nil { node := dh.nodeManager.Get(dh.nodeID) fields := []zap.Field{zap.Int("times", failures)} if node != nil { fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat())) } + fields = append(fields, zap.Error(err)) log.RatedWarn(30.0, "failed to get data distribution", fields...) } else { failures = 0 + dh.handleDistResp(resp) } } } @@ -93,6 +96,10 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) session.WithSegmentCnt(len(resp.GetSegments())), session.WithChannelCnt(len(resp.GetChannels())), ) + if time.Since(node.LastHeartbeat()) > heartBeatLagBehindWarn { + log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()), + zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID())) + } node.SetLastHeartbeat(time.Now()) } @@ -201,7 +208,7 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...) } -func (dh *distHandler) getDistribution(ctx context.Context) error { +func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) { dh.mu.Lock() defer dh.mu.Unlock() @@ -225,14 +232,12 @@ func (dh *distHandler) getDistribution(ctx context.Context) error { }) if err != nil { - return err + return nil, err } if !merr.Ok(resp.GetStatus()) { - return merr.Error(resp.GetStatus()) + return nil, merr.Error(resp.GetStatus()) } - - dh.handleDistResp(resp) - return nil + return resp, nil } func (dh *distHandler) stop() {