From d24cd6200bb6722d13c78a5000658028e2657411 Mon Sep 17 00:00:00 2001
From: Zhen Ye
Date: Fri, 12 Dec 2025 18:27:15 +0800
Subject: [PATCH] fix: always retry when writing binlog (#46309)

Log IDs used to be allocated up front in one batch (prefetchIDs) and handed
out by nextID, which panics once the batch is exhausted, so a failed binlog
write could not be retried safely. Allocate one ID per write via AllocOne
instead, and wrap the storage v2 insert write in retry.Do, so binlog writes
are always retried until they succeed, the error is marked unrecoverable, or
the context is cancelled.

issue: #46205

---------

Signed-off-by: chyezh
---
 .../datanode/importv2/task_l0_import_test.go  |  2 +-
 internal/flushcommon/syncmgr/pack_writer.go   | 70 +++++--------------
 .../flushcommon/syncmgr/pack_writer_test.go   | 28 --------
 .../flushcommon/syncmgr/pack_writer_v2.go     | 53 ++++++++++----
 internal/flushcommon/syncmgr/task_test.go     | 28 +-------
 5 files changed, 60 insertions(+), 121 deletions(-)

diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go
index 2bc4142c17..a0ddc4e45e 100644
--- a/internal/datanode/importv2/task_l0_import_test.go
+++ b/internal/datanode/importv2/task_l0_import_test.go
@@ -134,7 +134,7 @@ func (s *L0ImportSuite) TestL0Import() {
 	s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).
 		RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
 			alloc := allocator.NewMockAllocator(s.T())
-			alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
+			alloc.EXPECT().AllocOne().Return(1, nil)
 			task.(*syncmgr.SyncTask).WithAllocator(alloc)

 			s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath")
diff --git a/internal/flushcommon/syncmgr/pack_writer.go b/internal/flushcommon/syncmgr/pack_writer.go
index e70d40972b..5db4d4cba8 100644
--- a/internal/flushcommon/syncmgr/pack_writer.go
+++ b/internal/flushcommon/syncmgr/pack_writer.go
@@ -24,7 +24,6 @@ import (
 	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/array"
 	"github.com/apache/arrow/go/v17/arrow/memory"
-	"github.com/samber/lo"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -52,7 +51,6 @@ type BulkPackWriter struct {
 	writeRetryOpts []retry.Option

 	// prefetched log ids
-	ids         []int64
 	sizeWritten int64
 }

@@ -78,12 +76,6 @@ func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) (
 	size int64,
 	err error,
 ) {
-	err = bw.prefetchIDs(pack)
-	if err != nil {
-		log.Warn("failed allocate ids for sync task", zap.Error(err))
-		return
-	}
-
 	if inserts, err = bw.writeInserts(ctx, pack); err != nil {
 		log.Error("failed to write insert data", zap.Error(err))
 		return
@@ -106,45 +98,6 @@ func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) (
 	return
 }

-// prefetchIDs pre-allcates ids depending on the number of blobs current task contains.
-func (bw *BulkPackWriter) prefetchIDs(pack *SyncPack) error { - totalIDCount := 0 - if len(pack.insertData) > 0 { - totalIDCount += len(pack.insertData[0].Data) * 2 // binlogs and statslogs - } - if pack.isFlush { - totalIDCount++ // merged stats log - } - if pack.deltaData != nil { - totalIDCount++ - } - if pack.bm25Stats != nil { - totalIDCount += len(pack.bm25Stats) - if pack.isFlush { - totalIDCount++ // merged bm25 stats - } - } - - if totalIDCount == 0 { - return nil - } - start, _, err := bw.allocator.Alloc(uint32(totalIDCount)) - if err != nil { - return err - } - bw.ids = lo.RangeFrom(start, totalIDCount) - return nil -} - -func (bw *BulkPackWriter) nextID() int64 { - if len(bw.ids) == 0 { - panic("pre-fetched ids exhausted") - } - r := bw.ids[0] - bw.ids = bw.ids[1:] - return r -} - func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob, root, p string, pack *SyncPack, ) (*datapb.Binlog, error) { @@ -184,7 +137,11 @@ func (bw *BulkPackWriter) writeInserts(ctx context.Context, pack *SyncPack) (map logs := make(map[int64]*datapb.FieldBinlog) for fieldID, blob := range binlogBlobs { - k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID()) + id, err := bw.allocator.AllocOne() + if err != nil { + return nil, err + } + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, id) binlog, err := bw.writeLog(ctx, blob, common.SegmentInsertLogPath, k, pack) if err != nil { return nil, err @@ -217,7 +174,11 @@ func (bw *BulkPackWriter) writeStats(ctx context.Context, pack *SyncPack) (map[i pkFieldID := serializer.pkField.GetFieldID() binlogs := make([]*datapb.Binlog, 0) - k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, bw.nextID()) + id, err := bw.allocator.AllocOne() + if err != nil { + return nil, err + } + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, id) if binlog, err := bw.writeLog(ctx, batchStatsBlob, common.SegmentStatslogPath, k, pack); err != nil { return nil, err } else { @@ -264,7 +225,11 @@ func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) ( logs := make(map[int64]*datapb.FieldBinlog) for fieldID, blob := range bm25Blobs { - k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID()) + id, err := bw.allocator.AllocOne() + if err != nil { + return nil, err + } + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, id) binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack) if err != nil { return nil, err @@ -323,7 +288,10 @@ func (bw *BulkPackWriter) writeDelta(ctx context.Context, pack *SyncPack) (*data return nil, fmt.Errorf("primary key field not found") } - logID := bw.nextID() + logID, err := bw.allocator.AllocOne() + if err != nil { + return nil, err + } k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, logID) path := path.Join(bw.chunkManager.RootPath(), common.SegmentDeltaLogPath, k) writer, err := storage.NewDeltalogWriter( diff --git a/internal/flushcommon/syncmgr/pack_writer_test.go b/internal/flushcommon/syncmgr/pack_writer_test.go index 1935456e39..c381881719 100644 --- a/internal/flushcommon/syncmgr/pack_writer_test.go +++ b/internal/flushcommon/syncmgr/pack_writer_test.go @@ -22,7 +22,6 @@ import ( "reflect" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" 
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -36,33 +35,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -func TestNextID(t *testing.T) { - al := allocator.NewMockGIDAllocator() - i := int64(0) - al.AllocF = func(count uint32) (int64, int64, error) { - rt := i - i += int64(count) - return rt, int64(count), nil - } - al.AllocOneF = func() (allocator.UniqueID, error) { - rt := i - i++ - return rt, nil - } - bw := NewBulkPackWriter(nil, nil, nil, al) - bw.prefetchIDs(new(SyncPack).WithFlush()) - - t.Run("normal_next", func(t *testing.T) { - id := bw.nextID() - assert.Equal(t, int64(0), id) - }) - t.Run("id_exhausted", func(t *testing.T) { - assert.Panics(t, func() { - bw.nextID() - }) - }) -} - func TestBulkPackWriter_Write(t *testing.T) { paramtable.Get().Init(paramtable.NewBaseTable()) diff --git a/internal/flushcommon/syncmgr/pack_writer_v2.go b/internal/flushcommon/syncmgr/pack_writer_v2.go index 2bba7eb242..d7fb9e0eab 100644 --- a/internal/flushcommon/syncmgr/pack_writer_v2.go +++ b/internal/flushcommon/syncmgr/pack_writer_v2.go @@ -82,12 +82,6 @@ func (bw *BulkPackWriterV2) Write(ctx context.Context, pack *SyncPack) ( size int64, err error, ) { - err = bw.prefetchIDs(pack) - if err != nil { - log.Warn("failed allocate ids for sync task", zap.Error(err)) - return - } - if inserts, manifest, err = bw.writeInserts(ctx, pack); err != nil { log.Error("failed to write insert data", zap.Error(err)) return @@ -132,15 +126,11 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m return make(map[int64]*datapb.FieldBinlog), "", nil } - columnGroups := bw.columnGroups - rec, err := bw.serializeBinlog(ctx, pack) if err != nil { return nil, "", err } - logs := make(map[int64]*datapb.FieldBinlog) - tsArray := rec.Column(common.TimeStampField).(*array.Int64) rows := rec.Len() var tsFrom uint64 = math.MaxUint64 @@ -154,9 +144,6 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m tsTo = ts } } - - bucketName := bw.getBucketName() - var pluginContextPtr *indexcgopb.StoragePluginContext if hookutil.IsClusterEncyptionEnabled() { ez := hookutil.GetEzByCollProperties(bw.schema.GetProperties(), pack.collectionID) @@ -172,9 +159,43 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m } } } + var logs map[int64]*datapb.FieldBinlog + var manifestPath string + if err := retry.Do(ctx, func() error { + var err error + logs, manifestPath, err = bw.writeInsertsIntoStorage(ctx, pluginContextPtr, pack, rec, tsFrom, tsTo) + if err != nil { + log.Warn("failed to write inserts into storage", + zap.Int64("collectionID", pack.collectionID), + zap.Int64("segmentID", pack.segmentID), + zap.Error(err)) + return err + } + return nil + }, bw.writeRetryOpts...); err != nil { + return nil, "", err + } + return logs, manifestPath, nil +} + +func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context, + pluginContextPtr *indexcgopb.StoragePluginContext, + pack *SyncPack, + rec storage.Record, + tsFrom typeutil.Timestamp, + tsTo typeutil.Timestamp, +) (map[int64]*datapb.FieldBinlog, string, error) { + logs := make(map[int64]*datapb.FieldBinlog) + columnGroups := bw.columnGroups + bucketName := bw.getBucketName() + + var err error doWrite := func(w storage.RecordWriter) error { if err = w.Write(rec); err != nil { + if closeErr := w.Close(); closeErr != nil { + log.Error("failed to close writer after write failed", zap.Error(closeErr)) + } return err } // close first the get stats & output @@ -213,7 
+234,11 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m } else { paths := make([]string, 0) for _, columnGroup := range columnGroups { - path := metautil.BuildInsertLogPath(bw.getRootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, bw.nextID()) + id, err := bw.allocator.AllocOne() + if err != nil { + return nil, "", err + } + path := metautil.BuildInsertLogPath(bw.getRootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, id) paths = append(paths, path) } w, err := storage.NewPackedRecordWriter(bucketName, paths, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups, bw.storageConfig, pluginContextPtr) diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go index 4f5c736e3b..4865dd22f7 100644 --- a/internal/flushcommon/syncmgr/task_test.go +++ b/internal/flushcommon/syncmgr/task_test.go @@ -348,16 +348,6 @@ func (s *SyncTaskSuite) TestRunError() { s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe() s.metacache.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe() - s.Run("allocate_id_fail", func() { - mockAllocator := allocator.NewMockAllocator(s.T()) - mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked")) - - task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator) - - err := task.Run(ctx) - s.Error(err) - }) - s.Run("metawrite_fail", func() { s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked")) @@ -381,7 +371,7 @@ func (s *SyncTaskSuite) TestRunError() { s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked"))) task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})). WithFailureCallback(handler). - WithWriteRetryOptions(retry.Attempts(1)) + WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second)) err := task.Run(ctx) @@ -390,22 +380,6 @@ func (s *SyncTaskSuite) TestRunError() { }) } -func (s *SyncTaskSuite) TestRunErrorWithStorageV2() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - s.Run("storage v2 allocate_id_fail", func() { - mockAllocator := allocator.NewMockAllocator(s.T()) - mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked")) - segV2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0, StorageVersion: storage.StorageV2}, pkoracle.NewBloomFilterSet(), nil) - s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(segV2, true) - - task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator) - - err := task.Run(ctx) - s.Error(err) - }) -} - func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() { t := &SyncTask{ segmentID: 12345,
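
Note: the standalone sketch below shows the pattern this patch converges on: allocate one log ID per write and wrap the write in retry.Do. The retry helpers (retry.Do, retry.AttemptAlways, retry.MaxSleepTime, retry.Unrecoverable) are the ones the diff itself uses, with the import path assumed from the other pkg/v2 imports in the diff; idAllocator, fakeAllocator, writeBinlog, and the key layout in the Sprintf are hypothetical stand-ins for the real syncmgr allocator and storage writer, not Milvus APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/v2/util/retry"
)

// idAllocator is a hypothetical stand-in for the syncmgr allocator; the diff
// only shows that it exposes AllocOne() (int64, error).
type idAllocator interface {
	AllocOne() (int64, error)
}

// fakeAllocator hands out sequential IDs, one per call, mirroring the
// per-write allocation this patch switches to.
type fakeAllocator struct{ next int64 }

func (a *fakeAllocator) AllocOne() (int64, error) {
	a.next++
	return a.next, nil
}

// writeBinlog stands in for one storage write. It fails twice with a
// transient error before succeeding, to exercise the retry loop.
func writeBinlog(attempt *int, path string) error {
	*attempt++
	if *attempt < 3 {
		return fmt.Errorf("transient storage error on %s (attempt %d)", path, *attempt)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	var alloc idAllocator = &fakeAllocator{}
	attempt := 0

	err := retry.Do(ctx, func() error {
		// Allocate a fresh log ID inside the retried closure. Under the old
		// prefetchIDs/nextID scheme, a retry could re-enter nextID after the
		// prefetched batch was consumed, which panicked.
		id, err := alloc.AllocOne()
		if err != nil {
			return err
		}
		path := fmt.Sprintf("insert_log/1/2/3/100/%d", id) // illustrative key layout
		return writeBinlog(&attempt, path)
	}, retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
	if err != nil {
		panic(err)
	}
	fmt.Printf("binlog written after %d attempts\n", attempt)
}
```

With retry.AttemptAlways() the loop keeps retrying until the closure succeeds, the context is cancelled, or the closure returns an error wrapped by retry.Unrecoverable. That last case is how the updated task_test.go still makes Run fail: the mocked ChunkManager.Write returns retry.Unrecoverable(errors.New("mocked")), so the retry loop stops immediately instead of retrying forever.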