diff --git a/internal/querynode/load_segment_task.go b/internal/querynode/load_segment_task.go index 0d6e6ebe06..ca508198df 100644 --- a/internal/querynode/load_segment_task.go +++ b/internal/querynode/load_segment_task.go @@ -18,7 +18,6 @@ package querynode import ( "context" - "fmt" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -133,18 +132,21 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { } // internal helper function to subscribe delta channel -func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { - collectionID := l.req.CollectionID +func (l *loadSegmentsTask) watchDeltaChannel(dmlChannels []string) error { + var ( + collectionID = l.req.CollectionID + vDeltaChannels []string + VPDeltaChannels = make(map[string]string) + ) log := log.With( zap.Int64("collectionID", collectionID), + zap.Strings("dmlChannels", dmlChannels), ) - var vDeltaChannels []string - VPDeltaChannels := make(map[string]string) - for _, v := range deltaChannels { + for _, v := range dmlChannels { dc, err := funcutil.ConvertChannelName(v, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue()) if err != nil { log.Warn("watchDeltaChannels, failed to convert deltaChannel from dmlChannel", - zap.String("DmlChannel", v), + zap.String("dmlChannel", v), zap.Error(err), ) return err @@ -153,9 +155,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { vDeltaChannels = append(vDeltaChannels, dc) VPDeltaChannels[dc] = p } - log.Info("Starting WatchDeltaChannels ...", - zap.Strings("channels", vDeltaChannels), - ) + log.Info("Starting WatchDeltaChannels ...", zap.Strings("deltaChannels", vDeltaChannels)) coll, err := l.node.metaReplica.getCollectionByID(collectionID) if err != nil { @@ -165,14 +165,14 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { // add collection meta and fg with mutex protection. channel2FlowGraph, err := l.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels, VPDeltaChannels) if err != nil { - log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err)) + log.Warn("watchDeltaChannel, failed to add flowGraph for deltaChannels", + zap.Strings("deltaChannels", vDeltaChannels), + zap.Error(err)) return err } if len(channel2FlowGraph) == 0 { - log.Warn("all delta channels have been added before", - zap.Strings("deltaChannels", deltaChannels), - ) + log.Warn("all delta channels have been added before", zap.Strings("deltaChannels", vDeltaChannels)) return nil } @@ -189,7 +189,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { } }() - log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels)) + log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Strings("deltaChannels", vDeltaChannels)) // create tSafe for _, channel := range vDeltaChannels { @@ -200,12 +200,15 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { for _, channel := range vDeltaChannels { dmlChannel, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDelta.GetValue(), Params.CommonCfg.RootCoordDml.GetValue()) if err != nil { - log.Error("failed to convert delta channel to dml", zap.String("channel", channel), zap.Error(err)) + log.Error("failed to convert delta channel to dml", zap.String("deltaChannel", channel), zap.Error(err)) panic(err) } err = l.node.queryShardService.addQueryShard(collectionID, dmlChannel, l.req.GetReplicaID()) if err != nil { - log.Error("failed to add shard Service to query shard", zap.String("channel", channel), zap.Error(err)) + log.Error("failed to add shard Service to query shard", + zap.String("dmlChannel", dmlChannel), + zap.String("deltaChannel", channel), + zap.Error(err)) panic(err) } } @@ -215,6 +218,6 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error { fg.flowGraph.Start() } - log.Info("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels))) + log.Info("WatchDeltaChannels done", zap.Strings("deltaChannels", vDeltaChannels)) return nil }