diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index ab5f3762ca..9fe405321f 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -82,6 +82,26 @@ func newDataSyncService(ctx context.Context, return service, nil } +type parallelConfig struct { + maxQueueLength int32 + maxParallelism int32 +} + +type nodeConfig struct { + msFactory msgstream.Factory // msgStream factory + collectionID UniqueID + vChannelName string + replica Replica // Segment replica + allocator allocatorInterface + + // defaults + parallelConfig +} + +func newParallelConfig() parallelConfig { + return parallelConfig{Params.FlowGraphMaxQueueLength, Params.FlowGraphMaxParallelism} +} + // start starts the flowgraph in datasyncservice func (dsService *dataSyncService) start() { if dsService.fg != nil { @@ -163,41 +183,37 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.saveBinlog = saveBinlog + c := &nodeConfig{ + msFactory: dsService.msFactory, + collectionID: vchanInfo.GetCollectionID(), + vChannelName: vchanInfo.GetChannelName(), + replica: dsService.replica, + allocator: dsService.idAllocator, + + parallelConfig: newParallelConfig(), + } + var dmStreamNode Node - dmStreamNode, err = newDmInputNode( - dsService.ctx, - dsService.msFactory, - vchanInfo.CollectionID, - vchanInfo.GetChannelName(), - vchanInfo.GetSeekPosition(), - ) + dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c) if err != nil { return err } + var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) var insertBufferNode Node insertBufferNode, err = newInsertBufferNode( dsService.ctx, - dsService.replica, - dsService.msFactory, - dsService.idAllocator, dsService.flushChs.insertBufferCh, saveBinlog, - vchanInfo.GetChannelName(), dsService.flushingSegCache, + c, ) if err != nil { return err } var deleteNode Node - deleteNode, err = newDeleteNode( - dsService.ctx, - dsService.replica, - dsService.idAllocator, - dsService.flushChs.deleteBufferCh, - vchanInfo.GetChannelName(), - ) + deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushChs.deleteBufferCh, c) if err != nil { return err } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 9c167a8ffa..dc58510435 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -166,7 +166,8 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo) *ddNode { baseNode := BaseNode{} - baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength) + baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength) + baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism) fs := make([]*datapb.SegmentInfo, 0, len(vchanInfo.GetFlushedSegments())) fs = append(fs, vchanInfo.GetFlushedSegments()...) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index ef3515e857..fdaa08251f 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -254,16 +254,10 @@ func (dn *deleteNode) flushDelData(collID UniqueID, timeRange TimeRange) { } } -func newDeleteNode( - ctx context.Context, - replica Replica, - idAllocator allocatorInterface, - flushCh <-chan *flushMsg, - channelName string, -) (*deleteNode, error) { +func newDeleteNode(ctx context.Context, flushCh <-chan *flushMsg, config *nodeConfig) (*deleteNode, error) { baseNode := BaseNode{} - baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength) - baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism) + baseNode.SetMaxQueueLength(config.maxQueueLength) + baseNode.SetMaxParallelism(config.maxParallelism) // MinIO option := &miniokv.Option{ @@ -280,12 +274,13 @@ func newDeleteNode( } return &deleteNode{ - BaseNode: baseNode, - channelName: channelName, - delBuf: sync.Map{}, - replica: replica, - idAllocator: idAllocator, - flushCh: flushCh, - minIOKV: minIOKV, + BaseNode: baseNode, + delBuf: sync.Map{}, + flushCh: flushCh, + minIOKV: minIOKV, + + replica: config.replica, + idAllocator: config.allocator, + channelName: config.vChannelName, }, nil } diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 08dde15890..7a5be47a31 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -63,18 +63,17 @@ func (replica *mockReplica) getCollectionAndPartitionID(segID UniqueID) (collID, func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { tests := []struct { - ctx context.Context - replica Replica - idAllocator allocatorInterface + ctx context.Context + config *nodeConfig description string }{ - {context.Background(), &SegmentReplica{}, &allocator{}, "pointer of SegmentReplica"}, + {context.Background(), &nodeConfig{}, "pointer of SegmentReplica"}, } for _, test := range tests { te.Run(test.description, func(t *testing.T) { - dn, err := newDeleteNode(test.ctx, test.replica, test.idAllocator, make(chan *flushMsg), "") + dn, err := newDeleteNode(test.ctx, make(chan *flushMsg), test.config) assert.Nil(t, err) assert.NotNil(t, dn) @@ -173,7 +172,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { ) replica := genMockReplica(segIDs, pks, chanName) t.Run("Test get segment by primary keys", func(te *testing.T) { - dn, err := newDeleteNode(context.Background(), replica, &allocator{}, make(chan *flushMsg), chanName) + c := &nodeConfig{ + replica: replica, + allocator: &allocator{}, + vChannelName: chanName, + } + + dn, err := newDeleteNode(context.Background(), make(chan *flushMsg), c) assert.Nil(t, err) results := dn.filterSegmentByPK(0, pks) @@ -200,7 +205,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { Params.DeleteBinlogRootPath = testPath flushChan := make(chan *flushMsg, 100) - delNode, err := newDeleteNode(ctx, replica, NewAllocatorFactory(), flushChan, chanName) + c := &nodeConfig{ + replica: replica, + allocator: NewAllocatorFactory(), + vChannelName: chanName, + } + delNode, err := newDeleteNode(ctx, flushChan, c) assert.Nil(te, err) flushChan <- &flushMsg{ diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 4254ed92e8..26a55c90fc 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -16,7 +16,6 @@ import ( "fmt" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -26,21 +25,18 @@ import ( // DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all // messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is // flowgraph ddNode. -func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) (*flowgraph.InputNode, error) { - maxQueueLength := Params.FlowGraphMaxQueueLength - maxParallelism := Params.FlowGraphMaxParallelism - +func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) { // subName should be unique, since pchannelName is shared among several collections // consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) - consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelSubName, collID) - insertStream, err := factory.NewTtMsgStream(ctx) + consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelSubName, dmNodeConfig.collectionID) + insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx) if err != nil { return nil, err } // MsgStream needs a physical channel name, but the channel name in seek position from DataCoord // is virtual channel name, so we need to convert vchannel name into pchannel neme here. - pchannelName := rootcoord.ToPhysicalChannel(chanName) + pchannelName := rootcoord.ToPhysicalChannel(dmNodeConfig.vChannelName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName)) @@ -53,7 +49,6 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu } } - var stream msgstream.MsgStream = insertStream - node := flowgraph.NewInputNode(stream, "dmInputNode", maxQueueLength, maxParallelism) + node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism) return node, nil } diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index e33adb8449..fa0ebc789e 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -91,6 +91,6 @@ func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error { func TestNewDmInputNode(t *testing.T) { ctx := context.Background() - _, err := newDmInputNode(ctx, &mockMsgStreamFactory{}, 0, "abc_adc", new(internalpb.MsgPosition)) + _, err := newDmInputNode(ctx, new(internalpb.MsgPosition), &nodeConfig{msFactory: &mockMsgStreamFactory{}}) assert.Nil(t, err) } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 70cd0d6377..c6d6061774 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -874,23 +874,12 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni return ibNode.replica.getCollectionAndPartitionID(segmentID) } -func newInsertBufferNode( - ctx context.Context, - replica Replica, - factory msgstream.Factory, - idAllocator allocatorInterface, - flushCh <-chan *flushMsg, - saveBinlog func(*segmentFlushUnit) error, - channelName string, - flushingSegCache *Cache, -) (*insertBufferNode, error) { - - maxQueueLength := Params.FlowGraphMaxQueueLength - maxParallelism := Params.FlowGraphMaxParallelism +func newInsertBufferNode(ctx context.Context, flushCh <-chan *flushMsg, saveBinlog func(*segmentFlushUnit) error, + flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) { baseNode := BaseNode{} - baseNode.SetMaxQueueLength(maxQueueLength) - baseNode.SetMaxParallelism(maxParallelism) + baseNode.SetMaxQueueLength(config.maxQueueLength) + baseNode.SetMaxParallelism(config.maxParallelism) // MinIO option := &miniokv.Option{ @@ -908,7 +897,7 @@ func newInsertBufferNode( } //input stream, data node time tick - wTt, err := factory.NewMsgStream(ctx) + wTt, err := config.msFactory.NewMsgStream(ctx) if err != nil { return nil, err } @@ -918,7 +907,7 @@ func newInsertBufferNode( wTtMsgStream.Start() // update statistics channel - segS, err := factory.NewMsgStream(ctx) + segS, err := config.msFactory.NewMsgStream(ctx) if err != nil { return nil, err } @@ -931,16 +920,17 @@ func newInsertBufferNode( BaseNode: baseNode, insertBuffer: sync.Map{}, minIOKV: minIOKV, - channelName: channelName, timeTickStream: wTtMsgStream, segmentStatisticsStream: segStatisticsMsgStream, - replica: replica, flushMap: sync.Map{}, flushChan: flushCh, - idAllocator: idAllocator, dsSaveBinlog: saveBinlog, flushingSegCache: flushingSegCache, + + replica: config.replica, + idAllocator: config.allocator, + channelName: config.vChannelName, }, nil } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 2ac47bff8f..d87b3baafa 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -84,27 +84,36 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + + c := &nodeConfig{ + replica: replica, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + + iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) assert.NotNil(t, iBNode) require.NoError(t, err) ctxDone, cancel := context.WithCancel(ctx) cancel() // cancel now to make context done - _, err = newInsertBufferNode(ctxDone, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + _, err = newInsertBufferNode(ctxDone, flushChan, saveBinlog, newCache(), c) assert.Error(t, err) - cdf := &CDFMsFactory{ + c.msFactory = &CDFMsFactory{ Factory: msFactory, cd: 0, } - _, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + _, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) assert.Error(t, err) - cdf = &CDFMsFactory{ + + c.msFactory = &CDFMsFactory{ Factory: msFactory, cd: 1, } - _, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + _, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) assert.Error(t, err) } @@ -170,7 +179,14 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + c := &nodeConfig{ + replica: replica, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + + iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) require.NoError(t, err) flushChan <- &flushMsg{ @@ -238,7 +254,14 @@ func TestFlushSegment(t *testing.T) { saveBinlog := func(*segmentFlushUnit) error { return nil } - ibNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + + c := &nodeConfig{ + replica: replica, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + ibNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) require.NoError(t, err) flushSegment(collMeta, @@ -352,7 +375,13 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode, err := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + c := &nodeConfig{ + replica: colRep, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) require.NoError(t, err) // Auto flush number of rows set to 2 @@ -583,7 +612,13 @@ func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + c := &nodeConfig{ + replica: replica, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) require.NoError(t, err) meta, err := iBNode.getCollMetabySegID(1, 101) @@ -636,7 +671,13 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) + c := &nodeConfig{ + replica: replica, + msFactory: msFactory, + allocator: NewAllocatorFactory(), + vChannelName: "string", + } + iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c) require.NoError(t, err) inMsg := GenFlowGraphInsertMsg(insertChannelName)