diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 690301ecf6..dfd232a977 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -19,6 +19,8 @@ package msgdispatcher import ( "context" "fmt" + "strconv" + "strings" "sync" "time" @@ -263,17 +265,28 @@ func (d *Dispatcher) groupingMsgs(pack *MsgPack) map[string]*MsgPack { } // group messages by vchannel for _, msg := range pack.Msgs { - var vchannel string + var vchannel, collectionID string switch msg.Type() { case commonpb.MsgType_Insert: vchannel = msg.(*msgstream.InsertMsg).GetShardName() case commonpb.MsgType_Delete: vchannel = msg.(*msgstream.DeleteMsg).GetShardName() + case commonpb.MsgType_CreateCollection: + collectionID = strconv.FormatInt(msg.(*msgstream.CreateCollectionMsg).GetCollectionID(), 10) + case commonpb.MsgType_DropCollection: + collectionID = strconv.FormatInt(msg.(*msgstream.DropCollectionMsg).GetCollectionID(), 10) + case commonpb.MsgType_CreatePartition: + collectionID = strconv.FormatInt(msg.(*msgstream.CreatePartitionMsg).GetCollectionID(), 10) + case commonpb.MsgType_DropPartition: + collectionID = strconv.FormatInt(msg.(*msgstream.DropPartitionMsg).GetCollectionID(), 10) } if vchannel == "" { // for non-dml msg, such as CreateCollection, DropCollection, ... - // we need to dispatch it to all the vchannels. + // we need to dispatch it to the vchannel of this collection for k := range targetPacks { + if !strings.Contains(k, collectionID) { + continue + } // TODO: There's data race when non-dml msg is sent to different flow graph. // Wrong open-trancing information is generated, Fix in future. targetPacks[k].Msgs = append(targetPacks[k].Msgs, msg)