diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index d27cb58373..4a7f29797e 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -45,6 +45,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/retry" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -110,12 +112,18 @@ func NewSyncTask(ctx context.Context, syncPack.WithBM25Stats(bm25Stats) } + writeRetryAttempts := paramtable.Get().DataNodeCfg.ImportMaxWriteRetryAttempts.GetAsUint() + retryOpts := []retry.Option{ + retry.Attempts(writeRetryAttempts), // default retry always + retry.MaxSleepTime(10 * time.Second), + } task := syncmgr.NewSyncTask(). WithAllocator(allocator). WithMetaCache(metaCache). WithSchema(metaCache.GetSchema(0)). // TODO specify import schema if needed WithSyncPack(syncPack). - WithStorageConfig(storageConfig) + WithStorageConfig(storageConfig). + WithWriteRetryOptions(retryOpts...) return task, nil } diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 873e6c235e..1521a05f4a 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "sync" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -29,6 +30,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/retry" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -618,7 +620,8 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy WithMetaWriter(wb.metaWriter). WithMetaCache(wb.metaCache). WithSchema(schema). - WithSyncPack(pack) + WithSyncPack(pack). + WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second)) return task, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 24aa99a2cb..7411dfc25a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -6006,6 +6006,7 @@ type dataNodeConfig struct { ImportBaseBufferSize ParamItem `refreshable:"true"` ImportDeleteBufferSize ParamItem `refreshable:"true"` ImportMemoryLimitPercentage ParamItem `refreshable:"true"` + ImportMaxWriteRetryAttempts ParamItem `refreshable:"true"` // Compaction L0BatchMemoryRatio ParamItem `refreshable:"true"` @@ -6359,6 +6360,14 @@ if this parameter <= 0, will set it as 10`, } p.ImportMemoryLimitPercentage.Init(base.mgr) + p.ImportMaxWriteRetryAttempts = ParamItem{ + Key: "dataNode.import.maxWriteRetryAttempts", + Version: "2.6.9", + Doc: "The maximum number of write retry attempts. 0 means unlimited.", + DefaultValue: "0", + } + p.ImportMaxWriteRetryAttempts.Init(base.mgr) + p.L0BatchMemoryRatio = ParamItem{ Key: "dataNode.compaction.levelZeroBatchMemoryRatio", Version: "2.4.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index a535cfb698..4083de3bb7 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -662,6 +662,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt()) assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt()) assert.Equal(t, 10.0, Params.ImportMemoryLimitPercentage.GetAsFloat()) + assert.Equal(t, 0, Params.ImportMaxWriteRetryAttempts.GetAsInt()) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 16, Params.SlotCap.GetAsInt()) diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go index 0c90e4d849..a80094f151 100644 --- a/pkg/util/retry/retry.go +++ b/pkg/util/retry/retry.go @@ -141,7 +141,7 @@ func Handle(ctx context.Context, fn func() (bool, error), opts ...Option) error } var lastErr error - for i := uint(0); i < c.attempts; i++ { + for i := uint(0); c.attempts == 0 || i < c.attempts; i++ { if shouldRetry, err := fn(); err != nil { if i%4 == 0 { log.Warn("retry func failed",