From 2b81933d13360bed5da3b7c2b0460966ba790c29 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 21 Mar 2023 18:11:57 +0800 Subject: [PATCH] Refine logs of DistHandler (#22879) Signed-off-by: yah01 --- internal/querycoordv2/dist/dist_controller.go | 2 +- .../querycoordv2/dist/dist_controller_test.go | 6 +- internal/querycoordv2/dist/dist_handler.go | 73 +++++++------------ .../querycoordv2/observers/target_observer.go | 3 +- 4 files changed, 32 insertions(+), 52 deletions(-) diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go index 4bfe836dd2..100f36937b 100644 --- a/internal/querycoordv2/dist/dist_controller.go +++ b/internal/querycoordv2/dist/dist_controller.go @@ -73,7 +73,7 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) { wg.Add(1) go func(handler *distHandler) { defer wg.Done() - handler.getDistribution(ctx, nil) + handler.getDistribution(ctx) }(h) } wg.Wait() diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go index 0f0ff913d6..c948349918 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -83,7 +84,7 @@ func (suite *DistControllerTestSuite) TearDownSuite() { func (suite *DistControllerTestSuite) TestStart() { dispatchCalled := atomic.NewBool(false) suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return( - &querypb.GetDataDistributionResponse{NodeID: 1}, + &querypb.GetDataDistributionResponse{Status: merr.Status(nil), NodeID: 1}, nil, ) suite.mockScheduler.EXPECT().Dispatch(int64(1)).Run(func(node int64) { dispatchCalled.Store(true) }) @@ -111,7 +112,7 @@ func (suite *DistControllerTestSuite) TestStop() { suite.controller.StartDistInstance(context.TODO(), 1) called := atomic.NewBool(false) suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Maybe().Return( - &querypb.GetDataDistributionResponse{NodeID: 1}, + &querypb.GetDataDistributionResponse{Status: merr.Status(nil), NodeID: 1}, nil, ).Run(func(args mock.Arguments) { called.Store(true) @@ -136,6 +137,7 @@ func (suite *DistControllerTestSuite) TestSyncAll() { suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Call.Return( func(ctx context.Context, nodeID int64, req *querypb.GetDataDistributionRequest) *querypb.GetDataDistributionResponse { return &querypb.GetDataDistributionResponse{ + Status: merr.Status(nil), NodeID: nodeID, } }, diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 5174ee3998..b689f885bc 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -18,7 +18,6 @@ package dist import ( "context" - "fmt" "sync" "time" @@ -32,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/merr" "go.uber.org/zap" ) @@ -55,55 +55,35 @@ type distHandler struct { func (dh *distHandler) start(ctx context.Context) { defer dh.wg.Done() - 
logger := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60) - logger.Info("start dist handler") + log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60) + log.Info("start dist handler") ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond)) defer ticker.Stop() failures := 0 for { select { case <-ctx.Done(): - logger.Info("close dist handler due to context done") + log.Info("close dist handler due to context done") return case <-dh.c: - logger.Info("close dist handelr") + log.Info("close dist handler") return case <-ticker.C: - dh.getDistribution(ctx, func(isFail bool) { - if isFail { - failures++ - node := dh.nodeManager.Get(dh.nodeID) - if node != nil { - log.RatedDebug(30.0, "failed to get node's data distribution", - zap.Int64("nodeID", dh.nodeID), - zap.Time("lastHeartbeat", node.LastHeartbeat()), - ) - } - } else { - failures = 0 + err := dh.getDistribution(ctx) + if err != nil { + node := dh.nodeManager.Get(dh.nodeID) + fields := []zap.Field{zap.Int("times", failures)} + if node != nil { + fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat())) } - - if failures >= maxFailureTimes { - log.RatedInfo(30.0, fmt.Sprintf("can not get data distribution from node %d for %d times", dh.nodeID, failures)) - // TODO: kill the querynode server and stop the loop? - } - }) + log.RatedWarn(30.0, "failed to get data distribution", fields...) + } else { + failures = 0 + } } } } -func (dh *distHandler) logFailureInfo(resp *querypb.GetDataDistributionResponse, err error) { - log := log.With(zap.Int64("nodeID", dh.nodeID)) - if err != nil { - log.Warn("failed to get data distribution", - zap.Error(err)) - } else if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("failed to get data distribution", - zap.Any("errorCode", resp.GetStatus().GetErrorCode()), - zap.Any("reason", resp.GetStatus().GetReason())) - } -} - func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) { node := dh.nodeManager.Get(resp.GetNodeID()) if node != nil { @@ -217,27 +197,26 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...) 
} -func (dh *distHandler) getDistribution(ctx context.Context, fn func(isFail bool)) { +func (dh *distHandler) getDistribution(ctx context.Context) error { dh.mu.Lock() defer dh.mu.Unlock() - cctx, cancel := context.WithTimeout(ctx, distReqTimeout) - resp, err := dh.client.GetDataDistribution(cctx, dh.nodeID, &querypb.GetDataDistributionRequest{ + ctx, cancel := context.WithTimeout(ctx, distReqTimeout) + defer cancel() + resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution), ), }) - cancel() - isFail := err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success - if isFail { - dh.logFailureInfo(resp, err) - } else { - dh.handleDistResp(resp) + if err != nil { + return err + } + if !merr.Ok(resp.GetStatus()) { + return merr.Error(resp.GetStatus()) } - if fn != nil { - fn(isFail) - } + dh.handleDistResp(resp) + return nil } func (dh *distHandler) stop() { diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 61e0f8bf74..cfc7ff349e 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -267,8 +267,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool { } func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { - log.Warn("observer trigger update current target", - zap.Int64("collectionID", collectionID)) + log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID)) ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) ob.mut.Lock()
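
The core of this patch is the getDistribution signature change: the `fn func(isFail bool)` callback is dropped in favor of an ordinary error return, and a failed response status is folded into that same error path via merr, so the caller in start() owns the failure counter and the rate-limited logging. Below is a minimal, self-contained sketch of that error-first pattern under simplified assumptions; Status, statusToError, and pull here are illustrative stand-ins, not the Milvus types (the real code uses querypb.GetDataDistributionResponse with merr.Ok and merr.Error).

// Sketch only: simplified stand-ins for the pattern this patch adopts.
package main

import (
	"errors"
	"fmt"
)

// Status is a simplified stand-in for a gRPC-style response status.
type Status struct {
	Code   int
	Reason string
}

// statusToError plays the role merr.Error plays in the patch:
// a non-success status becomes a plain Go error.
func statusToError(s *Status) error {
	if s == nil || s.Code == 0 {
		return nil // success
	}
	return fmt.Errorf("code=%d: %s", s.Code, s.Reason)
}

// pull mirrors the new getDistribution: it returns an error instead of
// invoking a callback, so transport failures and failed response
// statuses take a single path back to the caller.
func pull(rpc func() (*Status, error)) error {
	status, err := rpc()
	if err != nil {
		return err // transport-level failure
	}
	if err := statusToError(status); err != nil {
		return err // server reported failure in the response status
	}
	return nil // a real implementation would handle the response here
}

func main() {
	// As in the patched start() loop, the caller keeps the failure
	// counter and decides how (and how often) to log.
	failures := 0
	rpcs := []func() (*Status, error){
		func() (*Status, error) { return &Status{}, nil },
		func() (*Status, error) { return nil, errors.New("connection refused") },
		func() (*Status, error) { return &Status{Code: 1, Reason: "not ready"}, nil },
	}
	for _, rpc := range rpcs {
		if err := pull(rpc); err != nil {
			failures++
			fmt.Printf("failed to get data distribution (times=%d): %v\n", failures, err)
		} else {
			failures = 0
		}
	}
}

One consequence of this shape, visible in the dist_controller.go and dist_controller_test.go hunks, is that callers which previously passed nil for the callback now just ignore or check the returned error, and mocked responses must carry a success Status (merr.Status(nil)) for the happy path to be taken.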