enhance: use the collection id to group msg pack in the msg dispatcher (#34871)

- issue: #34870

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-07-29 11:45:49 +08:00 committed by GitHub
parent c45f38aa61
commit 0e41f104c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -19,6 +19,8 @@ package msgdispatcher
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
@ -263,17 +265,28 @@ func (d *Dispatcher) groupingMsgs(pack *MsgPack) map[string]*MsgPack {
}
// group messages by vchannel
for _, msg := range pack.Msgs {
var vchannel string
var vchannel, collectionID string
switch msg.Type() {
case commonpb.MsgType_Insert:
vchannel = msg.(*msgstream.InsertMsg).GetShardName()
case commonpb.MsgType_Delete:
vchannel = msg.(*msgstream.DeleteMsg).GetShardName()
case commonpb.MsgType_CreateCollection:
collectionID = strconv.FormatInt(msg.(*msgstream.CreateCollectionMsg).GetCollectionID(), 10)
case commonpb.MsgType_DropCollection:
collectionID = strconv.FormatInt(msg.(*msgstream.DropCollectionMsg).GetCollectionID(), 10)
case commonpb.MsgType_CreatePartition:
collectionID = strconv.FormatInt(msg.(*msgstream.CreatePartitionMsg).GetCollectionID(), 10)
case commonpb.MsgType_DropPartition:
collectionID = strconv.FormatInt(msg.(*msgstream.DropPartitionMsg).GetCollectionID(), 10)
}
if vchannel == "" {
// for non-dml msg, such as CreateCollection, DropCollection, ...
// we need to dispatch it to all the vchannels.
// we need to dispatch it to the vchannel of this collection
for k := range targetPacks {
if !strings.Contains(k, collectionID) {
continue
}
// TODO: There's data race when non-dml msg is sent to different flow graph.
// Wrong open-trancing information is generated, Fix in future.
targetPacks[k].Msgs = append(targetPacks[k].Msgs, msg)