Load segment stats logs concurrently when initializing dataSyncService (#18125)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2022-07-07 14:32:21 +08:00 committed by GitHub
parent dd310bbce9
commit 8f1ba6a0bb
2 changed files with 52 additions and 12 deletions


@@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
@@ -52,6 +53,9 @@ type dataSyncService struct {
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
compactor *compactionExecutor // reference to compaction executor
// ioPool adds segments concurrently, reducing the time spent loading delta logs from OSS
ioPool *concurrency.Pool
}
func newDataSyncService(ctx context.Context,
@@ -72,6 +76,14 @@ func newDataSyncService(ctx context.Context,
return nil, errors.New("Nil input")
}
// Initialize io concurrency pool
log.Info("initialize io concurrency pool", zap.String("vchannel", vchan.GetChannelName()), zap.Int("ioConcurrency", Params.DataNodeCfg.IOConcurrency))
ioPool, err := concurrency.NewPool(Params.DataNodeCfg.IOConcurrency)
if err != nil {
log.Error("failed to create goroutine pool for dataSyncService", zap.Error(err))
return nil, err
}
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
@@ -90,6 +102,7 @@ func newDataSyncService(ctx context.Context,
flushingSegCache: flushingSegCache,
chunkManager: chunkManager,
compactor: compactor,
ioPool: ioPool,
}
if err := service.initNodes(vchan); err != nil {
@@ -189,6 +202,12 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error
if err != nil {
return err
}
flushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetFlushedSegmentIds())
if err != nil {
return err
}
futures := make([]*concurrency.Future, 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))
for _, us := range unflushedSegmentInfos {
if us.CollectionID != dsService.collectionID ||
us.GetInsertChannel() != vchanInfo.ChannelName {
@@ -213,16 +232,18 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error
pos: *us.GetDmlPosition(),
}
}
if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(),
us.GetNumOfRows(), us.Statslogs, cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
return err
}
// avoid the closure capturing the iteration variable
segment := us
future := dsService.ioPool.Submit(func() (interface{}, error) {
if err := dsService.replica.addNormalSegment(segment.GetID(), segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel(),
segment.GetNumOfRows(), segment.GetStatslogs(), cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
return nil, err
}
return nil, nil
})
futures = append(futures, future)
}
flushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetFlushedSegmentIds())
if err != nil {
return err
}
for _, fs := range flushedSegmentInfos {
if fs.CollectionID != dsService.collectionID ||
fs.GetInsertChannel() != vchanInfo.ChannelName {
@@ -240,10 +261,21 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error
zap.Int64("SegmentID", fs.GetID()),
zap.Int64("NumOfRows", fs.GetNumOfRows()),
)
if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID, fs.PartitionID, fs.GetInsertChannel(),
fs.GetNumOfRows(), fs.Statslogs, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
return err
}
// avoid the closure capturing the iteration variable
segment := fs
future := dsService.ioPool.Submit(func() (interface{}, error) {
if err := dsService.replica.addFlushedSegment(segment.GetID(), segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel(),
segment.GetNumOfRows(), segment.GetStatslogs(), vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
return nil, err
}
return nil, nil
})
futures = append(futures, future)
}
err = concurrency.AwaitAll(futures...)
if err != nil {
return err
}
c := &nodeConfig{

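The hunks above wire an ioPool into dataSyncService and move every addNormalSegment/addFlushedSegment call onto that pool, awaiting all futures before the flowgraph nodes are built. Below is a trimmed-down sketch of that pattern, reusing the concurrency.Pool API exactly as it appears in the diff (NewPool, Submit returning a *Future, AwaitAll); the segmentInfo type and addSegment helper are illustrative placeholders, not part of the actual change.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/concurrency"
)

// segmentInfo and addSegment are placeholders standing in for
// datapb.SegmentInfo and replica.addNormalSegment / addFlushedSegment.
type segmentInfo struct{ ID int64 }

func addSegment(seg segmentInfo) error {
	// In the real code this loads stats logs from object storage, which is
	// why running the calls concurrently shortens initialization.
	fmt.Println("loading stats logs for segment", seg.ID)
	return nil
}

func loadSegments(pool *concurrency.Pool, segments []segmentInfo) error {
	futures := make([]*concurrency.Future, 0, len(segments))
	for _, s := range segments {
		segment := s // avoid the closure capturing the iteration variable
		futures = append(futures, pool.Submit(func() (interface{}, error) {
			return nil, addSegment(segment)
		}))
	}
	// Block until every submitted task has finished; any error fails init.
	return concurrency.AwaitAll(futures...)
}

func main() {
	// 10 mirrors the default of dataNode.dataSync.ioConcurrency below.
	pool, err := concurrency.NewPool(10)
	if err != nil {
		panic(err)
	}
	if err := loadSegments(pool, []segmentInfo{{ID: 1}, {ID: 2}}); err != nil {
		panic(err)
	}
}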

@@ -1101,6 +1101,9 @@ type dataNodeConfig struct {
// etcd
ChannelWatchSubPath string
// io concurrency to fetch stats logs
IOConcurrency int
CreatedTime time.Time
UpdatedTime time.Time
}
@@ -1114,6 +1117,7 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initInsertBinlogRootPath()
p.initStatsBinlogRootPath()
p.initDeleteBinlogRootPath()
p.initIOConcurrency()
p.initChannelWatchPath()
}
@@ -1164,6 +1168,10 @@ func (p *dataNodeConfig) initChannelWatchPath() {
p.ChannelWatchSubPath = "channelwatch"
}
func (p *dataNodeConfig) initIOConcurrency() {
p.IOConcurrency = p.Base.ParseIntWithDefault("dataNode.dataSync.ioConcurrency", 10)
}
func (p *dataNodeConfig) SetNodeID(id UniqueID) {
p.NodeID.Store(id)
}
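
The concurrency package's internals are not shown in this diff. Purely for reference, the sketch below is a minimal, self-contained stand-in (plain goroutines and a semaphore channel, not the real implementation) that exposes the same Submit / Future / AwaitAll shape initNodes relies on; the pool capacity bounds how many stats-log loads run at once, which is the role IOConcurrency plays above.

package concurrencysketch

// Illustrative stand-in for the Pool, Future and AwaitAll used in the diff.
// The actual internal/util/concurrency package may be implemented very
// differently; only the observable behaviour matters here.

type Future struct {
	done  chan struct{}
	value interface{}
	err   error
}

type Pool struct {
	sem chan struct{} // capacity bounds the number of concurrently running tasks
}

// NewPool creates a pool that runs at most capacity tasks at a time.
func NewPool(capacity int) (*Pool, error) {
	if capacity < 1 {
		capacity = 1
	}
	return &Pool{sem: make(chan struct{}, capacity)}, nil
}

// Submit schedules fn and returns a Future that resolves with its result.
func (p *Pool) Submit(fn func() (interface{}, error)) *Future {
	f := &Future{done: make(chan struct{})}
	go func() {
		p.sem <- struct{}{}        // acquire a slot
		defer func() { <-p.sem }() // release it when the task finishes
		f.value, f.err = fn()
		close(f.done)
	}()
	return f
}

// AwaitAll blocks until every future completes and returns the first error.
func AwaitAll(futures ...*Future) error {
	var firstErr error
	for _, f := range futures {
		<-f.done
		if f.err != nil && firstErr == nil {
			firstErr = f.err
		}
	}
	return firstErr
}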