Improve flow graph node name and log msgID in QueryCoord (#18043)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2022-07-05 16:46:23 +08:00 committed by GitHub
parent 749558d55b
commit 7d6624fcad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 69 additions and 45 deletions

View File

@ -400,7 +400,8 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error {
}
log.Info("start do loadCollectionTask",
zap.Int64("msgID", lct.getTaskID()),
zap.Int64("taskID", lct.getTaskID()),
zap.Int64("msgID", lct.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Stringer("schema", schema),
zap.Int32("replicaNumber", lct.ReplicaNumber))
@ -607,7 +608,8 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
}
log.Info("loadCollectionTask postExecute done",
zap.Int64("msgID", lct.getTaskID()),
zap.Int64("taskID", lct.getTaskID()),
zap.Int64("msgID", lct.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID))
return nil
}
@ -617,6 +619,7 @@ func (lct *loadCollectionTask) globalPostExecute(ctx context.Context) error {
if err != nil {
log.Error("loadCollectionTask: failed to get collection info from meta",
zap.Int64("taskID", lct.getTaskID()),
zap.Int64("msgID", lct.GetBase().GetMsgID()),
zap.Int64("collectionID", lct.CollectionID),
zap.Error(err))
@ -628,6 +631,7 @@ func (lct *loadCollectionTask) globalPostExecute(ctx context.Context) error {
if err != nil {
log.Error("loadCollectionTask: failed to sync replica segments to shard leader",
zap.Int64("taskID", lct.getTaskID()),
zap.Int64("msgID", lct.GetBase().GetMsgID()),
zap.Int64("collectionID", lct.CollectionID),
zap.Error(err))
@ -725,7 +729,8 @@ func (rct *releaseCollectionTask) preExecute(context.Context) error {
collectionID := rct.CollectionID
rct.setResultInfo(nil)
log.Info("start do releaseCollectionTask",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("taskID", rct.getTaskID()),
zap.Int64("msgID", rct.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID))
return nil
}
@ -772,7 +777,8 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
}
log.Info("releaseCollectionTask Execute done",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("taskID", rct.getTaskID()),
zap.Int64("msgID", rct.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64("nodeID", rct.NodeID))
return nil
@ -785,7 +791,8 @@ func (rct *releaseCollectionTask) postExecute(context.Context) error {
}
log.Info("releaseCollectionTask postExecute done",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("taskID", rct.getTaskID()),
zap.Int64("msgID", rct.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64("nodeID", rct.NodeID))
return nil
@ -906,7 +913,8 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error {
}
log.Info("start do loadPartitionTask",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("taskID", lpt.getTaskID()),
zap.Int64("msgID", lpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID))
return nil
}
@ -1089,10 +1097,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
}
log.Info("loadPartitionTask Execute done",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("taskID", lpt.getTaskID()),
zap.Int64("msgID", lpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", lpt.Base.MsgID))
zap.Int64s("partitionIDs", partitionIDs))
return nil
}
@ -1104,7 +1112,8 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error {
}
log.Info("loadPartitionTask postExecute done",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("taskID", lpt.getTaskID()),
zap.Int64("msgID", lpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
return nil
@ -1234,7 +1243,8 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error {
partitionIDs := rpt.PartitionIDs
rpt.setResultInfo(nil)
log.Info("start do releasePartitionTask",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("taskID", rpt.getTaskID()),
zap.Int64("msgID", rpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
return nil
@ -1274,7 +1284,8 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
}
log.Info("releasePartitionTask Execute done",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("taskID", rpt.getTaskID()),
zap.Int64("msgID", rpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("nodeID", rpt.NodeID))
@ -1289,7 +1300,8 @@ func (rpt *releasePartitionTask) postExecute(context.Context) error {
}
log.Info("releasePartitionTask postExecute done",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("taskID", rpt.getTaskID()),
zap.Int64("msgID", rpt.GetBase().GetMsgID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("nodeID", rpt.NodeID))
@ -1356,7 +1368,8 @@ func (lst *loadSegmentTask) preExecute(ctx context.Context) error {
log.Info("start do loadSegmentTask",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("loaded nodeID", lst.DstNodeID),
zap.Int64("taskID", lst.getTaskID()))
zap.Int64("taskID", lst.getTaskID()),
zap.Int64("msgID", lst.GetBase().GetMsgID()))
if err := lst.broker.acquireSegmentsReferLock(ctx, lst.taskID, segmentIDs); err != nil {
log.Error("acquire reference lock on segments failed", zap.Int64s("segmentIDs", segmentIDs),
@ -1377,7 +1390,8 @@ func (lst *loadSegmentTask) execute(ctx context.Context) error {
}
log.Info("loadSegmentTask Execute done",
zap.Int64("taskID", lst.getTaskID()))
zap.Int64("taskID", lst.getTaskID()),
zap.Int64("msgID", lst.GetBase().GetMsgID()))
return nil
}
@ -1391,7 +1405,8 @@ func (lst *loadSegmentTask) postExecute(context.Context) error {
}
log.Info("loadSegmentTask postExecute done",
zap.Int64("taskID", lst.getTaskID()))
zap.Int64("taskID", lst.getTaskID()),
zap.Int64("msgID", lst.GetBase().GetMsgID()))
return nil
}
@ -1466,7 +1481,8 @@ func (rst *releaseSegmentTask) preExecute(context.Context) error {
log.Info("start do releaseSegmentTask",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("loaded nodeID", rst.NodeID),
zap.Int64("taskID", rst.getTaskID()))
zap.Int64("taskID", rst.getTaskID()),
zap.Int64("msgID", rst.GetBase().GetMsgID()))
return nil
}
@ -1482,7 +1498,8 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error {
log.Info("releaseSegmentTask Execute done",
zap.Int64s("segmentIDs", rst.SegmentIDs),
zap.Int64("taskID", rst.getTaskID()))
zap.Int64("taskID", rst.getTaskID()),
zap.Int64("msgID", rst.GetBase().GetMsgID()))
return nil
}
@ -1490,7 +1507,8 @@ func (rst *releaseSegmentTask) postExecute(context.Context) error {
segmentIDs := rst.SegmentIDs
log.Info("releaseSegmentTask postExecute done",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("taskID", rst.getTaskID()))
zap.Int64("taskID", rst.getTaskID()),
zap.Int64("msgID", rst.GetBase().GetMsgID()))
return nil
}
@ -1545,7 +1563,8 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error {
log.Info("start do watchDmChannelTask",
zap.Strings("dmChannels", channels),
zap.Int64("loaded nodeID", wdt.NodeID),
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}
@ -1560,13 +1579,15 @@ func (wdt *watchDmChannelTask) execute(ctx context.Context) error {
}
log.Info("watchDmChannelsTask Execute done",
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}
func (wdt *watchDmChannelTask) postExecute(context.Context) error {
log.Info("watchDmChannelTask postExecute done",
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}
@ -1660,7 +1681,8 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error {
log.Info("start do watchDeltaChannelTask",
zap.Strings("deltaChannels", channels),
zap.Int64("loaded nodeID", wdt.NodeID),
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}
@ -1675,13 +1697,15 @@ func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error {
}
log.Info("watchDeltaChannelsTask Execute done",
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}
func (wdt *watchDeltaChannelTask) postExecute(context.Context) error {
log.Info("watchDeltaChannelTask postExecute done",
zap.Int64("taskID", wdt.getTaskID()))
zap.Int64("taskID", wdt.getTaskID()),
zap.Int64("msgID", wdt.GetBase().GetMsgID()))
return nil
}

View File

@ -47,22 +47,22 @@ type deleteNode struct {
// Name returns the name of deleteNode
func (dNode *deleteNode) Name() string {
return "dNode"
return fmt.Sprintf("dNode-%s", dNode.channel)
}
// Operate handles input messages, do delete operations
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name()))
return []Msg{}
}
dMsg, ok := in[0].(*deleteMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for deleteMsg because it's nil")
log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name()))
} else {
log.Warn("type assertion failed for deleteMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name()))
}
return []Msg{}
}

View File

@ -40,22 +40,22 @@ type filterDeleteNode struct {
// Name returns the name of filterDeleteNode
func (fddNode *filterDeleteNode) Name() string {
return fmt.Sprintf("fdNode-%d", fddNode.collectionID)
return fmt.Sprintf("fdNode-%s", fddNode.channel)
}
// Operate handles input messages, to filter invalid delete messages
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)))
log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name()))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name()))
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name()))
}
return []Msg{}
}

View File

@ -40,22 +40,22 @@ type filterDmNode struct {
// Name returns the name of filterDmNode
func (fdmNode *filterDmNode) Name() string {
return fmt.Sprintf("fdmNode-%d", fdmNode.collectionID)
return fmt.Sprintf("fdmNode-%s", fdmNode.channel)
}
// Operate handles input messages, to filter invalid insert messages
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name()))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name()))
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name()))
}
return []Msg{}
}

View File

@ -66,22 +66,22 @@ type deleteData struct {
// Name returns the name of insertNode
func (iNode *insertNode) Name() string {
return "iNode"
return fmt.Sprintf("iNode-%s", iNode.channel)
}
// Operate handles input messages, to execute insert operations
func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name()))
return []Msg{}
}
iMsg, ok := in[0].(*insertMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for insertMsg because it's nil")
log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name()))
} else {
log.Warn("type assertion failed for insertMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name()))
}
return []Msg{}
}

View File

@ -37,22 +37,22 @@ type serviceTimeNode struct {
// Name returns the name of serviceTimeNode
func (stNode *serviceTimeNode) Name() string {
return fmt.Sprintf("stNode-%d-%s", stNode.collectionID, stNode.vChannel)
return fmt.Sprintf("stNode-%s", stNode.vChannel)
}
// Operate handles input messages; it expects exactly one input, of type *serviceTimeMsg
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)))
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name()))
return []Msg{}
}
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for serviceTimeMsg because it's nil")
log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name()))
} else {
log.Warn("type assertion failed for serviceTimeMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name()))
}
return []Msg{}
}