diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 37d4e5e6e2..ef302f9654 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -121,13 +121,14 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) { assert.NoError(t, err) dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel}) + replica.removeCollectionVDeltaChannel(defaultCollectionID, defaultDeltaChannel) assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0) fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel) assert.Nil(t, fg) assert.Error(t, err) - _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel}) + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel}) assert.NoError(t, err) assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1) @@ -147,7 +148,7 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) { t.Run("test addFlowGraphsForDeltaChannels checkReplica Failed", func(t *testing.T) { err = dataSyncService.metaReplica.removeCollection(defaultCollectionID) assert.NoError(t, err) - _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel}) + _, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel}) assert.Error(t, err) dataSyncService.metaReplica.addCollection(defaultCollectionID, genTestCollectionSchema()) }) diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 47cfd09cbc..8d09bb6f4e 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" ) @@ -41,14 +42,15 @@ var newVarCharPrimaryKey = storage.NewVarCharPrimaryKey // deleteNode is the one of nodes in delta flow graph type deleteNode struct { baseNode - collectionID UniqueID - metaReplica ReplicaInterface // historical - vchannel Channel + collectionID UniqueID + metaReplica ReplicaInterface // historical + deltaVchannel Channel + dmlVchannel Channel } // Name returns the name of deleteNode func (dNode *deleteNode) Name() string { - return fmt.Sprintf("dNode-%s", dNode.vchannel) + return fmt.Sprintf("dNode-%s", dNode.deltaVchannel) } // Operate handles input messages, do delete operations @@ -86,7 +88,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for i, delMsg := range dMsg.deleteMessages { traceID, _, _ := trace.InfoFromSpan(spans[i]) log.Debug("delete in historical replica", - zap.String("vchannel", dNode.vchannel), + zap.String("vchannel", dNode.deltaVchannel), zap.Int64("collectionID", delMsg.CollectionID), zap.String("collectionName", delMsg.CollectionName), zap.Int64("numPKs", delMsg.NumRows), @@ -98,10 +100,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ) if dNode.metaReplica.getSegmentNum(segmentTypeSealed) != 0 { - err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData) + err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData, dNode.dmlVchannel) if err != nil { // error occurs when missing meta info or unexpected pk type, should not happen - err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.vchannel) + err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.deltaVchannel) log.Error(err.Error()) panic(err) } @@ -116,7 +118,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Warn("failed to get segment", zap.Int64("collectionID", dNode.collectionID), zap.Int64("segmentID", segmentID), - zap.String("vchannel", dNode.vchannel), + zap.String("vchannel", dNode.deltaVchannel), ) continue } @@ -183,12 +185,12 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg * log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.String("SegmentType", targetSegment.getType().String()), - zap.String("vchannel", dNode.vchannel)) + zap.String("vchannel", dNode.deltaVchannel)) return nil } // newDeleteNode returns a new deleteNode -func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *deleteNode { +func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, deltaVchannel Channel) (*deleteNode, error) { maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism @@ -196,10 +198,17 @@ func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) - return &deleteNode{ - baseNode: baseNode, - collectionID: collectionID, - metaReplica: metaReplica, - vchannel: vchannel, + dmlVChannel, err := funcutil.ConvertChannelName(deltaVchannel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml) + if err != nil { + log.Error("failed to convert deltaVChannel to dmlVChannel", zap.String("deltaVChannel", deltaVchannel), zap.Error(err)) + return nil, err } + + return &deleteNode{ + baseNode: baseNode, + collectionID: collectionID, + metaReplica: metaReplica, + deltaVchannel: deltaVchannel, + dmlVchannel: dmlVChannel, + }, nil } diff --git a/internal/querynode/flow_graph_delete_node_test.go b/internal/querynode/flow_graph_delete_node_test.go index d9d3c331a7..cc6ac3f252 100644 --- a/internal/querynode/flow_graph_delete_node_test.go +++ b/internal/querynode/flow_graph_delete_node_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" @@ -33,7 +34,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test delete", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -54,8 +56,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test segment delete error", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -77,8 +80,10 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test no target segment", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) + wg := &sync.WaitGroup{} wg.Add(1) err = deleteNode.delete(nil, defaultSegmentID, wg) @@ -87,8 +92,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test invalid segmentType", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -108,8 +114,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test operate", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -187,8 +194,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test invalid partitionID", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -212,8 +220,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test collection partition not exist", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -238,8 +247,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test partition not exist", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -263,8 +273,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test invalid input length", func(t *testing.T) { historical, err := genSimpleReplica() - assert.NoError(t, err) - deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName) + require.NoError(t, err) + deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + require.NoError(t, err) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -283,4 +294,11 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { msg := []flowgraph.Msg{&dMsg, &dMsg} deleteNode.Operate(msg) }) + + t.Run("test bad deltaChannelName", func(t *testing.T) { + historical, err := genSimpleReplica() + require.NoError(t, err) + _, err = newDeleteNode(historical, defaultCollectionID, defaultDMLChannel) + assert.Error(t, err) + }) } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index ad548556fe..472bb5ab87 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -234,7 +234,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { zap.Int64("collectionID", delMsg.CollectionID), zap.String("collectionName", delMsg.CollectionName), zap.Int64("numPKs", delMsg.NumRows)) - err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData) + err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData, iNode.vchannel) if err != nil { // error occurs when missing meta info or unexpected pk type, should not happen err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s, vchannel: %s", delMsg.CollectionID, err, iNode.vchannel) @@ -294,7 +294,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // processDeleteMessages would execute delete operations for growing segments -func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *msgstream.DeleteMsg, delData *deleteData) error { +func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *msgstream.DeleteMsg, delData *deleteData, vchannelName string) error { var partitionIDs []UniqueID var err error if msg.PartitionID != -1 { @@ -309,17 +309,13 @@ func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *m return err } } - resultSegmentIDs := make([]UniqueID, 0) - for _, partitionID := range partitionIDs { - segmentIDs, err := replica.getSegmentIDs(partitionID, segType) - if err != nil { - // Skip this partition - if errors.Is(err, ErrPartitionNotFound) { - continue - } + var resultSegmentIDs []UniqueID + resultSegmentIDs, err = replica.getSegmentIDsByVChannel(partitionIDs, vchannelName, segType) + log.Warn("processDeleteMessage", zap.String("vchannel", vchannelName), zap.Int64s("segmentIDs", resultSegmentIDs), zap.Int64s("paritions", partitionIDs)) + if err != nil { + if !errors.Is(err, ErrPartitionNotFound) { return err } - resultSegmentIDs = append(resultSegmentIDs, segmentIDs...) } primaryKeys := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys) diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index aef3a10570..387a020f65 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -197,7 +197,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { } func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) { - t.Run("test processDeleteMessages", func(t *testing.T) { + t.Run("test processDeleteMessages growing", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) @@ -205,11 +205,11 @@ func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) { dData, err := genFlowGraphDeleteData() assert.NoError(t, err) - err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData) + err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData, defaultChannelName) assert.NoError(t, err) }) - t.Run("test processDeleteMessages", func(t *testing.T) { + t.Run("test processDeleteMessages sealed", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) @@ -217,7 +217,7 @@ func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) { dData, err := genFlowGraphDeleteData() assert.NoError(t, err) - err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData) + err = processDeleteMessages(streaming, segmentTypeSealed, dMsg, dData, defaultChannelName) assert.NoError(t, err) }) } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 074501bd96..902fcf19ac 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -143,7 +143,10 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, return nil, err } var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, vchannel) - var deleteNode node = newDeleteNode(metaReplica, collectionID, vchannel) + deleteNode, err := newDeleteNode(metaReplica, collectionID, vchannel) + if err != nil { + return nil, err + } var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, vchannel) q.flowGraph.AddNode(dmStreamNode) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index cd6557603d..f570c3237a 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -72,8 +72,8 @@ const ( defaultMetricType = L2 defaultNQ = 10 - defaultDMLChannel = "query-node-unittest-DML-0" - defaultDeltaChannel = "query-node-unittest-delta-channel-0" + defaultDMLChannel = "by-dev-rootcoord-dml-DML-0" + defaultDeltaChannel = "by-dev-rootcoord-delta-channel-0" defaultSubName = "query-node-unittest-sub-name-0" defaultVersion = 1 diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 2caff0ff19..a8ff0d4d71 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -722,6 +722,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection stream.Close() }() + vchannelName := position.ChannelName pChannelName := funcutil.ToPhysicalChannel(position.ChannelName) position.ChannelName = pChannelName @@ -796,7 +797,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection if dmsg.CollectionID != collectionID { continue } - err = processDeleteMessages(loader.metaReplica, segmentTypeSealed, dmsg, delData) + err = processDeleteMessages(loader.metaReplica, segmentTypeSealed, dmsg, delData, vchannelName) if err != nil { // TODO: panic? // error occurs when missing meta info or unexpected pk type, should not happen