diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 6804969e1e..5feea28439 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -87,7 +87,7 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) { var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDdlVchannelName(), vchanPair.GetDdlPosition()) var filterDmNode Node = newFilteredDmNode() - var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator) + var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator, vchanPair.CollectionID) var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator) var gcNode Node = newGCNode(dsService.replica) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index e77d05a94f..099d956eb5 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -26,7 +26,6 @@ import ( miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/storage" @@ -46,6 +45,8 @@ type ddNode struct { kv kv.BaseKV replica Replica + + collectionID UniqueID } type ddData struct { @@ -85,6 +86,10 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // TODO: add error handling } + if len(in) == 0 { + return []flowgraph.Msg{} + } + msMsg, ok := in[0].(*MsgStreamMsg) if !ok { log.Error("type assertion failed for MsgStreamMsg") @@ -122,15 +127,27 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // do dd tasks for _, msg := range tsMessages { - switch msg.Type() { - case commonpb.MsgType_CreateCollection: - ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg)) - case commonpb.MsgType_DropCollection: - ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg)) - case commonpb.MsgType_CreatePartition: - ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg)) - case commonpb.MsgType_DropPartition: - ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) + switch msg := msg.(type) { + case *msgstream.CreateCollectionMsg: + if msg.CollectionID != ddNode.collectionID { + continue + } + ddNode.createCollection(msg) + case *msgstream.DropCollectionMsg: + if msg.CollectionID != ddNode.collectionID { + continue + } + ddNode.dropCollection(msg) + case *msgstream.CreatePartitionMsg: + if msg.CollectionID != ddNode.collectionID { + continue + } + ddNode.createPartition(msg) + case *msgstream.DropPartitionMsg: + if msg.CollectionID != ddNode.collectionID { + continue + } + ddNode.dropPartition(msg) default: log.Error("Not supporting message type", zap.Any("Type", msg.Type())) } @@ -439,7 +456,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { } func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg, - replica Replica, idAllocator allocatorInterface) *ddNode { + replica Replica, idAllocator allocatorInterface, collectionID UniqueID) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -478,5 +495,7 @@ func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg, kv: minioKV, replica: replica, flushMap: &sync.Map{}, + + collectionID: collectionID, } } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 76c9741383..4d0784dae5 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -45,9 +45,9 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { defer close(inFlushCh) replica := newReplica() - ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory()) - collID := UniqueID(0) + ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory(), collID) + collName := "col-test-0" // create collection createCollReq := internalpb.CreateCollectionRequest{