From dc6a6a50facf3a3a96ba4eab152fd4381fc50e49 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 5 Jan 2024 10:04:46 +0800 Subject: [PATCH] enhance: reduce SyncTask AllocID call and refine code (#29701) See also #27675 `Allocator.Alloc` and `Allocator.AllocOne` might be invoked multiple times if there were multiple blobs set in one sync task. This PR adds pre-fetch logic for all blobs and caches logIDs in sync task so that at most only one call of the allocator is needed. --------- Signed-off-by: Congqi Xia --- internal/datanode/syncmgr/task.go | 76 ++++++++++++-------------- internal/datanode/syncmgr/task_test.go | 27 +++++++++ 2 files changed, 62 insertions(+), 41 deletions(-) diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index ce0c5eb30b..c63ae61e77 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -80,6 +80,9 @@ type SyncTask struct { deltaBlob *storage.Blob deltaRowCount int64 + // prefetched log ids + ids []int64 + segmentData map[string][]byte writeRetryOpts []retry.Option @@ -141,27 +144,15 @@ func (t *SyncTask) Run() (err error) { t.segmentID = t.segment.CompactTo() } - err = t.processInsertBlobs() + err = t.prefetchIDs() if err != nil { - log.Warn("failed to process insert blobs", zap.Error(err)) + log.Warn("failed allocate ids for sync task", zap.Error(err)) return err } - err = t.processStatsBlob() - if err != nil { - log.Warn("failed to serialize insert data", zap.Error(err)) - t.handleError(err, metricSegLevel) - log.Warn("failed to process stats blobs", zap.Error(err)) - return err - } - - err = t.processDeltaBlob() - if err != nil { - log.Warn("failed to serialize delete data", zap.Error(err)) - t.handleError(err, metricSegLevel) - log.Warn("failed to process delta blobs", zap.Error(err)) - return err - } + t.processInsertBlobs() + t.processStatsBlob() + t.processDeltaBlob() err = t.writeLogs() if err != nil { @@ -210,18 +201,35 @@ func (t *SyncTask) Run() (err error) { 
return nil } -func (t *SyncTask) processInsertBlobs() error { - if len(t.binlogBlobs) == 0 { - return nil +// prefetchIDs pre-allocates ids depending on the number of blobs current task contains. +func (t *SyncTask) prefetchIDs() error { + totalIDCount := len(t.binlogBlobs) + if t.batchStatsBlob != nil { + totalIDCount++ } - - logidx, _, err := t.allocator.Alloc(uint32(len(t.binlogBlobs))) + if t.deltaBlob != nil { + totalIDCount++ + } + start, _, err := t.allocator.Alloc(uint32(totalIDCount)) if err != nil { return err } + t.ids = lo.RangeFrom(start, totalIDCount) + return nil +} +func (t *SyncTask) nextID() int64 { + if len(t.ids) == 0 { + panic("pre-fetched ids exhausted") + } + r := t.ids[0] + t.ids = t.ids[1:] + return r +} + +func (t *SyncTask) processInsertBlobs() { for fieldID, blob := range t.binlogBlobs { - k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logidx) + k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID()) key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k) t.segmentData[key] = blob.GetValue() t.appendBinlog(fieldID, &datapb.Binlog{ @@ -231,38 +239,25 @@ func (t *SyncTask) processInsertBlobs() error { LogPath: key, LogSize: t.binlogMemsize[fieldID], }) - logidx++ } - return nil } -func (t *SyncTask) processStatsBlob() error { +func (t *SyncTask) processStatsBlob() { if t.batchStatsBlob != nil { - logidx, err := t.allocator.AllocOne() - if err != nil { - return err - } - t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), logidx, t.batchSize) + t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), t.nextID(), t.batchSize) } if t.mergedStatsBlob != nil { totalRowNum := t.segment.NumOfRows() t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum) } - return nil } -func (t *SyncTask) processDeltaBlob() error { +func (t *SyncTask) processDeltaBlob() { if t.deltaBlob != nil 
{ - logID, err := t.allocator.AllocOne() - if err != nil { - log.Error("failed to alloc ID", zap.Error(err)) - return err - } - value := t.deltaBlob.GetValue() data := &datapb.Binlog{} - blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID) + blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, t.nextID()) blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) t.segmentData[blobPath] = value @@ -273,7 +268,6 @@ func (t *SyncTask) processDeltaBlob() error { data.EntriesNum = t.deltaRowCount t.appendDeltalog(data) } - return nil } func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID int64, rowNum int64) { diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index dc73ee708e..2b06d62c71 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -158,6 +158,7 @@ func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask { WithChunkManager(s.chunkManager). WithAllocator(s.allocator). 
WithMetaCache(s.metacache) + task.binlogMemsize = map[int64]int64{0: 1, 1: 1, 100: 100} return task } @@ -345,6 +346,17 @@ func (s *SyncTaskSuite) TestRunError() { s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.Run("allocate_id_fail", func() { + mockAllocator := allocator.NewMockAllocator(s.T()) + mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked")) + + task := s.getSuiteSyncTask() + task.allocator = mockAllocator + + err := task.Run() + s.Error(err) + }) + s.Run("metawrite_fail", func() { s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked")) @@ -382,6 +394,21 @@ func (s *SyncTaskSuite) TestRunError() { }) } +func (s *SyncTaskSuite) TestNextID() { + task := s.getSuiteSyncTask() + + task.ids = []int64{0} + s.Run("normal_next", func() { + id := task.nextID() + s.EqualValues(0, id) + }) + s.Run("id_exhausted", func() { + s.Panics(func() { + task.nextID() + }) + }) +} + func TestSyncTask(t *testing.T) { suite.Run(t, new(SyncTaskSuite)) }