From 7d6624fcadc17a135edefea55c75858ee8be1916 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 5 Jul 2022 16:46:23 +0800 Subject: [PATCH] Improve flow graph node name and log msgID in QueryCoord (#18043) Signed-off-by: bigsheeper --- internal/querycoord/task.go | 74 ++++++++++++------- internal/querynode/flow_graph_delete_node.go | 8 +- .../flow_graph_filter_delete_node.go | 8 +- .../querynode/flow_graph_filter_dm_node.go | 8 +- internal/querynode/flow_graph_insert_node.go | 8 +- .../querynode/flow_graph_service_time_node.go | 8 +- 6 files changed, 69 insertions(+), 45 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 7929cb2948..d6c091b90f 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -400,7 +400,8 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error { } log.Info("start do loadCollectionTask", - zap.Int64("msgID", lct.getTaskID()), + zap.Int64("taskID", lct.getTaskID()), + zap.Int64("msgID", lct.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Stringer("schema", schema), zap.Int32("replicaNumber", lct.ReplicaNumber)) @@ -607,7 +608,8 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error { } log.Info("loadCollectionTask postExecute done", - zap.Int64("msgID", lct.getTaskID()), + zap.Int64("taskID", lct.getTaskID()), + zap.Int64("msgID", lct.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID)) return nil } @@ -617,6 +619,7 @@ func (lct *loadCollectionTask) globalPostExecute(ctx context.Context) error { if err != nil { log.Error("loadCollectionTask: failed to get collection info from meta", zap.Int64("taskID", lct.getTaskID()), + zap.Int64("msgID", lct.GetBase().GetMsgID()), zap.Int64("collectionID", lct.CollectionID), zap.Error(err)) @@ -628,6 +631,7 @@ func (lct *loadCollectionTask) globalPostExecute(ctx context.Context) error { if err != nil { log.Error("loadCollectionTask: failed to sync replica segments to shard leader", zap.Int64("taskID", lct.getTaskID()), + zap.Int64("msgID", lct.GetBase().GetMsgID()), zap.Int64("collectionID", lct.CollectionID), zap.Error(err)) @@ -725,7 +729,8 @@ func (rct *releaseCollectionTask) preExecute(context.Context) error { collectionID := rct.CollectionID rct.setResultInfo(nil) log.Info("start do releaseCollectionTask", - zap.Int64("msgID", rct.getTaskID()), + zap.Int64("taskID", rct.getTaskID()), + zap.Int64("msgID", rct.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID)) return nil } @@ -772,7 +777,8 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error { } log.Info("releaseCollectionTask Execute done", - zap.Int64("msgID", rct.getTaskID()), + zap.Int64("taskID", rct.getTaskID()), + zap.Int64("msgID", rct.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) return nil @@ -785,7 +791,8 @@ func (rct *releaseCollectionTask) postExecute(context.Context) error { } log.Info("releaseCollectionTask postExecute done", - zap.Int64("msgID", rct.getTaskID()), + zap.Int64("taskID", rct.getTaskID()), + zap.Int64("msgID", rct.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) return nil @@ -906,7 +913,8 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error { } log.Info("start do loadPartitionTask", - zap.Int64("msgID", lpt.getTaskID()), + zap.Int64("taskID", lpt.getTaskID()), + zap.Int64("msgID", lpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID)) return nil } @@ -1089,10 +1097,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { } log.Info("loadPartitionTask Execute done", - zap.Int64("msgID", lpt.getTaskID()), + zap.Int64("taskID", lpt.getTaskID()), + zap.Int64("msgID", lpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), - zap.Int64s("partitionIDs", partitionIDs), - zap.Int64("msgID", lpt.Base.MsgID)) + zap.Int64s("partitionIDs", partitionIDs)) return nil } @@ -1104,7 +1112,8 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error { } log.Info("loadPartitionTask postExecute done", - zap.Int64("msgID", lpt.getTaskID()), + zap.Int64("taskID", lpt.getTaskID()), + zap.Int64("msgID", lpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) return nil @@ -1234,7 +1243,8 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error { partitionIDs := rpt.PartitionIDs rpt.setResultInfo(nil) log.Info("start do releasePartitionTask", - zap.Int64("msgID", rpt.getTaskID()), + zap.Int64("taskID", rpt.getTaskID()), + zap.Int64("msgID", rpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) return nil @@ -1274,7 +1284,8 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error { } log.Info("releasePartitionTask Execute done", - zap.Int64("msgID", rpt.getTaskID()), + zap.Int64("taskID", rpt.getTaskID()), + zap.Int64("msgID", rpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("nodeID", rpt.NodeID)) @@ -1289,7 +1300,8 @@ func (rpt *releasePartitionTask) postExecute(context.Context) error { } log.Info("releasePartitionTask postExecute done", - zap.Int64("msgID", rpt.getTaskID()), + zap.Int64("taskID", rpt.getTaskID()), + zap.Int64("msgID", rpt.GetBase().GetMsgID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("nodeID", rpt.NodeID)) @@ -1356,7 +1368,8 @@ func (lst *loadSegmentTask) preExecute(ctx context.Context) error { log.Info("start do loadSegmentTask", zap.Int64s("segmentIDs", segmentIDs), zap.Int64("loaded nodeID", lst.DstNodeID), - zap.Int64("taskID", lst.getTaskID())) + zap.Int64("taskID", lst.getTaskID()), + zap.Int64("msgID", lst.GetBase().GetMsgID())) if err := lst.broker.acquireSegmentsReferLock(ctx, lst.taskID, segmentIDs); err != nil { log.Error("acquire reference lock on segments failed", zap.Int64s("segmentIDs", segmentIDs), @@ -1377,7 +1390,8 @@ func (lst *loadSegmentTask) execute(ctx context.Context) error { } log.Info("loadSegmentTask Execute done", - zap.Int64("taskID", lst.getTaskID())) + zap.Int64("taskID", lst.getTaskID()), + zap.Int64("msgID", lst.GetBase().GetMsgID())) return nil } @@ -1391,7 +1405,8 @@ func (lst *loadSegmentTask) postExecute(context.Context) error { } log.Info("loadSegmentTask postExecute done", - zap.Int64("taskID", lst.getTaskID())) + zap.Int64("taskID", lst.getTaskID()), + zap.Int64("msgID", lst.GetBase().GetMsgID())) return nil } @@ -1466,7 +1481,8 @@ func (rst *releaseSegmentTask) preExecute(context.Context) error { log.Info("start do releaseSegmentTask", zap.Int64s("segmentIDs", segmentIDs), zap.Int64("loaded nodeID", rst.NodeID), - zap.Int64("taskID", rst.getTaskID())) + zap.Int64("taskID", rst.getTaskID()), + zap.Int64("msgID", rst.GetBase().GetMsgID())) return nil } @@ -1482,7 +1498,8 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error { log.Info("releaseSegmentTask Execute done", zap.Int64s("segmentIDs", rst.SegmentIDs), - zap.Int64("taskID", rst.getTaskID())) + zap.Int64("taskID", rst.getTaskID()), + zap.Int64("msgID", rst.GetBase().GetMsgID())) return nil } @@ -1490,7 +1507,8 @@ func (rst *releaseSegmentTask) postExecute(context.Context) error { segmentIDs := rst.SegmentIDs log.Info("releaseSegmentTask postExecute done", zap.Int64s("segmentIDs", segmentIDs), - zap.Int64("taskID", rst.getTaskID())) + zap.Int64("taskID", rst.getTaskID()), + zap.Int64("msgID", rst.GetBase().GetMsgID())) return nil } @@ -1545,7 +1563,8 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error { log.Info("start do watchDmChannelTask", zap.Strings("dmChannels", channels), zap.Int64("loaded nodeID", wdt.NodeID), - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } @@ -1560,13 +1579,15 @@ func (wdt *watchDmChannelTask) execute(ctx context.Context) error { } log.Info("watchDmChannelsTask Execute done", - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } func (wdt *watchDmChannelTask) postExecute(context.Context) error { log.Info("watchDmChannelTask postExecute done", - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } @@ -1660,7 +1681,8 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error { log.Info("start do watchDeltaChannelTask", zap.Strings("deltaChannels", channels), zap.Int64("loaded nodeID", wdt.NodeID), - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } @@ -1675,13 +1697,15 @@ func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error { } log.Info("watchDeltaChannelsTask Execute done", - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } func (wdt *watchDeltaChannelTask) postExecute(context.Context) error { log.Info("watchDeltaChannelTask postExecute done", - zap.Int64("taskID", wdt.getTaskID())) + zap.Int64("taskID", wdt.getTaskID()), + zap.Int64("msgID", wdt.GetBase().GetMsgID())) return nil } diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 7e60d06591..edd530588a 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -47,22 +47,22 @@ type deleteNode struct { // Name returns the name of deleteNode func (dNode *deleteNode) Name() string { - return "dNode" + return fmt.Sprintf("dNode-%s", dNode.channel) } // Operate handles input messages, do delete operations func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if len(in) != 1 { - log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in))) + log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name())) return []Msg{} } dMsg, ok := in[0].(*deleteMsg) if !ok { if in[0] == nil { - log.Debug("type assertion failed for deleteMsg because it's nil") + log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name())) } else { - log.Warn("type assertion failed for deleteMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name())) } return []Msg{} } diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index bff1ce63b7..f98c4c3759 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -40,22 +40,22 @@ type filterDeleteNode struct { // Name returns the name of filterDeleteNode func (fddNode *filterDeleteNode) Name() string { - return fmt.Sprintf("fdNode-%d", fddNode.collectionID) + return fmt.Sprintf("fdNode-%s", fddNode.channel) } // Operate handles input messages, to filter invalid delete messages func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if len(in) != 1 { - log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in))) + log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name())) return []Msg{} } msgStreamMsg, ok := in[0].(*MsgStreamMsg) if !ok { if in[0] == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil") + log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name())) } else { - log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name())) } return []Msg{} } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 60f8f9c46b..0550873106 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -40,22 +40,22 @@ type filterDmNode struct { // Name returns the name of filterDmNode func (fdmNode *filterDmNode) Name() string { - return fmt.Sprintf("fdmNode-%d", fdmNode.collectionID) + return fmt.Sprintf("fdmNode-%s", fdmNode.channel) } // Operate handles input messages, to filter invalid insert messages func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if len(in) != 1 { - log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in))) + log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name())) return []Msg{} } msgStreamMsg, ok := in[0].(*MsgStreamMsg) if !ok { if in[0] == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil") + log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name())) } else { - log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name())) } return []Msg{} } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 8ba50e0455..1a59a11059 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -66,22 +66,22 @@ type deleteData struct { // Name returns the name of insertNode func (iNode *insertNode) Name() string { - return "iNode" + return fmt.Sprintf("iNode-%s", iNode.channel) } // Operate handles input messages, to execute insert operations func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if len(in) != 1 { - log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in))) + log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name())) return []Msg{} } iMsg, ok := in[0].(*insertMsg) if !ok { if in[0] == nil { - log.Debug("type assertion failed for insertMsg because it's nil") + log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name())) } else { - log.Warn("type assertion failed for insertMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name())) } return []Msg{} } diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index e5a6479e93..c853982cf3 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -37,22 +37,22 @@ type serviceTimeNode struct { // Name returns the name of serviceTimeNode func (stNode *serviceTimeNode) Name() string { - return fmt.Sprintf("stNode-%d-%s", stNode.collectionID, stNode.vChannel) + return fmt.Sprintf("stNode-%s", stNode.vChannel) } // Operate handles input messages, to execute insert operations func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if len(in) != 1 { - log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in))) + log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name())) return []Msg{} } serviceTimeMsg, ok := in[0].(*serviceTimeMsg) if !ok { if in[0] == nil { - log.Debug("type assertion failed for serviceTimeMsg because it's nil") + log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name())) } else { - log.Warn("type assertion failed for serviceTimeMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name())) } return []Msg{} }