fix: always retry when writing binlog (#46309)

issue: #46205

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-12-12 18:27:15 +08:00 committed by GitHub
parent 76aa00a4c6
commit d24cd6200b
5 changed files with 60 additions and 121 deletions
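At a glance, the change drops the up-front log-ID prefetch in the sync writers in favor of on-demand AllocOne calls, and keeps retrying the binlog write itself instead of giving up after a fixed number of attempts. A minimal sketch of that write loop, using hypothetical Allocator and BlobStore interfaces rather than the Milvus types:

```go
// Sketch only: allocate one log ID per binlog, then retry the write with a
// capped backoff until it succeeds or the context is cancelled. Allocator and
// BlobStore are illustrative stand-ins, not the interfaces used in this PR.
package binlogsketch

import (
	"context"
	"fmt"
	"time"
)

type Allocator interface {
	AllocOne() (int64, error)
}

type BlobStore interface {
	Write(ctx context.Context, key string, data []byte) error
}

func writeWithRetry(ctx context.Context, store BlobStore, alloc Allocator, prefix string, data []byte) (string, error) {
	id, err := alloc.AllocOne() // one ID per log, requested right before the write
	if err != nil {
		return "", err
	}
	key := fmt.Sprintf("%s/%d", prefix, id)

	backoff := 100 * time.Millisecond
	for {
		if err := store.Write(ctx, key, data); err == nil {
			return key, nil
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 10*time.Second { // cap mirrors retry.MaxSleepTime(10*time.Second) used below
			backoff *= 2
		}
	}
}
```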

View File

@@ -134,7 +134,7 @@ func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncDataWithChunkManager(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, cm storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
alloc.EXPECT().AllocOne().Return(1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)
s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath")

View File

@@ -24,7 +24,6 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -52,7 +51,6 @@ type BulkPackWriter struct {
writeRetryOpts []retry.Option
// prefetched log ids
ids []int64
sizeWritten int64
}
@@ -78,12 +76,6 @@ func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) (
size int64,
err error,
) {
err = bw.prefetchIDs(pack)
if err != nil {
log.Warn("failed allocate ids for sync task", zap.Error(err))
return
}
if inserts, err = bw.writeInserts(ctx, pack); err != nil {
log.Error("failed to write insert data", zap.Error(err))
return
@@ -106,45 +98,6 @@ func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) (
return
}
// prefetchIDs pre-allcates ids depending on the number of blobs current task contains.
func (bw *BulkPackWriter) prefetchIDs(pack *SyncPack) error {
totalIDCount := 0
if len(pack.insertData) > 0 {
totalIDCount += len(pack.insertData[0].Data) * 2 // binlogs and statslogs
}
if pack.isFlush {
totalIDCount++ // merged stats log
}
if pack.deltaData != nil {
totalIDCount++
}
if pack.bm25Stats != nil {
totalIDCount += len(pack.bm25Stats)
if pack.isFlush {
totalIDCount++ // merged bm25 stats
}
}
if totalIDCount == 0 {
return nil
}
start, _, err := bw.allocator.Alloc(uint32(totalIDCount))
if err != nil {
return err
}
bw.ids = lo.RangeFrom(start, totalIDCount)
return nil
}
func (bw *BulkPackWriter) nextID() int64 {
if len(bw.ids) == 0 {
panic("pre-fetched ids exhausted")
}
r := bw.ids[0]
bw.ids = bw.ids[1:]
return r
}
func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob,
root, p string, pack *SyncPack,
) (*datapb.Binlog, error) {
@@ -184,7 +137,11 @@ func (bw *BulkPackWriter) writeInserts(ctx context.Context, pack *SyncPack) (map
logs := make(map[int64]*datapb.FieldBinlog)
for fieldID, blob := range binlogBlobs {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID())
id, err := bw.allocator.AllocOne()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, id)
binlog, err := bw.writeLog(ctx, blob, common.SegmentInsertLogPath, k, pack)
if err != nil {
return nil, err
@@ -217,7 +174,11 @@ func (bw *BulkPackWriter) writeStats(ctx context.Context, pack *SyncPack) (map[i
pkFieldID := serializer.pkField.GetFieldID()
binlogs := make([]*datapb.Binlog, 0)
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, bw.nextID())
id, err := bw.allocator.AllocOne()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, id)
if binlog, err := bw.writeLog(ctx, batchStatsBlob, common.SegmentStatslogPath, k, pack); err != nil {
return nil, err
} else {
@@ -264,7 +225,11 @@ func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) (
logs := make(map[int64]*datapb.FieldBinlog)
for fieldID, blob := range bm25Blobs {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID())
id, err := bw.allocator.AllocOne()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, id)
binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack)
if err != nil {
return nil, err
@@ -323,7 +288,10 @@ func (bw *BulkPackWriter) writeDelta(ctx context.Context, pack *SyncPack) (*data
return nil, fmt.Errorf("primary key field not found")
}
logID := bw.nextID()
logID, err := bw.allocator.AllocOne()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, logID)
path := path.Join(bw.chunkManager.RootPath(), common.SegmentDeltaLogPath, k)
writer, err := storage.NewDeltalogWriter(
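In BulkPackWriter (the storage v1 path), the hunks above delete prefetchIDs and nextID and instead call allocator.AllocOne immediately before each insert, stats, BM25, and delta binlog path is built, so there is no fixed pool of IDs to miscount or exhaust before any write starts. A compressed, self-contained sketch of that per-blob pattern; Allocator, Blob, and writeLog here are hypothetical stand-ins, and only the shape of the loop mirrors the diff:

```go
// Per-blob allocation sketch: every binlog gets its ID from AllocOne at the
// moment it is written. Types below are illustrative, not the Milvus ones.
package main

import (
	"context"
	"fmt"
)

type Allocator interface {
	AllocOne() (int64, error)
}

type Blob struct{ Data []byte }

// writeLog stands in for BulkPackWriter.writeLog, which uploads the blob
// (with retries) and returns its binlog metadata; here it is a no-op.
func writeLog(ctx context.Context, key string, b *Blob) error { return nil }

func writeInserts(ctx context.Context, alloc Allocator, blobs map[int64]*Blob) (map[int64]string, error) {
	paths := make(map[int64]string, len(blobs))
	for fieldID, blob := range blobs {
		id, err := alloc.AllocOne() // fresh ID per binlog; no prefetched pool to run dry
		if err != nil {
			return nil, err
		}
		key := fmt.Sprintf("insert_log/%d/%d", fieldID, id)
		if err := writeLog(ctx, key, blob); err != nil {
			return nil, err
		}
		paths[fieldID] = key
	}
	return paths, nil
}

type counterAlloc struct{ next int64 }

func (a *counterAlloc) AllocOne() (int64, error) { a.next++; return a.next, nil }

func main() {
	paths, err := writeInserts(context.Background(), &counterAlloc{}, map[int64]*Blob{100: {Data: []byte("x")}})
	fmt.Println(paths, err)
}
```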

View File

@@ -22,7 +22,6 @@ import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -36,33 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func TestNextID(t *testing.T) {
al := allocator.NewMockGIDAllocator()
i := int64(0)
al.AllocF = func(count uint32) (int64, int64, error) {
rt := i
i += int64(count)
return rt, int64(count), nil
}
al.AllocOneF = func() (allocator.UniqueID, error) {
rt := i
i++
return rt, nil
}
bw := NewBulkPackWriter(nil, nil, nil, al)
bw.prefetchIDs(new(SyncPack).WithFlush())
t.Run("normal_next", func(t *testing.T) {
id := bw.nextID()
assert.Equal(t, int64(0), id)
})
t.Run("id_exhausted", func(t *testing.T) {
assert.Panics(t, func() {
bw.nextID()
})
})
}
func TestBulkPackWriter_Write(t *testing.T) {
paramtable.Get().Init(paramtable.NewBaseTable())

View File

@@ -82,12 +82,6 @@ func (bw *BulkPackWriterV2) Write(ctx context.Context, pack *SyncPack) (
size int64,
err error,
) {
err = bw.prefetchIDs(pack)
if err != nil {
log.Warn("failed allocate ids for sync task", zap.Error(err))
return
}
if inserts, manifest, err = bw.writeInserts(ctx, pack); err != nil {
log.Error("failed to write insert data", zap.Error(err))
return
@@ -132,15 +126,11 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
return make(map[int64]*datapb.FieldBinlog), "", nil
}
columnGroups := bw.columnGroups
rec, err := bw.serializeBinlog(ctx, pack)
if err != nil {
return nil, "", err
}
logs := make(map[int64]*datapb.FieldBinlog)
tsArray := rec.Column(common.TimeStampField).(*array.Int64)
rows := rec.Len()
var tsFrom uint64 = math.MaxUint64
@@ -154,9 +144,6 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
tsTo = ts
}
}
bucketName := bw.getBucketName()
var pluginContextPtr *indexcgopb.StoragePluginContext
if hookutil.IsClusterEncyptionEnabled() {
ez := hookutil.GetEzByCollProperties(bw.schema.GetProperties(), pack.collectionID)
@@ -172,9 +159,43 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
}
}
}
var logs map[int64]*datapb.FieldBinlog
var manifestPath string
if err := retry.Do(ctx, func() error {
var err error
logs, manifestPath, err = bw.writeInsertsIntoStorage(ctx, pluginContextPtr, pack, rec, tsFrom, tsTo)
if err != nil {
log.Warn("failed to write inserts into storage",
zap.Int64("collectionID", pack.collectionID),
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
return err
}
return nil
}, bw.writeRetryOpts...); err != nil {
return nil, "", err
}
return logs, manifestPath, nil
}
func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context,
pluginContextPtr *indexcgopb.StoragePluginContext,
pack *SyncPack,
rec storage.Record,
tsFrom typeutil.Timestamp,
tsTo typeutil.Timestamp,
) (map[int64]*datapb.FieldBinlog, string, error) {
logs := make(map[int64]*datapb.FieldBinlog)
columnGroups := bw.columnGroups
bucketName := bw.getBucketName()
var err error
doWrite := func(w storage.RecordWriter) error {
if err = w.Write(rec); err != nil {
if closeErr := w.Close(); closeErr != nil {
log.Error("failed to close writer after write failed", zap.Error(closeErr))
}
return err
}
// close first to get stats & output
@@ -213,7 +234,11 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
} else {
paths := make([]string, 0)
for _, columnGroup := range columnGroups {
path := metautil.BuildInsertLogPath(bw.getRootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, bw.nextID())
id, err := bw.allocator.AllocOne()
if err != nil {
return nil, "", err
}
path := metautil.BuildInsertLogPath(bw.getRootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, id)
paths = append(paths, path)
}
w, err := storage.NewPackedRecordWriter(bucketName, paths, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups, bw.storageConfig, pluginContextPtr)
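In BulkPackWriterV2, the packed write is factored out into writeInsertsIntoStorage and wrapped in retry.Do with the writer's writeRetryOpts, so a transient object-store failure re-runs the whole packed write (with freshly allocated log IDs). A small runnable example of the retry.Do call shape used above; the import path is assumed to sit next to the paramtable import visible in this diff, and doWrite is a stand-in for writeInsertsIntoStorage:

```go
// Illustrates retry.Do with the option style used in this PR; doWrite is a
// placeholder for the real storage write.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/v2/util/retry" // assumed path, mirroring the paramtable import
)

func main() {
	attempts := 0
	doWrite := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient object-store error") // retried
		}
		return nil
	}

	// AttemptAlways keeps retrying until the function succeeds, the error is
	// wrapped with retry.Unrecoverable, or the context is cancelled;
	// MaxSleepTime caps the backoff between attempts.
	err := retry.Do(context.Background(), doWrite, retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
	fmt.Println("attempts:", attempts, "err:", err)
}
```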

View File

@@ -348,16 +348,6 @@ func (s *SyncTaskSuite) TestRunError() {
s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe()
s.metacache.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()
s.Run("allocate_id_fail", func() {
mockAllocator := allocator.NewMockAllocator(s.T())
mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked"))
task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator)
err := task.Run(ctx)
s.Error(err)
})
s.Run("metawrite_fail", func() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))
@@ -381,7 +371,7 @@ func (s *SyncTaskSuite) TestRunError() {
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).
WithFailureCallback(handler).
WithWriteRetryOptions(retry.Attempts(1))
WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
err := task.Run(ctx)
@@ -390,22 +380,6 @@ })
})
}
func (s *SyncTaskSuite) TestRunErrorWithStorageV2() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("storage v2 allocate_id_fail", func() {
mockAllocator := allocator.NewMockAllocator(s.T())
mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked"))
segV2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0, StorageVersion: storage.StorageV2}, pkoracle.NewBloomFilterSet(), nil)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(segV2, true)
task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator)
err := task.Run(ctx)
s.Error(err)
})
}
func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
t := &SyncTask{
segmentID: 12345,
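Note how the remaining write-failure test still terminates even though the task now retries forever: the mocked ChunkManager.Write returns retry.Unrecoverable(errors.New("mocked")), and an unrecoverable error makes retry.Do return after a single attempt regardless of AttemptAlways. A minimal illustration, with the same assumed import path as above:

```go
// Why the mock in TestRunError wraps its error: an unrecoverable error is the
// escape hatch from AttemptAlways.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/v2/util/retry" // assumed path, mirroring the paramtable import
)

func main() {
	err := retry.Do(context.Background(), func() error {
		// retry.Unrecoverable tells Do to stop after this attempt,
		// even with AttemptAlways configured.
		return retry.Unrecoverable(errors.New("mocked"))
	}, retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
	fmt.Println(err) // the wrapped "mocked" error, returned after one attempt
}
```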