From 7bb806b5f670822b36dffc683d3a62412898d70d Mon Sep 17 00:00:00 2001
From: sunby
Date: Sat, 13 Mar 2021 11:59:24 +0800
Subject: [PATCH] Log msgID and role when loading collection

Signed-off-by: sunby
---
Review notes (free text between the "---" separator and the diffstat is
not part of the commit message):

- Adds a RoleName parameter to the master service, proxy node and query
  service parameter tables, formatted as "<ServiceName>-<ID>" from NodeID
  (master service, query service) or ProxyID (proxy node).
- Tags the log statements on the LoadCollection path (proxy node ->
  query service -> master ShowPartitions) with "role" and "msgID" fields,
  and forwards the LoadCollection msgID into the ShowPartitions request.
- Numeric IDs are logged with zap.Int64/zap.Int64s instead of
  fmt.Sprintln-built strings, which carried a trailing newline.
- Errors returned from downstream calls are wrapped with %w so the cause
  stays reachable through errors.Is / errors.As.
- NOTE(review): the new log statements dereference in.Base / req.Base
  without a nil check -- confirm every caller populates Base.

 internal/masterservice/master_service.go | 14 ++++++++++----
 internal/masterservice/param_table.go    |  7 +++++++
 internal/proxynode/impl.go               |  4 +++-
 internal/proxynode/paramtable.go         |  6 ++++++
 internal/proxynode/task.go               | 10 +++++++++-
 internal/queryservice/param_table.go     |  8 +++++++-
 internal/queryservice/queryservice.go    | 23 ++++++++++++++++-------
 7 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index 01f02b972e..8d327f1d77 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -1162,18 +1162,21 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
 }
 
 func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
+	log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID),
+		zap.String("collection", in.CollectionName))
 	code := c.stateCode.Load().(internalpb.StateCode)
 	if code != internalpb.StateCode_Healthy {
+		log.Error("ShowPartitionRequest failed: master is not healthy", zap.String("role", Params.RoleName),
+			zap.Int64("msgID", in.Base.MsgID), zap.String("state", internalpb.StateCode_name[int32(code)]))
 		return &milvuspb.ShowPartitionsResponse{
 			Status: &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
+				Reason:    fmt.Sprintf("master is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]),
 			},
 			PartitionNames: nil,
 			PartitionIDs:   nil,
 		}, nil
 	}
-	log.Debug("ShowPartitions", zap.String("collection name", in.CollectionName))
 	t := &ShowPartitionReqTask{
 		baseReqTask: baseReqTask{
 			cv:   make(chan error),
@@ -1188,15 +1191,18 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
 	c.ddReqQueue <- t
 	err := t.WaitToFinish()
 	if err != nil {
+		log.Error("ShowPartitionsRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
 		return &milvuspb.ShowPartitionsResponse{
 			PartitionNames: nil,
 			Status: &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-				Reason:    "ShowPartitions failed: " + err.Error(),
+				Reason:    err.Error(),
 			},
 		}, nil
 	}
-	log.Debug("ShowPartitions Success", zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames), zap.Int64s("partition ids", t.Rsp.PartitionIDs))
+	log.Debug("ShowPartitions succeed", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID),
+		zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames),
+		zap.Int64s("partition ids", t.Rsp.PartitionIDs))
 	t.Rsp.Status = &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
 		Reason:    "",
diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go
index 93c20a95c6..50dc3f6d61 100644
--- a/internal/masterservice/param_table.go
+++ b/internal/masterservice/param_table.go
@@ -37,6 +37,8 @@ type ParamTable struct {
 	Timeout int
 
 	Log log.Config
+
+	RoleName string
 }
 
 func (p *ParamTable) Init() {
@@ -68,6 +70,7 @@ func (p *ParamTable) Init() {
 
 		p.initTimeout()
 		p.initLogCfg()
+		p.initRoleName()
 	})
 }
 
@@ -209,3 +212,7 @@ func (p *ParamTable) initLogCfg() {
 		p.Log.File.Filename = ""
 	}
 }
+
+func (p *ParamTable) initRoleName() {
+	p.RoleName = fmt.Sprintf("%s-%d", "MasterService", p.NodeID)
+}
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index 2064ef90e3..fc2223e9c1 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -181,15 +181,17 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa
 			Reason:    err.Error(),
 		}, nil
 	}
-
+	log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID), zap.String("collection", request.CollectionName))
 	err = lct.WaitToFinish()
 	if err != nil {
+		log.Error("LoadCollectionTask failed", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}, nil
 	}
 
+	log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
 	return lct.result, nil
 }
 
diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go
index 6238806237..4db9adc93b 100644
--- a/internal/proxynode/paramtable.go
+++ b/internal/proxynode/paramtable.go
@@ -59,6 +59,7 @@ type ParamTable struct {
 	PulsarMaxMessageSize int
 
 	Log log.Config
+	RoleName string
 }
 
 var Params ParamTable
@@ -158,6 +159,7 @@ func (pt *ParamTable) initParams() {
 	pt.initDefaultIndexName()
 
 	pt.initPulsarMaxMessageSize()
+	pt.initRoleName()
 }
 
 func (pt *ParamTable) initPulsarAddress() {
@@ -478,3 +480,7 @@ func (pt *ParamTable) initLogCfg() {
 		pt.Log.File.Filename = ""
 	}
 }
+
+func (pt *ParamTable) initRoleName() {
+	pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID)
+}
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index cdac33420d..864347b9d7 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -1942,6 +1942,7 @@ func (lct *LoadCollectionTask) OnEnqueue() error {
 }
 
 func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error {
+	log.Debug("LoadCollectionTask PreExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
 	lct.Base.MsgType = commonpb.MsgType_LoadCollection
 	lct.Base.SourceID = Params.ProxyID
 
@@ -1955,6 +1956,7 @@ func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error {
 }
 
 func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
+	log.Debug("LoadCollectionTask Execute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
 	collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
 	if err != nil {
 		return err
@@ -1975,11 +1977,17 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
 		CollectionID: collID,
 		Schema:       collSchema,
 	}
+	log.Debug("send LoadCollectionRequest to query service", zap.String("role", Params.RoleName), zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
+		zap.Any("schema", request.Schema))
 	lct.result, err = lct.queryService.LoadCollection(ctx, request)
-	return err
+	if err != nil {
+		return fmt.Errorf("call query service LoadCollection: %w", err)
+	}
+	return nil
 }
 
 func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error {
+	log.Debug("LoadCollectionTask PostExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
 	return nil
 }
 
diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go
index 7b737db3d5..bca1f2bd66 100644
--- a/internal/queryservice/param_table.go
+++ b/internal/queryservice/param_table.go
@@ -27,7 +27,8 @@ type ParamTable struct {
 	// timetick
 	TimeTickChannelName string
 
-	Log log.Config
+	Log      log.Config
+	RoleName string
 }
 
 var Params ParamTable
@@ -57,6 +58,7 @@ func (p *ParamTable) Init() {
 		p.initStatsChannelName()
 		p.initTimeTickChannelName()
 		p.initQueryServiceAddress()
+		p.initRoleName()
 	})
 }
 
@@ -123,3 +125,7 @@ func (p *ParamTable) initQueryServiceAddress() {
 	}
 	p.Address = url
 }
+
+func (p *ParamTable) initRoleName() {
+	p.RoleName = fmt.Sprintf("%s-%d", "QueryService", p.NodeID)
+}
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index ae57cf8be2..bd9c7b3be0 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -207,6 +207,8 @@ func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCo
 }
 
 func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
+	log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
+		zap.Stringer("schema", req.Schema))
 	dbID := req.DbID
 	collectionID := req.CollectionID
 	schema := req.Schema
@@ -222,7 +224,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
 		}
 	}
 
-	log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
+	log.Debug("load collection start", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
 
 	_, err := qs.replica.getCollectionByID(dbID, collectionID)
 	if err != nil {
@@ -241,14 +243,16 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
 	showPartitionRequest := &milvuspb.ShowPartitionsRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_ShowPartitions,
+			MsgID:   req.Base.MsgID,
 		},
 		CollectionID: collectionID,
 	}
 
 	showPartitionResponse, err := qs.masterServiceClient.ShowPartitions(ctx, showPartitionRequest)
 	if err != nil {
-		return fn(err), err
+		return fn(err), fmt.Errorf("call master ShowPartitions: %w", err)
 	}
+	log.Debug("ShowPartitions returned from Master", zap.String("role", Params.RoleName), zap.Int64("msgID", showPartitionRequest.Base.MsgID))
 	if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
 		return showPartitionResponse.Status, err
 	}
@@ -273,8 +277,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
 	}
 
 	if len(partitionIDsToLoad) == 0 {
-		log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID)))
-
+		log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
 		return &commonpb.Status{
 			Reason:    "Partitions has been already loaded!",
 			ErrorCode: commonpb.ErrorCode_Success,
@@ -290,9 +293,13 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
 	}
 
 	status, err := qs.LoadPartitions(ctx, loadPartitionsRequest)
+	if err != nil {
+		log.Error("LoadCollectionRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
+		return status, fmt.Errorf("load partitions: %w", err)
+	}
+	log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID))
+	return status, nil
 
-	log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID)))
-	return status, err
 }
 
 func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
@@ -356,6 +363,8 @@ func (qs *QueryService) ShowPartitions(ctx context.Context, req *querypb.ShowPar
 
 func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
 	//TODO::suggest different partitions have different dm channel
+	log.Debug("LoadPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
+		zap.Stringer("schema", req.Schema))
 	dbID := req.DbID
 	collectionID := req.CollectionID
 	partitionIDs := req.PartitionIDs
@@ -482,7 +491,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
 		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
 	}
 
-	log.Debug("load partitions end", zap.String("partitionIDs", fmt.Sprintln(partitionIDs)))
+	log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64s("partitionIDs", partitionIDs))
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
 	}, nil