diff --git a/internal/querynode/task.go b/internal/querynode/task.go index e74fb3b7ce..44669bfd7f 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -250,17 +250,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { pChannels = append(pChannels, p) VPChannels[v] = p } - log.Debug("Starting WatchDmChannels ...", - zap.Any("collectionName", w.req.Schema.Name), - zap.Any("collectionID", collectionID), - zap.Any("vChannels", vChannels), - zap.Any("pChannels", pChannels), - ) + if len(VPChannels) != len(vChannels) { return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID)) } - log.Debug("Get physical channels done", - zap.Any("collectionID", collectionID), + + log.Debug("Starting WatchDmChannels ...", + zap.String("collectionName", w.req.Schema.Name), + zap.Int64("collectionID", collectionID), + zap.Strings("vChannels", vChannels), + zap.Strings("pChannels", pChannels), ) // init replica @@ -309,7 +308,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } } } - log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID)) + log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID)) // get subscription name getUniqueSubName := func() string { @@ -329,7 +328,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { info.SeekPosition.MsgGroup = consumeSubName toSeekChannels = append(toSeekChannels, info.SeekPosition) } - log.Debug("watchDMChannel, group channels done", zap.Any("collectionID", collectionID)) + log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID)) // add excluded segments for unFlushed segments, // unFlushed segments before check point should be filtered out. @@ -339,7 +338,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } w.node.streaming.replica.addExcludedSegments(collectionID, unFlushedCheckPointInfos) log.Debug("watchDMChannel, add check points info for unFlushed segments done", - zap.Any("collectionID", collectionID), + zap.Int64("collectionID", collectionID), zap.Any("unFlushedCheckPointInfos", unFlushedCheckPointInfos), ) @@ -359,7 +358,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos) log.Debug("watchDMChannel, add check points info for flushed segments done", - zap.Any("collectionID", collectionID), + zap.Int64("collectionID", collectionID), zap.Any("flushedCheckPointInfos", flushedCheckPointInfos), ) @@ -379,7 +378,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos) log.Debug("watchDMChannel, add check points info for dropped segments done", - zap.Any("collectionID", collectionID), + zap.Int64("collectionID", collectionID), zap.Any("droppedCheckPointInfos", droppedCheckPointInfos), ) @@ -390,7 +389,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { // add flow graph w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels) - log.Debug("Query node add DML flow graphs", zap.Any("channels", vChannels)) + log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels)) // add tSafe watcher if queryCollection exists qc, err := w.node.queryService.getQueryCollection(collectionID) @@ -417,8 +416,8 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } } log.Debug("as consumer channels", - zap.Any("collectionID", collectionID), - zap.Any("toSubChannels", toSubChannels)) + zap.Int64("collectionID", collectionID), + zap.Strings("toSubChannels", toSubChannels)) // seek channel for _, pos := range toSeekChannels { @@ -440,7 +439,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } } log.Debug("Seek all channel done", - zap.Any("collectionID", collectionID), + zap.Int64("collectionID", collectionID), zap.Any("toSeekChannels", toSeekChannels)) // load growing segments @@ -469,16 +468,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { Schema: w.req.Schema, } log.Debug("loading growing segments in WatchDmChannels...", - zap.Any("collectionID", collectionID), - zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs), + zap.Int64("collectionID", collectionID), + zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), ) err = w.node.loader.loadSegment(req, segmentTypeGrowing) if err != nil { return err } log.Debug("load growing segments done in WatchDmChannels", - zap.Any("collectionID", collectionID), - zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs), + zap.Int64("collectionID", collectionID), + zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), ) // start flow graphs @@ -489,7 +488,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } } - log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(vChannels))) + log.Debug("WatchDmChannels done", zap.Strings("ChannelIDs", vChannels)) return nil }