diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go index b0731d23d5..bd470d21e6 100644 --- a/internal/datanode/io/binlog_io.go +++ b/internal/datanode/io/binlog_io.go @@ -85,17 +85,28 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload") defer span.End() - future := b.pool.Submit(func() (any, error) { - log.Debug("BinlogIO uplaod", zap.Strings("paths", lo.Keys(kvs))) - err := retry.Do(ctx, func() error { - return b.MultiWrite(ctx, kvs) + + futures := make([]*conc.Future[any], 0, len(kvs)) + for k, v := range kvs { + innerK, innerV := k, v + future := b.pool.Submit(func() (any, error) { + var err error + log.Debug("BinlogIO upload", zap.String("paths", innerK)) + err = retry.Do(ctx, func() error { + err = b.Write(ctx, innerK, innerV) + if err != nil { + log.Warn("BinlogIO fail to upload", zap.String("paths", innerK), zap.Error(err)) + } + return err + }) + + return struct{}{}, err }) - return nil, err - }) + futures = append(futures, future) + } - _, err := future.Await() - return err + return conc.AwaitAll(futures...) } func (b *BinlogIoImpl) JoinFullPath(paths ...string) string { diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 6d17294b1e..5031cc775f 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -355,6 +355,8 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag } func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error { + allBlobs := make(map[string][]byte) + tmpResults := make(map[int64]*datapb.CompactionSegment) for segID, dData := range alteredSegments { if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) { blobs, binlog, err := t.composeDeltalog(segID, dData) @@ -362,24 +364,33 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec log.Warn("L0 compaction composeDelta fail", zap.Int64("segmentID", segID), zap.Error(err)) return err } - err = t.Upload(ctx, blobs) - if err != nil { - log.Warn("L0 compaction upload blobs fail", zap.Int64("segmentID", segID), zap.Any("binlog", binlog), zap.Error(err)) - return err + allBlobs = lo.Assign(blobs, allBlobs) + tmpResults[segID] = &datapb.CompactionSegment{ + SegmentID: segID, + Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}}, + Channel: t.plan.GetChannel(), } - - if _, ok := resultSegments[segID]; !ok { - resultSegments[segID] = &datapb.CompactionSegment{ - SegmentID: segID, - Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}}, - Channel: t.plan.GetChannel(), - } - } else { - resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog) - } - delete(alteredSegments, segID) } } + + if len(allBlobs) == 0 { + return nil + } + + if err := t.Upload(ctx, allBlobs); err != nil { + log.Warn("L0 compaction upload blobs fail", zap.Error(err)) + return err + } + + for segID, compSeg := range tmpResults { + if _, ok := resultSegments[segID]; !ok { + resultSegments[segID] = compSeg + } else { + binlog := compSeg.Deltalogs[0].Binlogs[0] + resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog) + } + } + return nil } diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go index ba7564b92e..80b9241285 100644 --- a/internal/datanode/l0_compactor_test.go +++ b/internal/datanode/l0_compactor_test.go @@ -220,7 +220,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { RunAndReturn(func(paths ...string) string { return path.Join(paths...) }).Times(2) - s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() s.Require().Equal(plan.GetPlanID(), s.task.getPlanID()) s.Require().Equal(plan.GetChannel(), s.task.getChannelName()) @@ -322,7 +322,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { RunAndReturn(func(paths ...string) string { return path.Join(paths...) }).Times(2) - s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() l0Segments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L0