diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3c9040daa2..b1706f51c9 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/flowgraph" ) @@ -52,6 +53,9 @@ type dataSyncService struct { flushManager flushManager // flush manager handles flush process chunkManager storage.ChunkManager compactor *compactionExecutor // reference to compaction executor + + // concurrent add segments, reduce time to load delta log from oss + ioPool *concurrency.Pool } func newDataSyncService(ctx context.Context, @@ -72,6 +76,14 @@ func newDataSyncService(ctx context.Context, return nil, errors.New("Nil input") } + // Initialize io cocurrency pool + log.Info("initialize io concurrency pool", zap.String("vchannel", vchan.GetChannelName()), zap.Int("ioConcurrency", Params.DataNodeCfg.IOConcurrency)) + ioPool, err := concurrency.NewPool(Params.DataNodeCfg.IOConcurrency) + if err != nil { + log.Error("failed to create goroutine pool for dataSyncService", zap.Error(err)) + return nil, err + } + ctx1, cancel := context.WithCancel(ctx) service := &dataSyncService{ @@ -90,6 +102,7 @@ func newDataSyncService(ctx context.Context, flushingSegCache: flushingSegCache, chunkManager: chunkManager, compactor: compactor, + ioPool: ioPool, } if err := service.initNodes(vchan); err != nil { @@ -189,6 +202,12 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro if err != nil { return err } + flushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetFlushedSegmentIds()) + if err != nil { + return err + } + + futures := make([]*concurrency.Future, 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos)) for _, us := range unflushedSegmentInfos { if us.CollectionID != dsService.collectionID || us.GetInsertChannel() != vchanInfo.ChannelName { @@ -213,16 +232,18 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro pos: *us.GetDmlPosition(), } } - if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), - us.GetNumOfRows(), us.Statslogs, cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { - return err - } + // avoid closure capture iteration variable + segment := us + future := dsService.ioPool.Submit(func() (interface{}, error) { + if err := dsService.replica.addNormalSegment(segment.GetID(), segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel(), + segment.GetNumOfRows(), segment.GetStatslogs(), cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { + return nil, err + } + return nil, nil + }) + futures = append(futures, future) } - flushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetFlushedSegmentIds()) - if err != nil { - return err - } for _, fs := range flushedSegmentInfos { if fs.CollectionID != dsService.collectionID || fs.GetInsertChannel() != vchanInfo.ChannelName { @@ -240,10 +261,21 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro zap.Int64("SegmentID", fs.GetID()), zap.Int64("NumOfRows", fs.GetNumOfRows()), ) - if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID, fs.PartitionID, fs.GetInsertChannel(), - fs.GetNumOfRows(), fs.Statslogs, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { - return err - } + // avoid closure capture iteration variable + segment := fs + future := dsService.ioPool.Submit(func() (interface{}, error) { + if err := dsService.replica.addFlushedSegment(segment.GetID(), segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel(), + segment.GetNumOfRows(), segment.GetStatslogs(), vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { + return nil, err + } + return nil, nil + }) + futures = append(futures, future) + } + + err = concurrency.AwaitAll(futures...) + if err != nil { + return err } c := &nodeConfig{ diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index b6516d36a1..246d729827 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1101,6 +1101,9 @@ type dataNodeConfig struct { // etcd ChannelWatchSubPath string + // io concurrency to fetch stats logs + IOConcurrency int + CreatedTime time.Time UpdatedTime time.Time } @@ -1114,6 +1117,7 @@ func (p *dataNodeConfig) init(base *BaseTable) { p.initInsertBinlogRootPath() p.initStatsBinlogRootPath() p.initDeleteBinlogRootPath() + p.initIOConcurrency() p.initChannelWatchPath() } @@ -1164,6 +1168,10 @@ func (p *dataNodeConfig) initChannelWatchPath() { p.ChannelWatchSubPath = "channelwatch" } +func (p *dataNodeConfig) initIOConcurrency() { + p.IOConcurrency = p.Base.ParseIntWithDefault("dataNode.dataSync.ioConcurrency", 10) +} + func (p *dataNodeConfig) SetNodeID(id UniqueID) { p.NodeID.Store(id) }