diff --git a/internal/querynode/task.go b/internal/querynode/task.go index a2acff486f..38da26cd0d 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -22,11 +22,10 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/rootcoord" ) type task interface { @@ -118,45 +117,26 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { // get all vChannels vChannels := make([]Channel, 0) pChannels := make([]Channel, 0) + VPChannels := make(map[string]string) // map[vChannel]pChannel for _, info := range w.req.Infos { - vChannels = append(vChannels, info.ChannelName) + v := info.ChannelName + p := rootcoord.ToPhysicalChannel(info.ChannelName) + vChannels = append(vChannels, v) + pChannels = append(pChannels, p) + VPChannels[v] = p } log.Debug("starting WatchDmChannels ...", zap.Any("collectionName", w.req.Schema.Name), zap.Any("collectionID", collectionID), - zap.String("vChannels", fmt.Sprintln(vChannels))) - - // get physical channels - desColReq := &milvuspb.DescribeCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DescribeCollection, - }, - CollectionID: collectionID, - } - desColRsp, err := w.node.rootCoord.DescribeCollection(ctx, desColReq) - if err != nil { - log.Error("get channels failed, err = " + err.Error()) - return err - } - log.Debug("get channels from master", - zap.Any("collectionID", collectionID), - zap.Any("vChannels", desColRsp.VirtualChannelNames), - zap.Any("pChannels", desColRsp.PhysicalChannelNames), + zap.Any("vChannels", vChannels), + zap.Any("pChannels", pChannels), ) - VPChannels := make(map[string]string) // map[vChannel]pChannel - for _, ch := range vChannels { - for i := range desColRsp.VirtualChannelNames { - if desColRsp.VirtualChannelNames[i] == ch { - VPChannels[ch] = desColRsp.PhysicalChannelNames[i] - pChannels = append(pChannels, desColRsp.PhysicalChannelNames[i]) - break - } - } - } if len(VPChannels) != len(vChannels) { return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID)) } - log.Debug("get physical channels done", zap.Any("collectionID", collectionID)) + log.Debug("get physical channels done", + zap.Any("collectionID", collectionID), + ) // init replica if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {