From 5c5f9aa05e24cdfabf59d892b3ff503a26c44852 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Wed, 27 Sep 2023 11:07:25 +0800
Subject: [PATCH] Enhance newDataSyncService (#27277)

- Add flowgraph.AssembleNodes to assemble nodes in flow_graph.go
- Remove fgCtx in newDataSyncService
- Add newServiceWithEtcdTickler func, reducing the number of params to 3
- Remove unnecessary params: config.maxQueueLength, config.maxParallelism

See also: #27207

Signed-off-by: yangxuan
---
 internal/datanode/data_node.go                |   3 +-
 internal/datanode/data_node_test.go           |   2 +-
 internal/datanode/data_sync_service.go        | 487 +++++++-----------
 internal/datanode/data_sync_service_test.go   | 201 +++-----
 internal/datanode/event_manager.go            |  16 +-
 internal/datanode/event_manager_test.go       |   2 +-
 internal/datanode/flow_graph_delete_node.go   |   4 +-
 .../flow_graph_dmstream_input_node.go         |   4 +-
 .../flow_graph_dmstream_input_node_test.go    |  15 +-
 .../datanode/flow_graph_insert_buffer_node.go |  17 +-
 .../flow_graph_insert_buffer_node_test.go     |  13 +-
 internal/datanode/flow_graph_manager.go       |  46 +-
 internal/datanode/flow_graph_manager_test.go  |  10 +-
 internal/datanode/flush_manager.go            |   4 +-
 internal/datanode/mock_test.go                |   8 +-
 internal/datanode/services.go                 |  26 +-
 internal/datanode/services_test.go            |  27 +-
 internal/util/flowgraph/flow_graph.go         |  19 +
 18 files changed, 398 insertions(+), 506 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index f78c36ff65..80e3e26320 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -75,8 +75,7 @@ var Params *paramtable.ComponentParam = paramtable.Get()
 // `etcdCli` is a connection of etcd
 // `rootCoord` is a grpc client of root coordinator.
 // `dataCoord` is a grpc client of data service.
-// `NodeID` is unique to each datanode.
-// `State` is current statement of this data node, indicating whether it's healthy.
+// `stateCode` is the current state of this data node, indicating whether it's healthy.
 //
 // `clearSignal` is a signal channel for releasing the flowgraph resources.
 // `segmentCache` stores all flushing and flushed segments.
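For orientation — an illustrative sketch, not part of the diff: after this patch a channel watch is wired through newEtcdTickler plus a single constructor call, instead of the old long newDataSyncService parameter list. Condensed from handlePutEvent below, with version, key, and watchInfo assumed in scope:

	tickler := newEtcdTickler(version, key, watchInfo, node.watchKv,
		Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
	if err := node.flowgraphManager.addAndStartWithEtcdTickler(node,
		watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
		watchInfo.State = datapb.ChannelWatchState_WatchFailure
	}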
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index a8ef04ffdc..2526c3731d 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -197,7 +197,7 @@ func TestDataNode(t *testing.T) {
 	}
 
 	for _, test := range testDataSyncs {
-		err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
+		err = node.flowgraphManager.addAndStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
 		assert.NoError(t, err)
 		vchanNameCh <- test.dmChannelName
 	}
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 214e0074d3..082831e08e 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -18,7 +18,6 @@ package datanode
 
 import (
 	"context"
-	"fmt"
 	"sync"
 
 	"github.com/cockroachdb/errors"
@@ -33,7 +32,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
-	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -45,93 +43,36 @@ import (
 type dataSyncService struct {
 	ctx      context.Context
 	cancelFn context.CancelFunc
-	fg       *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
-	flushCh      chan flushMsg
-	resendTTCh   chan resendTTMsg    // chan to ask for resending DataNode time tick message.
-	channel      Channel             // channel stores meta of channel
-	idAllocator  allocator.Allocator // id/timestamp allocator
-	dispClient   msgdispatcher.Client
-	msFactory    msgstream.Factory
+	channel  Channel // channel stores meta of channel
 	collectionID UniqueID // collection id of vchan for which this data sync service serves
 	vchannelName string
-	dataCoord    types.DataCoordClient // DataCoord instance to interact with
-	clearSignal  chan<- string         // signal channel to notify flowgraph close for collection/partition drop msg consumed
+
+	// TODO: should be equal to paramtable.GetNodeID(), but the integration test has 1 paramtable for a minicluster, so the NodeID
+	// varies and causes the SaveBinlogPaths check to fail. So we pass ServerID into dataSyncService to avoid that failure.
+	serverID UniqueID
+
+	fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
 	delBufferManager *DeltaBufferManager
-	flushingSegCache *Cache       // a guarding cache stores currently flushing segment ids
 	flushManager     flushManager // flush manager handles flush process
-	chunkManager     storage.ChunkManager
+
+	flushCh    chan flushMsg
+	resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message.
+ timetickSender *timeTickSender // reference to timeTickSender compactor *compactionExecutor // reference to compaction executor + flushingSegCache *Cache // a guarding cache stores currently flushing segment ids - serverID int64 - stopOnce sync.Once - flushListener chan *segmentFlushPack // chan to listen flush event - timetickSender *timeTickSender // reference to timeTickSender -} + clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed + idAllocator allocator.Allocator // id/timestamp allocator + msFactory msgstream.Factory + dispClient msgdispatcher.Client + dataCoord types.DataCoordClient // DataCoord instance to interact with + chunkManager storage.ChunkManager -func newDataSyncService( - fgCtx, initCtx context.Context, - flushCh chan flushMsg, - resendTTCh chan resendTTMsg, - channel Channel, - alloc allocator.Allocator, - dispClient msgdispatcher.Client, - factory msgstream.Factory, - vchan *datapb.VchannelInfo, - clearSignal chan<- string, - dataCoord types.DataCoordClient, - flushingSegCache *Cache, - chunkManager storage.ChunkManager, - compactor *compactionExecutor, - tickler *tickler, - serverID int64, - timetickSender *timeTickSender, -) (*dataSyncService, error) { - if channel == nil { - return nil, errors.New("Nil input") - } + // test only + flushListener chan *segmentFlushPack // chan to listen flush event - childCtx, cancel := context.WithCancel(fgCtx) - - delBufferManager := &DeltaBufferManager{ - channel: channel, - delBufHeap: &PriorityQueue{}, - } - - service := &dataSyncService{ - ctx: childCtx, - cancelFn: cancel, - fg: nil, - flushCh: flushCh, - resendTTCh: resendTTCh, - channel: channel, - idAllocator: alloc, - dispClient: dispClient, - msFactory: factory, - collectionID: vchan.GetCollectionID(), - vchannelName: vchan.GetChannelName(), - dataCoord: dataCoord, - clearSignal: clearSignal, - delBufferManager: delBufferManager, - flushingSegCache: flushingSegCache, - chunkManager: chunkManager, - compactor: compactor, - serverID: serverID, - timetickSender: timetickSender, - } - - if err := service.initNodes(initCtx, vchan, tickler); err != nil { - return nil, err - } - if tickler.isWatchFailed.Load() { - return nil, errors.Errorf("tickler watch failed") - } - return service, nil -} - -type parallelConfig struct { - maxQueueLength int32 - maxParallelism int32 + stopOnce sync.Once } type nodeConfig struct { @@ -140,13 +81,7 @@ type nodeConfig struct { vChannelName string channel Channel // Channel info allocator allocator.Allocator - serverID int64 - // defaults - parallelConfig -} - -func newParallelConfig() parallelConfig { - return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(), Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()} + serverID UniqueID } // start the flow graph in datasyncservice @@ -184,8 +119,12 @@ func (dsService *dataSyncService) close() { dsService.clearGlobalFlushingCache() close(dsService.flushCh) - dsService.flushManager.close() - log.Info("dataSyncService flush manager closed") + + if dsService.flushManager != nil { + dsService.flushManager.close() + log.Info("dataSyncService flush manager closed") + } + dsService.cancelFn() dsService.channel.close() @@ -198,47 +137,43 @@ func (dsService *dataSyncService) clearGlobalFlushingCache() { dsService.flushingSegCache.Remove(segments...) } -// initNodes inits a TimetickedFlowGraph -// initCtx are used to init only. 
-func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *datapb.VchannelInfo, tickler *tickler) error {
-	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
-	// initialize flush manager for DataSync Service
-	dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel,
-		flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService))
+// getSegmentInfos returns the SegmentInfo details for the given ids through RPC to the datacoord
+// TODO: add a broker for the RPC
+func getSegmentInfos(ctx context.Context, datacoord types.DataCoordClient, segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
+	infoResp, err := datacoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
+		Base: commonpbutil.NewMsgBase(
+			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
+			commonpbutil.WithMsgID(0),
+			commonpbutil.WithSourceID(paramtable.GetNodeID()),
+		),
+		SegmentIDs:       segmentIDs,
+		IncludeUnHealthy: true,
+	})
+	if err := funcutil.VerifyResponse(infoResp, err); err != nil {
+		log.Error("Failed to get SegmentInfo by ids from datacoord", zap.Error(err))
+		return nil, err
+	}
-	log.Info("begin to init data sync service", zap.Int64("collection", vchanInfo.CollectionID),
-		zap.String("Chan", vchanInfo.ChannelName),
-		zap.Int64s("unflushed", vchanInfo.GetUnflushedSegmentIds()),
-		zap.Int64s("flushed", vchanInfo.GetFlushedSegmentIds()),
+	return infoResp.Infos, nil
+}
+
+// getChannelWithEtcdTickler recovers the channel meta, using the tickler to update watch progress into etcd as each segment is added.
+func getChannelWithEtcdTickler(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *etcdTickler, unflushed, flushed []*datapb.SegmentInfo) (Channel, error) {
+	var (
+		channelName  = info.GetVchan().GetChannelName()
+		collectionID = info.GetVchan().GetCollectionID()
+		recoverTs    = info.GetVchan().GetSeekPosition().GetTimestamp()
 	)
-	var err error
-	// recover segment checkpoints
-	unflushedSegmentInfos, err := dsService.getSegmentInfos(initCtx, vchanInfo.GetUnflushedSegmentIds())
-	if err != nil {
-		return err
-	}
-	flushedSegmentInfos, err := dsService.getSegmentInfos(initCtx, vchanInfo.GetFlushedSegmentIds())
-	if err != nil {
-		return err
-	}
+
+	// init channel meta
+	channel := newChannel(channelName, collectionID, info.GetSchema(), node.rootCoord, node.chunkManager)
 	// tickler will update addSegment progress to watchInfo
 	tickler.watch()
 	defer tickler.stop()
-	futures := make([]*conc.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))
-
-	for _, us := range unflushedSegmentInfos {
-		if us.CollectionID != dsService.collectionID ||
-			us.GetInsertChannel() != vchanInfo.ChannelName {
-			log.Warn("Collection ID or ChannelName not match",
-				zap.Int64("Wanted ID", dsService.collectionID),
-				zap.Int64("Actual ID", us.CollectionID),
-				zap.String("Wanted channel Name", vchanInfo.ChannelName),
-				zap.String("Actual Channel Name", us.GetInsertChannel()),
-			)
-			continue
-		}
+	futures := make([]*conc.Future[any], 0, len(unflushed)+len(flushed))
+	for _, us := range unflushed {
 		log.Info("recover growing segments from checkpoints",
 			zap.String("vChannelName", us.GetInsertChannel()),
 			zap.Int64("segmentID", us.GetID()),
@@ -248,7 +183,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
 		// avoid closure capture iteration variable
 		segment := us
 		future := getOrCreateIOPool().Submit(func() (interface{}, error) {
-			if err := dsService.channel.addSegment(initCtx,
addSegmentReq{ + if err := channel.addSegment(initCtx, addSegmentReq{ segType: datapb.SegmentType_Normal, segID: segment.GetID(), collID: segment.CollectionID, @@ -257,7 +192,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo * statsBinLogs: segment.Statslogs, binLogs: segment.GetBinlogs(), endPos: segment.GetDmlPosition(), - recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(), + recoverTs: recoverTs, }); err != nil { return nil, err } @@ -267,17 +202,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo * futures = append(futures, future) } - for _, fs := range flushedSegmentInfos { - if fs.CollectionID != dsService.collectionID || - fs.GetInsertChannel() != vchanInfo.ChannelName { - log.Warn("Collection ID or ChannelName not match", - zap.Int64("Wanted ID", dsService.collectionID), - zap.Int64("Actual ID", fs.CollectionID), - zap.String("Wanted Channel Name", vchanInfo.ChannelName), - zap.String("Actual Channel Name", fs.GetInsertChannel()), - ) - continue - } + for _, fs := range flushed { log.Info("recover sealed segments form checkpoints", zap.String("vChannelName", fs.GetInsertChannel()), zap.Int64("segmentID", fs.GetID()), @@ -286,7 +211,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo * // avoid closure capture iteration variable segment := fs future := getOrCreateIOPool().Submit(func() (interface{}, error) { - if err := dsService.channel.addSegment(initCtx, addSegmentReq{ + if err := channel.addSegment(initCtx, addSegmentReq{ segType: datapb.SegmentType_Flushed, segID: segment.GetID(), collID: segment.GetCollectionID(), @@ -294,7 +219,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo * numOfRows: segment.GetNumOfRows(), statsBinLogs: segment.GetStatslogs(), binLogs: segment.GetBinlogs(), - recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(), + recoverTs: recoverTs, }); err != nil { return nil, err } @@ -304,163 +229,149 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo * futures = append(futures, future) } - err = conc.AwaitAll(futures...) 
- if err != nil { - return err + if err := conc.AwaitAll(futures...); err != nil { + return nil, err } - c := &nodeConfig{ - msFactory: dsService.msFactory, - collectionID: vchanInfo.GetCollectionID(), - vChannelName: vchanInfo.GetChannelName(), - channel: dsService.channel, - allocator: dsService.idAllocator, - - parallelConfig: newParallelConfig(), - serverID: dsService.serverID, + if tickler.isWatchFailed.Load() { + return nil, errors.Errorf("tickler watch failed") } - - var dmStreamNode Node - dmStreamNode, err = newDmInputNode(initCtx, dsService.dispClient, vchanInfo.GetSeekPosition(), c) - if err != nil { - return err - } - - var ddNode Node - ddNode, err = newDDNode( - dsService.ctx, - dsService.collectionID, - vchanInfo.GetChannelName(), - vchanInfo.GetDroppedSegmentIds(), - flushedSegmentInfos, - unflushedSegmentInfos, - dsService.compactor) - if err != nil { - return err - } - - var insertBufferNode Node - insertBufferNode, err = newInsertBufferNode( - dsService.ctx, - dsService.collectionID, - dsService.delBufferManager, - dsService.flushCh, - dsService.resendTTCh, - dsService.flushManager, - dsService.flushingSegCache, - c, - dsService.timetickSender, - ) - if err != nil { - return err - } - - var deleteNode Node - deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.delBufferManager, dsService.clearSignal, c) - if err != nil { - return err - } - - var ttNode Node - ttNode, err = newTTNode(c, dsService.dataCoord) - if err != nil { - return err - } - - dsService.fg.AddNode(dmStreamNode) - dsService.fg.AddNode(ddNode) - dsService.fg.AddNode(insertBufferNode) - dsService.fg.AddNode(deleteNode) - dsService.fg.AddNode(ttNode) - - // ddStreamNode - err = dsService.fg.SetEdges(dmStreamNode.Name(), - []string{ddNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err)) - return err - } - - // ddNode - err = dsService.fg.SetEdges(ddNode.Name(), - []string{insertBufferNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err)) - return err - } - - // insertBufferNode - err = dsService.fg.SetEdges(insertBufferNode.Name(), - []string{deleteNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err)) - return err - } - - // deleteNode - err = dsService.fg.SetEdges(deleteNode.Name(), - []string{ttNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node", zap.String("name", deleteNode.Name()), zap.Error(err)) - return err - } - - // ttNode - err = dsService.fg.SetEdges(ttNode.Name(), - []string{}, - ) - if err != nil { - log.Error("set edges failed in node", zap.String("name", ttNode.Name()), zap.Error(err)) - return err - } - return nil + return channel, nil } -// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord -func (dsService *dataSyncService) getSegmentInfos(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) { - infoResp, err := dsService.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), - commonpbutil.WithMsgID(0), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - SegmentIDs: segmentIDs, - IncludeUnHealthy: true, - }) - if err != nil { - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - if 
infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(infoResp.GetStatus().Reason) - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - return infoResp.Infos, nil -} - -func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) { - pChannelName := funcutil.ToPhysicalChannel(channelName) - dmlStream, err := dsService.msFactory.NewMsgStream(ctx) - if err != nil { - return nil, err - } - defer dmlStream.Close() - - subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID) - log.Debug("dataSyncService register consumer for getChannelLatestMsgID", - zap.String("pChannelName", pChannelName), - zap.String("subscription", subName), +func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, channel Channel, unflushed, flushed []*datapb.SegmentInfo) (*dataSyncService, error) { + var ( + channelName = info.GetVchan().GetChannelName() + collectionID = info.GetVchan().GetCollectionID() ) - dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown) - id, err := dmlStream.GetLatestMsgID(pChannelName) + + config := &nodeConfig{ + msFactory: node.factory, + allocator: node.allocator, + + collectionID: collectionID, + vChannelName: channelName, + channel: channel, + serverID: node.session.ServerID, + } + + var ( + flushCh = make(chan flushMsg, 100) + resendTTCh = make(chan resendTTMsg, 100) + delBufferManager = &DeltaBufferManager{ + channel: channel, + delBufHeap: &PriorityQueue{}, + } + ) + + ctx, cancel := context.WithCancel(node.ctx) + ds := &dataSyncService{ + ctx: ctx, + cancelFn: cancel, + flushCh: flushCh, + resendTTCh: resendTTCh, + delBufferManager: delBufferManager, + + dispClient: node.dispClient, + msFactory: node.factory, + dataCoord: node.dataCoord, + + idAllocator: config.allocator, + channel: config.channel, + collectionID: config.collectionID, + vchannelName: config.vChannelName, + serverID: config.serverID, + + flushingSegCache: node.segmentCache, + clearSignal: node.clearSignal, + chunkManager: node.chunkManager, + compactor: node.compactionExecutor, + timetickSender: node.timeTickSender, + + fg: nil, + flushManager: nil, + } + + // init flushManager + flushManager := NewRendezvousFlushManager( + node.allocator, + node.chunkManager, + channel, + flushNotifyFunc(ds, retry.Attempts(50)), dropVirtualChannelFunc(ds), + ) + ds.flushManager = flushManager + + // init flowgraph + fg := flowgraph.NewTimeTickedFlowGraph(node.ctx) + dmStreamNode, err := newDmInputNode(initCtx, node.dispClient, info.GetVchan().GetSeekPosition(), config) if err != nil { - log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err)) return nil, err } - return id.Serialize(), nil + + ddNode, err := newDDNode( + node.ctx, + collectionID, + channelName, + info.GetVchan().GetDroppedSegmentIds(), + flushed, + unflushed, + node.compactionExecutor, + ) + if err != nil { + return nil, err + } + + insertBufferNode, err := newInsertBufferNode( + node.ctx, + flushCh, + resendTTCh, + delBufferManager, + flushManager, + node.segmentCache, + node.timeTickSender, + config, + ) + if err != nil { + return nil, err + } + + deleteNode, err := newDeleteNode(node.ctx, flushManager, delBufferManager, node.clearSignal, config) + if err != nil { + return nil, err + } + + ttNode, err := newTTNode(config, node.dataCoord) + if err != nil { + return 
nil, err + } + + if err := fg.AssembleNodes(dmStreamNode, ddNode, insertBufferNode, deleteNode, ttNode); err != nil { + return nil, err + } + ds.fg = fg + + return ds, nil +} + +// newServiceWithEtcdTickler gets a dataSyncService, but flowgraphs are not running +// initCtx is used to init the dataSyncService only, if initCtx.Canceled or initCtx.Timeout +// newServiceWithEtcdTickler stops and returns the initCtx.Err() +func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *etcdTickler) (*dataSyncService, error) { + // recover segment checkpoints + unflushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetUnflushedSegmentIds()) + if err != nil { + return nil, err + } + flushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetFlushedSegmentIds()) + if err != nil { + return nil, err + } + + // init channel meta + channel, err := getChannelWithEtcdTickler(initCtx, node, info, tickler, unflushedSegmentInfos, flushedSegmentInfos) + if err != nil { + return nil, err + } + + return getServiceWithChannel(initCtx, node, info, channel, unflushedSegmentInfos, flushedSegmentInfos) } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 93f8621a7b..4ba62077e0 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -41,11 +41,9 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service" @@ -54,6 +52,12 @@ func init() { paramtable.Init() } +func getWatchInfo(info *testInfo) *datapb.ChannelWatchInfo { + return &datapb.ChannelWatchInfo{ + Vchan: getVchanInfo(info), + } +} + func getVchanInfo(info *testInfo) *datapb.VchannelInfo { var ufs []*datapb.SegmentInfo var fs []*datapb.SegmentInfo @@ -99,7 +103,7 @@ func getVchanInfo(info *testInfo) *datapb.VchannelInfo { type testInfo struct { isValidCase bool channelNil bool - inMsgFactory msgstream.Factory + inMsgFactory dependency.Factory collID UniqueID chanName string @@ -117,30 +121,16 @@ type testInfo struct { description string } -func TestDataSyncService_newDataSyncService(t *testing.T) { +func TestDataSyncService_getDataSyncService(t *testing.T) { ctx := context.Background() tests := []*testInfo{ { true, false, &mockMsgStreamFactory{false, true}, - 0, "by-dev-rootcoord-dml-test_v0", - 0, 0, "", 0, - 0, 0, "", 0, - "SetParamsReturnError", - }, - { - true, false, &mockMsgStreamFactory{true, true}, - 0, "by-dev-rootcoord-dml-test_v0", + 1, "by-dev-rootcoord-dml-test_v0", 1, 0, "", 0, - 1, 1, "", 0, - "CollID 0 mismach with seginfo collID 1", - }, - { - true, false, &mockMsgStreamFactory{true, true}, - 1, "by-dev-rootcoord-dml-test_v1", - 1, 0, "by-dev-rootcoord-dml-test_v2", 0, - 1, 1, "by-dev-rootcoord-dml-test_v3", 0, - "chanName c1 mismach with seginfo chanName c2", + 1, 0, "", 0, + "SetParamsReturnError", }, { true, false, &mockMsgStreamFactory{true, true}, @@ -160,34 +150,16 @@ func TestDataSyncService_newDataSyncService(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) defer 
cm.RemoveWithPrefix(ctx, cm.RootPath()) + node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) + for _, test := range tests { t.Run(test.description, func(t *testing.T) { - df := &DataCoordFactory{} - rc := &RootCoordFactory{pkType: schemapb.DataType_Int64} - - channel := newChannel("channel", test.collID, nil, rc, cm) - if test.channelNil { - channel = nil - } - dispClient := msgdispatcher.NewClient(test.inMsgFactory, typeutil.DataNodeRole, paramtable.GetNodeID()) - - ds, err := newDataSyncService(ctx, + node.factory = test.inMsgFactory + ds, err := newServiceWithEtcdTickler( ctx, - make(chan flushMsg), - make(chan resendTTMsg), - channel, - allocator.NewMockAllocator(t), - dispClient, - test.inMsgFactory, - getVchanInfo(test), - make(chan string), - df, - newCache(), - cm, - newCompactionExecutor(), + node, + getWatchInfo(test), genTestTickler(), - 0, - nil, ) if !test.isValidCase { @@ -208,34 +180,31 @@ func TestDataSyncService_newDataSyncService(t *testing.T) { // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { const ctxTimeInMillisecond = 10000 + os.RemoveAll("/tmp/milvus") + defer os.RemoveAll("/tmp/milvus") delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), delay) defer cancel() - // init data node - insertChannelName := fmt.Sprintf("by-dev-rootcoord-dml-%d", rand.Int()) - - Factory := &MetaFactory{} - collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) - mockRootCoord := &RootCoordFactory{ - pkType: schemapb.DataType_Int64, - } - - flushChan := make(chan flushMsg, 100) - resendTTChan := make(chan resendTTMsg, 100) - cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) - defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) + node := newIDLEDataNodeMock(context.Background(), schemapb.DataType_Int64) + node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath()) alloc := allocator.NewMockAllocator(t) alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), func(count uint32) int64 { return int64(22222 + count) }, nil) - factory := dependency.NewDefaultFactory(true) - dispClient := msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID()) - defer os.RemoveAll("/tmp/milvus") + node.allocator = alloc + + var ( + insertChannelName = fmt.Sprintf("by-dev-rootcoord-dml-%d", rand.Int()) + + Factory = &MetaFactory{} + collMeta = Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) + ) + paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "1") ufs := []*datapb.SegmentInfo{{ @@ -262,17 +231,18 @@ func TestDataSyncService_Start(t *testing.T) { for _, segmentInfo := range fs { fsIds = append(fsIds, segmentInfo.ID) } - vchan := &datapb.VchannelInfo{ - CollectionID: collMeta.ID, - ChannelName: insertChannelName, - UnflushedSegmentIds: ufsIds, - FlushedSegmentIds: fsIds, + + watchInfo := &datapb.ChannelWatchInfo{ + Schema: collMeta.GetSchema(), + Vchan: &datapb.VchannelInfo{ + CollectionID: collMeta.ID, + ChannelName: insertChannelName, + UnflushedSegmentIds: ufsIds, + FlushedSegmentIds: fsIds, + }, } - signalCh := make(chan string, 100) - - dataCoord := &DataCoordFactory{} - dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + 
node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{ 0: { ID: 0, CollectionID: collMeta.ID, @@ -288,9 +258,14 @@ func TestDataSyncService_Start(t *testing.T) { }, } - atimeTickSender := newTimeTickSender(dataCoord, 0) - sync, err := newDataSyncService(ctx, ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender) - assert.Nil(t, err) + sync, err := newServiceWithEtcdTickler( + ctx, + node, + watchInfo, + genTestTickler(), + ) + require.NoError(t, err) + require.NotNil(t, sync) sync.flushListener = make(chan *segmentFlushPack) defer close(sync.flushListener) @@ -338,7 +313,7 @@ func TestDataSyncService_Start(t *testing.T) { // pulsar produce assert.NoError(t, err) - insertStream, _ := factory.NewMsgStream(ctx) + insertStream, _ := node.factory.NewMsgStream(ctx) insertStream.AsProducer([]string{insertChannelName}) var insertMsgStream msgstream.MsgStream = insertStream @@ -372,13 +347,12 @@ func TestDataSyncService_Close(t *testing.T) { var ( insertChannelName = "by-dev-rootcoord-dml2" - metaFactory = &MetaFactory{} - mockRootCoord = &RootCoordFactory{pkType: schemapb.DataType_Int64} - - collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) - cm = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + metaFactory = &MetaFactory{} + collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) + node = newIDLEDataNodeMock(context.Background(), schemapb.DataType_Int64) ) - defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath()) ufs := []*datapb.SegmentInfo{{ CollectionID: collMeta.ID, @@ -404,29 +378,25 @@ func TestDataSyncService_Close(t *testing.T) { for _, segmentInfo := range fs { fsIds = append(fsIds, segmentInfo.ID) } - vchan := &datapb.VchannelInfo{ - CollectionID: collMeta.ID, - ChannelName: insertChannelName, - UnflushedSegmentIds: ufsIds, - FlushedSegmentIds: fsIds, + watchInfo := &datapb.ChannelWatchInfo{ + Schema: collMeta.GetSchema(), + Vchan: &datapb.VchannelInfo{ + CollectionID: collMeta.ID, + ChannelName: insertChannelName, + UnflushedSegmentIds: ufsIds, + FlushedSegmentIds: fsIds, + }, } + alloc := allocator.NewMockAllocator(t) alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), func(count uint32) int64 { return int64(22222 + count) }, nil) + node.allocator = alloc - var ( - flushChan = make(chan flushMsg, 100) - resendTTChan = make(chan resendTTMsg, 100) - signalCh = make(chan string, 100) - - factory = dependency.NewDefaultFactory(true) - dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID()) - mockDataCoord = &DataCoordFactory{} - ) - mockDataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{ 0: { ID: 0, CollectionID: collMeta.ID, @@ -445,13 +415,17 @@ func TestDataSyncService_Close(t *testing.T) { // No Auto flush paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key) - channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) - channel.syncPolicies = []segmentSyncPolicy{ + syncService, err := newServiceWithEtcdTickler( + context.Background(), + node, + 
watchInfo, + genTestTickler(), + ) + assert.NoError(t, err) + assert.NotNil(t, syncService) + syncService.channel.(*ChannelMeta).syncPolicies = []segmentSyncPolicy{ syncMemoryTooHigh(), } - atimeTickSender := newTimeTickSender(mockDataCoord, 0) - syncService, err := newDataSyncService(ctx, ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender) - assert.NoError(t, err) syncService.flushListener = make(chan *segmentFlushPack, 10) defer close(syncService.flushListener) @@ -524,7 +498,7 @@ func TestDataSyncService_Close(t *testing.T) { // pulsar produce assert.NoError(t, err) - insertStream, _ := factory.NewMsgStream(ctx) + insertStream, _ := node.factory.NewMsgStream(ctx) insertStream.AsProducer([]string{insertChannelName}) var insertMsgStream msgstream.MsgStream = insertStream @@ -628,22 +602,19 @@ func TestBytesReader(t *testing.T) { func TestGetSegmentInfos(t *testing.T) { dataCoord := &DataCoordFactory{} - dsService := &dataSyncService{ - dataCoord: dataCoord, - } ctx := context.Background() - segmentInfos, err := dsService.getSegmentInfos(ctx, []int64{1}) + segmentInfos, err := getSegmentInfos(ctx, dataCoord, []int64{1}) assert.NoError(t, err) assert.Equal(t, 1, len(segmentInfos)) dataCoord.GetSegmentInfosError = true - segmentInfos2, err := dsService.getSegmentInfos(ctx, []int64{1}) + segmentInfos2, err := getSegmentInfos(ctx, dataCoord, []int64{1}) assert.Error(t, err) assert.Empty(t, segmentInfos2) dataCoord.GetSegmentInfosError = false dataCoord.GetSegmentInfosNotSuccess = true - segmentInfos3, err := dsService.getSegmentInfos(ctx, []int64{1}) + segmentInfos3, err := getSegmentInfos(ctx, dataCoord, []int64{1}) assert.Error(t, err) assert.Empty(t, segmentInfos3) @@ -658,7 +629,7 @@ func TestGetSegmentInfos(t *testing.T) { }, } - segmentInfos, err = dsService.getSegmentInfos(ctx, []int64{5}) + segmentInfos, err = getSegmentInfos(ctx, dataCoord, []int64{5}) assert.NoError(t, err) assert.Equal(t, 1, len(segmentInfos)) assert.Equal(t, int64(100), segmentInfos[0].ID) @@ -734,19 +705,13 @@ func TestGetChannelLatestMsgID(t *testing.T) { delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), delay) defer cancel() - factory := dependency.NewDefaultFactory(true) - - dataCoord := &DataCoordFactory{} - dsService := &dataSyncService{ - dataCoord: dataCoord, - msFactory: factory, - } + node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) dmlChannelName := "fake-by-dev-rootcoord-dml-channel_12345v0" - insertStream, _ := factory.NewMsgStream(ctx) + insertStream, _ := node.factory.NewMsgStream(ctx) insertStream.AsProducer([]string{dmlChannelName}) - id, err := dsService.getChannelLatestMsgID(ctx, dmlChannelName, 0) + id, err := node.getChannelLatestMsgID(ctx, dmlChannelName, 0) assert.NoError(t, err) assert.NotNil(t, id) } diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index e1c0e6e09c..9db8448076 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -167,11 +167,11 @@ func parseDeleteEventKey(key string) string { func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) { vChanName := watchInfo.GetVchan().GetChannelName() key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName) - tickler := newTickler(version, key, 
watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second)) + tickler := newEtcdTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second)) switch watchInfo.State { case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: - if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { + if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err)) watchInfo.State = datapb.ChannelWatchState_WatchFailure } else { @@ -291,7 +291,7 @@ func isEndWatchState(state datapb.ChannelWatchState) bool { state != datapb.ChannelWatchState_Uncomplete // legacy state, equal to ToWatch } -type tickler struct { +type etcdTickler struct { progress *atomic.Int32 version int64 @@ -305,11 +305,11 @@ type tickler struct { isWatchFailed *atomic.Bool } -func (t *tickler) inc() { +func (t *etcdTickler) inc() { t.progress.Inc() } -func (t *tickler) watch() { +func (t *etcdTickler) watch() { if t.interval == 0 { log.Info("zero interval, close ticler watch", zap.String("channelName", t.watchInfo.GetVchan().GetChannelName()), @@ -363,13 +363,13 @@ func (t *tickler) watch() { }() } -func (t *tickler) stop() { +func (t *etcdTickler) stop() { close(t.closeCh) t.closeWg.Wait() } -func newTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *tickler { - return &tickler{ +func newEtcdTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *etcdTickler { + return &etcdTickler{ progress: atomic.NewInt32(0), path: path, kv: kv, diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index cd1732c8e0..f8b9eb988f 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -432,7 +432,7 @@ func TestEventTickler(t *testing.T) { kv.RemoveWithPrefix(etcdPrefix) defer kv.RemoveWithPrefix(etcdPrefix) - tickler := newTickler(0, path.Join(etcdPrefix, channelName), &datapb.ChannelWatchInfo{ + tickler := newEtcdTickler(0, path.Join(etcdPrefix, channelName), &datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ ChannelName: channelName, }, diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 8bed430d23..faccda3a53 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -198,8 +198,8 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [ func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) { baseNode := BaseNode{} - baseNode.SetMaxQueueLength(config.maxQueueLength) - baseNode.SetMaxParallelism(config.maxParallelism) + baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) + baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()) return &deleteNode{ ctx: ctx, diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 1e4b40a9ed..7add6b06f6 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ 
b/internal/datanode/flow_graph_dmstream_input_node.go @@ -66,8 +66,8 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie node := flowgraph.NewInputNode( input, name, - dmNodeConfig.maxQueueLength, - dmNodeConfig.maxParallelism, + Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(), + Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32(), typeutil.DataNodeRole, paramtable.GetNodeID(), dmNodeConfig.collectionID, diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 045b411575..ad23b2e2eb 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" @@ -36,13 +38,14 @@ type mockMsgStreamFactory struct { NewMsgStreamNoError bool } -var _ msgstream.Factory = &mockMsgStreamFactory{} +var ( + _ msgstream.Factory = &mockMsgStreamFactory{} + _ dependency.Factory = (*mockMsgStreamFactory)(nil) +) -func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error { - if !mm.InitReturnNil { - return errors.New("Init Error") - } - return nil +func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) {} +func (mm *mockMsgStreamFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { + return nil, nil } func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 46bcffa337..69b655dd17 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -699,12 +699,19 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni return ibNode.channel.getCollectionAndPartitionID(segmentID) } -func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DeltaBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, - fm flushManager, flushingSegCache *Cache, config *nodeConfig, timeTickManager *timeTickSender, +func newInsertBufferNode( + ctx context.Context, + flushCh <-chan flushMsg, + resendTTCh <-chan resendTTMsg, + delBufManager *DeltaBufferManager, + fm flushManager, + flushingSegCache *Cache, + timeTickManager *timeTickSender, + config *nodeConfig, ) (*insertBufferNode, error) { baseNode := BaseNode{} - baseNode.SetMaxQueueLength(config.maxQueueLength) - baseNode.SetMaxParallelism(config.maxParallelism) + baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) + baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()) if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { return &insertBufferNode{ @@ -767,7 +774,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *De sub := tsoutil.SubByNow(ts) pChan := funcutil.ToPhysicalChannel(config.vChannelName) metrics.DataNodeProduceTimeTickLag. - WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(collID), pChan). 
+ WithLabelValues(fmt.Sprint(config.serverID), fmt.Sprint(config.collectionID), pChan). Set(float64(sub)) return wTtMsgStream.Produce(&msgPack) }) diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 93eca76e2e..965f863128 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -120,7 +120,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { dataCoord := &DataCoordFactory{} atimeTickSender := newTimeTickSender(dataCoord, 0) - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender) + iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c) assert.NotNil(t, iBNode) require.NoError(t, err) } @@ -221,7 +221,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { dataCoord := &DataCoordFactory{} atimeTickSender := newTimeTickSender(dataCoord, 0) - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender) + iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c) require.NoError(t, err) flushChan <- flushMsg{ @@ -387,6 +387,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) c := &nodeConfig{ + collectionID: collMeta.GetID(), channel: channel, msFactory: factory, allocator: alloc, @@ -398,7 +399,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { } dataCoord := &DataCoordFactory{} atimeTickSender := newTimeTickSender(dataCoord, 0) - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender) + iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c) require.NoError(t, err) // Auto flush number of rows set to 2 @@ -632,6 +633,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) { flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) c := &nodeConfig{ + collectionID: collMeta.ID, channel: channel, msFactory: factory, allocator: alloc, @@ -644,7 +646,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) { dataCoord := &DataCoordFactory{} atimeTickSender := newTimeTickSender(dataCoord, 0) - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender) + iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c) require.NoError(t, err) // Auto flush number of rows set to 2 @@ -1012,6 +1014,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) c := &nodeConfig{ + collectionID: collMeta.ID, channel: channel, msFactory: factory, allocator: alloc, @@ -1024,7 +1027,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { dataCoord := &DataCoordFactory{} atimeTickSender := newTimeTickSender(dataCoord, 0) - iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender) + iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c) require.NoError(t, err) inMsg := 
genFlowGraphInsertMsg(insertChannelName) diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 9213a60458..5d175135b2 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -115,14 +115,14 @@ func (fm *flowgraphManager) execute(totalMemory uint64) { } } -func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error { +func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error { log := log.With(zap.String("channel", vchan.GetChannelName())) if fm.flowgraphs.Contain(vchan.GetChannelName()) { log.Warn("try to add an existed DataSyncService") return nil } - dataSyncService, err := getDataSyncService(context.TODO(), dn, &datapb.ChannelWatchInfo{ + dataSyncService, err := newServiceWithEtcdTickler(context.TODO(), dn, &datapb.ChannelWatchInfo{ Schema: schema, Vchan: vchan, }, tickler) @@ -240,45 +240,3 @@ func (fm *flowgraphManager) collections() []int64 { return collectionSet.Collect() } - -// getDataSyncService gets and init the dataSyncService -// initCtx is used to init the dataSyncService only, if initCtx.Canceled or initCtx.Timeout -// getDataSyncService stops and returns the initCtx.Err() -func getDataSyncService(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) { - channelName := info.GetVchan().GetChannelName() - log := log.With(zap.String("channel", channelName)) - - channel := newChannel( - info.GetVchan().GetChannelName(), - info.GetVchan().GetCollectionID(), - info.GetSchema(), - node.rootCoord, - node.chunkManager, - ) - - dataSyncService, err := newDataSyncService( - node.ctx, - initCtx, - make(chan flushMsg, 100), - make(chan resendTTMsg, 100), - channel, - node.allocator, - node.dispClient, - node.factory, - info.GetVchan(), - node.clearSignal, - node.dataCoord, - node.segmentCache, - node.chunkManager, - node.compactionExecutor, - tickler, - node.GetSession().ServerID, - node.timeTickSender, - ) - if err != nil { - log.Warn("fail to create new datasyncservice", zap.Error(err)) - return nil, err - } - - return dataSyncService, nil -} diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 22ffcbf925..b396ee1901 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -64,7 +64,7 @@ func TestFlowGraphManager(t *testing.T) { } require.False(t, fm.exist(vchanName)) - err := fm.addAndStart(node, vchan, nil, genTestTickler()) + err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) @@ -79,7 +79,7 @@ func TestFlowGraphManager(t *testing.T) { } require.False(t, fm.exist(vchanName)) - err := fm.addAndStart(node, vchan, nil, genTestTickler()) + err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) @@ -97,7 +97,7 @@ func TestFlowGraphManager(t *testing.T) { } require.False(t, fm.exist(vchanName)) - err := fm.addAndStart(node, vchan, nil, genTestTickler()) + err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) fg, ok := fm.getFlowgraphService(vchanName) @@ -147,7 +147,7 @@ func TestFlowGraphManager(t *testing.T) { } 
require.False(t, fm.exist(vchanName)) - err := fm.addAndStart(node, vchan, nil, genTestTickler()) + err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) @@ -226,7 +226,7 @@ func TestFlowGraphManager(t *testing.T) { vchan := &datapb.VchannelInfo{ ChannelName: vchannel, } - err = fm.addAndStart(node, vchan, nil, genTestTickler()) + err = fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) fg, ok := fm.flowgraphs.Get(vchannel) assert.True(t, ok) diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c3f16c15c9..c7d4f4448f 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -746,7 +746,7 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(0), // TODO msg type commonpbutil.WithMsgID(0), // TODO msg id - commonpbutil.WithSourceID(paramtable.GetNodeID()), + commonpbutil.WithSourceID(dsService.serverID), ), ChannelName: dsService.vchannelName, } @@ -900,7 +900,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(0), commonpbutil.WithMsgID(0), - commonpbutil.WithSourceID(dsService.serverID), + commonpbutil.WithSourceID(paramtable.GetNodeID()), ), SegmentID: pack.segmentID, CollectionID: dsService.collectionID, diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 2150743dbd..bb7208926c 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -92,6 +92,7 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod ds := &DataCoordFactory{} node.dataCoord = ds + node.timeTickSender = newTimeTickSender(node.dataCoord, 0) return node } @@ -312,7 +313,8 @@ func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetS segmentInfos = append(segmentInfos, segInfo) } else { segmentInfos = append(segmentInfos, &datapb.SegmentInfo{ - ID: segmentID, + ID: segmentID, + CollectionID: 1, }) } } @@ -1257,6 +1259,6 @@ func genTimestamp() typeutil.Timestamp { return tsoutil.ComposeTSByTime(gb, 0) } -func genTestTickler() *tickler { - return newTickler(0, "", nil, nil, 0) +func genTestTickler() *etcdTickler { + return newEtcdTickler(0, "", nil, nil, 0) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 48bd910985..d656b5d3ab 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -43,7 +43,9 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -642,7 +644,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor // Get the current dml channel position ID, that will be used in segments start positions and end positions. 
var posID []byte err = retry.Do(ctx, func() error { - id, innerError := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) + id, innerError := node.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) posID = id return innerError }, retry.Attempts(30)) @@ -701,6 +703,28 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor }, nil } +func (node *DataNode) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) { + pChannelName := funcutil.ToPhysicalChannel(channelName) + dmlStream, err := node.factory.NewMsgStream(ctx) + if err != nil { + return nil, err + } + defer dmlStream.Close() + + subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID) + log.Debug("dataSyncService register consumer for getChannelLatestMsgID", + zap.String("pChannelName", pChannelName), + zap.String("subscription", subName), + ) + dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown) + id, err := dmlStream.GetLatestMsgID(pChannelName) + if err != nil { + log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err)) + return nil, err + } + return id.Serialize(), nil +} + func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc { return func(shardID int, partID int64) (int64, string, error) { chNames := req.GetImportTask().GetChannelNames() diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 28cea854fb..57ab03dfcd 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -190,7 +190,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { FlushedSegmentIds: []int64{}, } - err := s.node.flowgraphManager.addAndStart(s.node, vchan, nil, genTestTickler()) + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vchan, nil, genTestTickler()) s.Require().NoError(err) fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) @@ -394,14 +394,14 @@ func (s *DataNodeServicesSuite) TestImport() { chName1 := "fake-by-dev-rootcoord-dml-testimport-1" chName2 := "fake-by-dev-rootcoord-dml-testimport-2" - err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, nil, genTestTickler()) s.Require().Nil(err) - err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -472,14 +472,14 @@ func (s *DataNodeServicesSuite) TestImport() { s.Run("Test Import bad flow graph", func() { chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph" chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph" - err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, nil, genTestTickler()) s.Require().Nil(err) - err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, 
&datapb.VchannelInfo{ CollectionID: 999, // wrong collection ID. ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -620,14 +620,14 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1" chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2" - err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, nil, genTestTickler()) s.Require().NoError(err) - err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -670,7 +670,8 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { func (s *DataNodeServicesSuite) TestSyncSegments() { chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1" - err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + CollectionID: 1, ChannelName: chanName, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{100, 200, 300}, @@ -679,9 +680,9 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName) s.Assert().True(ok) - s1 := Segment{segmentID: 100} - s2 := Segment{segmentID: 200} - s3 := Segment{segmentID: 300} + s1 := Segment{segmentID: 100, collectionID: 1} + s2 := Segment{segmentID: 200, collectionID: 1} + s3 := Segment{segmentID: 300, collectionID: 1} s1.setType(datapb.SegmentType_Flushed) s2.setType(datapb.SegmentType_Flushed) s3.setType(datapb.SegmentType_Flushed) @@ -762,7 +763,7 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() { FlushedSegmentIds: []int64{}, } - err := s.node.flowgraphManager.addAndStart(s.node, vChan, nil, genTestTickler()) + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vChan, nil, genTestTickler()) s.Require().Nil(err) fgService, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) @@ -831,7 +832,7 @@ func (s *DataNodeServicesSuite) TestFlushChannels() { FlushedSegmentIds: []int64{}, } - err := s.node.flowgraphManager.addAndStart(s.node, vChan, nil, genTestTickler()) + err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vChan, nil, genTestTickler()) s.Require().NoError(err) fgService, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 8f5e9777b4..46b6cd79b6 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -18,6 +18,7 @@ package flowgraph import ( "context" + "fmt" "sync" "github.com/cockroachdb/errors" @@ -125,3 +126,21 @@ func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph { return &flowGraph } + +func (fg *TimeTickedFlowGraph) AssembleNodes(orderedNodes ...Node) error { + for _, node := range orderedNodes { + fg.AddNode(node) + } + + for i, node := range orderedNodes { + // Set edge to the next node + if i < len(orderedNodes)-1 { + err := fg.SetEdges(node.Name(), []string{orderedNodes[i+1].Name()}) + if err != nil { + errMsg := fmt.Sprintf("set edges failed for flow graph, node=%s", node.Name()) + return errors.New(errMsg) + } + } + } + return nil +}
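For reference, a minimal self-contained sketch of the chaining behavior AssembleNodes introduces, using toy stand-ins for flowgraph's Node and graph types (all names below are illustrative, not part of the patch):

	package main

	import (
		"errors"
		"fmt"
	)

	// Node mirrors the only part of flowgraph.Node that AssembleNodes relies on.
	type Node interface{ Name() string }

	type namedNode string

	func (n namedNode) Name() string { return string(n) }

	// graph stands in for TimeTickedFlowGraph: a registry of nodes plus,
	// for each node, the names of its downstream nodes.
	type graph struct {
		nodes map[string][]string
	}

	func (g *graph) AddNode(n Node) {
		if g.nodes == nil {
			g.nodes = make(map[string][]string)
		}
		g.nodes[n.Name()] = nil
	}

	func (g *graph) SetEdges(name string, out []string) error {
		if _, ok := g.nodes[name]; !ok {
			return errors.New("unknown node: " + name)
		}
		g.nodes[name] = out
		return nil
	}

	// AssembleNodes chains orderedNodes into a linear pipeline, mirroring the
	// loop in the real implementation: add every node, then point each one at
	// its successor.
	func (g *graph) AssembleNodes(orderedNodes ...Node) error {
		for _, node := range orderedNodes {
			g.AddNode(node)
		}
		for i, node := range orderedNodes {
			if i < len(orderedNodes)-1 {
				if err := g.SetEdges(node.Name(), []string{orderedNodes[i+1].Name()}); err != nil {
					return fmt.Errorf("set edges failed for flow graph, node=%s", node.Name())
				}
			}
		}
		return nil
	}

	func main() {
		var g graph
		// Same order the datanode uses: dmStream -> dd -> insertBuffer -> delete -> tt.
		if err := g.AssembleNodes(namedNode("dmStream"), namedNode("dd"),
			namedNode("insertBuffer"), namedNode("delete"), namedNode("tt")); err != nil {
			panic(err)
		}
		fmt.Println(g.nodes)
		// map[dd:[insertBuffer] delete:[tt] dmStream:[dd] insertBuffer:[delete] tt:[]]
	}

One behavioral note: unlike the removed initNodes, AssembleNodes never calls SetEdges with an empty list for the terminal node; the last node simply keeps an empty downstream list.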