mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
refine log and process for distribution handler (#24239)
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
This commit is contained in:
parent
6da8853756
commit
675821c79d
@ -56,6 +56,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
|
||||
if err != nil {
|
||||
log.Warn("create segment task from plan failed",
|
||||
zap.Int64("collection", p.Segment.GetCollectionID()),
|
||||
zap.Int64("segmentID", p.Segment.GetID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Segment.GetInsertChannel()),
|
||||
zap.Int64("from", p.From),
|
||||
@ -67,6 +68,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
|
||||
|
||||
log.Info("create segment task",
|
||||
zap.Int64("collection", p.Segment.GetCollectionID()),
|
||||
zap.Int64("segmentID", p.Segment.GetID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Segment.GetInsertChannel()),
|
||||
zap.Int64("from", p.From),
|
||||
|
||||
@ -74,7 +74,12 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) {
|
||||
wg.Add(1)
|
||||
go func(handler *distHandler) {
|
||||
defer wg.Done()
|
||||
handler.getDistribution(ctx)
|
||||
resp, err := handler.getDistribution(ctx)
|
||||
if err != nil {
|
||||
log.Error("SyncAll come across err when getting data distribution", zap.Error(err))
|
||||
} else {
|
||||
handler.handleDistResp(resp)
|
||||
}
|
||||
}(h)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
19
internal/querycoordv2/dist/dist_handler.go
vendored
19
internal/querycoordv2/dist/dist_handler.go
vendored
@ -39,6 +39,7 @@ import (
|
||||
|
||||
const (
|
||||
distReqTimeout = 3 * time.Second
|
||||
heartBeatLagBehindWarn = 3 * time.Second
|
||||
maxFailureTimes = 3
|
||||
)
|
||||
|
||||
@ -71,16 +72,18 @@ func (dh *distHandler) start(ctx context.Context) {
|
||||
log.Info("close dist handler")
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := dh.getDistribution(ctx)
|
||||
resp, err := dh.getDistribution(ctx)
|
||||
if err != nil {
|
||||
node := dh.nodeManager.Get(dh.nodeID)
|
||||
fields := []zap.Field{zap.Int("times", failures)}
|
||||
if node != nil {
|
||||
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
|
||||
}
|
||||
fields = append(fields, zap.Error(err))
|
||||
log.RatedWarn(30.0, "failed to get data distribution", fields...)
|
||||
} else {
|
||||
failures = 0
|
||||
dh.handleDistResp(resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -93,6 +96,10 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse)
|
||||
session.WithSegmentCnt(len(resp.GetSegments())),
|
||||
session.WithChannelCnt(len(resp.GetChannels())),
|
||||
)
|
||||
if time.Since(node.LastHeartbeat()) > heartBeatLagBehindWarn {
|
||||
log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()),
|
||||
zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID()))
|
||||
}
|
||||
node.SetLastHeartbeat(time.Now())
|
||||
}
|
||||
|
||||
@ -201,7 +208,7 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
|
||||
dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
|
||||
}
|
||||
|
||||
func (dh *distHandler) getDistribution(ctx context.Context) error {
|
||||
func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) {
|
||||
dh.mu.Lock()
|
||||
defer dh.mu.Unlock()
|
||||
|
||||
@ -225,14 +232,12 @@ func (dh *distHandler) getDistribution(ctx context.Context) error {
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if !merr.Ok(resp.GetStatus()) {
|
||||
return merr.Error(resp.GetStatus())
|
||||
return nil, merr.Error(resp.GetStatus())
|
||||
}
|
||||
|
||||
dh.handleDistResp(resp)
|
||||
return nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dh *distHandler) stop() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user