milvus/pkg/objectstorage/options.go
yihao.dai 5e525eb3bf
enhance: Retry reads from object storage on rate limit error (#46455)
This PR improves the robustness of object storage operations by retrying
explicit throttling errors (e.g. HTTP 429, SlowDown, ServerBusy).
These errors commonly occur under high concurrency and are typically
recoverable with bounded retries.

issue: https://github.com/milvus-io/milvus/issues/44772

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Configurable retry support for reads from object storage and improved
mapping of transient/rate-limit errors.
* Added a retryable reader wrapper used by CSV/JSON/Parquet/Numpy import
paths.

* **Configuration**
  * New parameter to control storage read retry attempts.

* **Tests**
* Expanded unit tests covering error mapping and retry behaviors across
storage backends.
* Standardized mock readers and test initialization to simplify test
setups.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2025-12-23 11:03:18 +08:00

124 lines
2.4 KiB
Go

package objectstorage
import "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
// Config collects the parameters used by the chunk manager client.
//
// Every field up to and including GcpCredentialJSON has a matching Option
// constructor in this file (RequestTimeoutMs is set through RequestTimeout).
// The field order is left untouched so positional composite literals keep
// working.
type Config struct {
	Address           string
	BucketName        string
	AccessKeyID       string
	SecretAccessKeyID string

	UseSSL    bool
	SslCACert string

	CreateBucket bool
	RootPath     string

	UseIAM         bool
	CloudProvider  string
	IAMEndpoint    string
	UseVirtualHost bool
	Region         string

	// RequestTimeoutMs is presumably a per-request timeout in milliseconds
	// (inferred from the name; confirm against the client that consumes it).
	RequestTimeoutMs int64

	GcpCredentialJSON    string
	GcpNativeWithoutAuth bool // used for unit testing

	// ReadRetryAttempts is initialised by NewDefaultConfig from the
	// CommonCfg.StorageReadRetryAttempts parameter.
	ReadRetryAttempts uint
}
// NewDefaultConfig allocates a Config whose ReadRetryAttempts is read from the
// paramtable setting CommonCfg.StorageReadRetryAttempts. All other fields are
// left at their zero values.
func NewDefaultConfig() *Config {
	cfg := new(Config)
	cfg.ReadRetryAttempts = paramtable.Get().CommonCfg.StorageReadRetryAttempts.GetAsUint()
	return cfg
}
// Option is a function that receives a *Config. The constructors in this file
// each return an Option that assigns a single Config field.
type Option func(*Config)
// Address returns an Option that assigns addr to Config.Address.
func Address(addr string) Option {
	return func(cfg *Config) { cfg.Address = addr }
}
// BucketName returns an Option that assigns bucketName to Config.BucketName.
func BucketName(bucketName string) Option {
	return func(cfg *Config) { cfg.BucketName = bucketName }
}
// AccessKeyID returns an Option that assigns accessKeyID to Config.AccessKeyID.
func AccessKeyID(accessKeyID string) Option {
	return func(cfg *Config) { cfg.AccessKeyID = accessKeyID }
}
// SecretAccessKeyID returns an Option that assigns secretAccessKeyID to
// Config.SecretAccessKeyID.
func SecretAccessKeyID(secretAccessKeyID string) Option {
	return func(cfg *Config) { cfg.SecretAccessKeyID = secretAccessKeyID }
}
// UseSSL returns an Option that assigns useSSL to Config.UseSSL.
func UseSSL(useSSL bool) Option {
	return func(cfg *Config) { cfg.UseSSL = useSSL }
}
// SslCACert returns an Option that assigns sslCACert to Config.SslCACert.
func SslCACert(sslCACert string) Option {
	return func(cfg *Config) { cfg.SslCACert = sslCACert }
}
// CreateBucket returns an Option that assigns createBucket to
// Config.CreateBucket.
func CreateBucket(createBucket bool) Option {
	return func(cfg *Config) { cfg.CreateBucket = createBucket }
}
// RootPath returns an Option that assigns rootPath to Config.RootPath.
func RootPath(rootPath string) Option {
	return func(cfg *Config) { cfg.RootPath = rootPath }
}
// UseIAM returns an Option that assigns useIAM to Config.UseIAM.
func UseIAM(useIAM bool) Option {
	return func(cfg *Config) { cfg.UseIAM = useIAM }
}
// CloudProvider returns an Option that assigns cloudProvider to
// Config.CloudProvider.
func CloudProvider(cloudProvider string) Option {
	return func(cfg *Config) { cfg.CloudProvider = cloudProvider }
}
// IAMEndpoint returns an Option that assigns iamEndpoint to Config.IAMEndpoint.
func IAMEndpoint(iamEndpoint string) Option {
	return func(cfg *Config) { cfg.IAMEndpoint = iamEndpoint }
}
// UseVirtualHost returns an Option that assigns useVirtualHost to
// Config.UseVirtualHost.
func UseVirtualHost(useVirtualHost bool) Option {
	return func(cfg *Config) { cfg.UseVirtualHost = useVirtualHost }
}
// Region returns an Option that assigns region to Config.Region.
func Region(region string) Option {
	return func(cfg *Config) { cfg.Region = region }
}
// RequestTimeout returns an Option that assigns requestTimeoutMs to
// Config.RequestTimeoutMs. The value is stored as given, with no validation.
func RequestTimeout(requestTimeoutMs int64) Option {
	return func(cfg *Config) { cfg.RequestTimeoutMs = requestTimeoutMs }
}
// GcpCredentialJSON returns an Option that assigns gcpCredentialJSON to
// Config.GcpCredentialJSON.
func GcpCredentialJSON(gcpCredentialJSON string) Option {
	return func(cfg *Config) { cfg.GcpCredentialJSON = gcpCredentialJSON }
}