diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 005ed24bf2..438808fb2d 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -403,7 +403,8 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) { nodeID := ncInfo.NodeID for _, ch := range ncInfo.Channels { - subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, ch.CollectionID) + // align to datanode subname, using vchannel name + subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, nodeID, ch.Name) pchannelName := funcutil.ToPhysicalChannel(ch.Name) msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 86979098b7..2c17700c87 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -34,8 +34,8 @@ import ( // flowgraph ddNode. func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) { // subName should be unique, since pchannelName is shared among several collections - // consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) - consumeSubName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.collectionID) + // use vchannel in case of reuse pchannel for same collection + consumeSubName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.vChannelName) insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx) if err != nil { return nil, err