diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 070b6b2959..11f0320983 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -187,24 +187,24 @@ woodpecker: queueSize: 10000 # The size of the queue for pending messages to be sent of each log. maxRetries: 3 # Maximum number of retries for segment append operations. segmentRollingPolicy: - maxSize: 2000000000 # Maximum entries count of a segment, default is 2GB - maxInterval: 600 # Maximum interval between two segments in seconds, default is 10 minutes. + maxSize: 2GB # Maximum entries count of a segment, default is 2GB + maxInterval: 10m # Maximum interval between two segments, default is 10 minutes. auditor: - maxInterval: 10 # Maximum interval between two auditing operations in seconds, default is 10 seconds. + maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds. logstore: logFileSyncPolicy: - maxInterval: 1000 # Maximum interval between two sync operations in milliseconds. - maxBytes: 64000000 # Maximum size of write buffer in bytes. + maxInterval: 200ms # Maximum interval between two sync operations, default is 200 milliseconds. + maxBytes: 64M # Maximum size of write buffer in bytes. maxEntries: 100000 # Maximum entries number of write buffer. maxFlushRetries: 5 # Maximum size of write buffer in bytes. - retryInterval: 1000 # Maximum interval between two retries in milliseconds. - maxFlushSize: 8000000 # Maximum size of a fragment in bytes to flush, default is 8M. + retryInterval: 1000ms # Maximum interval between two retries. default is 1000 milliseconds. + maxFlushSize: 8M # Maximum size of a fragment in bytes to flush, default is 8M. maxFlushThreads: 4 # Maximum number of threads to flush data fragmentManager: - maxBytes: 1000000000 # Maximum size of fragment cached data in bytes. - maxInterval: 1000 # Maximum interval between two fragment evicts in milliseconds. + maxBytes: 512M # Maximum size of fragment cached data in bytes. + maxInterval: 1s # Maximum interval between two fragment evicts. default is 1 second storage: - type: minio # The Type of the storage provider. Valid values: [default, minio, local, service], default is minio. + type: minio # The Type of the storage provider. Valid values: [minio, local] rootPath: /var/lib/milvus/woodpecker # The root path of the storage provider. # Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services. diff --git a/go.mod b/go.mod index 45bfbc39e2..11777790de 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc // indirect + github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect diff --git a/go.sum b/go.sum index 5f43c62495..ece88b5e89 100644 --- a/go.sum +++ b/go.sum @@ -1066,8 +1066,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc h1:9KEOCnDt//GAimP3Z3Qh08VwPY7H9AOOjHx9C9ckMSQ= -github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= +github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c h1:OBTsouB6tIUMFQhu4wPS5/HQkgMdaAmS0ZOUoToltXI= +github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= diff --git a/pkg/go.mod b/pkg/go.mod index bc5acb2fc7..873b8400d6 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -39,7 +39,7 @@ require ( github.com/tikv/client-go/v2 v2.0.4 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/x448/float16 v0.8.4 - github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc + github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5 go.etcd.io/etcd/server/v3 v3.5.5 diff --git a/pkg/go.sum b/pkg/go.sum index f46898c0c3..285d8f55ea 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -824,8 +824,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc h1:9KEOCnDt//GAimP3Z3Qh08VwPY7H9AOOjHx9C9ckMSQ= -github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= +github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c h1:OBTsouB6tIUMFQhu4wPS5/HQkgMdaAmS0ZOUoToltXI= +github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/pkg/streaming/walimpls/impls/wp/builder.go b/pkg/streaming/walimpls/impls/wp/builder.go index ad38b9f120..8d0e409552 100644 --- a/pkg/streaming/walimpls/impls/wp/builder.go +++ b/pkg/streaming/walimpls/impls/wp/builder.go @@ -6,12 +6,14 @@ import ( "github.com/minio/minio-go/v7" "github.com/zilliztech/woodpecker/common/config" + wpMetrics "github.com/zilliztech/woodpecker/common/metrics" wpMinioHandler "github.com/zilliztech/woodpecker/common/minio" "github.com/zilliztech/woodpecker/woodpecker" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/objectstorage" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" @@ -67,6 +69,7 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { return nil, err } log.Ctx(context.Background()).Info("build wp opener finish", zap.String("wpClientInstance", fmt.Sprintf("%p", wpClient))) + wpMetrics.RegisterWoodpeckerWithRegisterer(metrics.GetRegisterer()) return &openerImpl{ c: wpClient, }, nil @@ -88,21 +91,21 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par // set the rootPath as the prefix for wp object storage wpConfig.Woodpecker.Meta.Prefix = fmt.Sprintf("%s/wp", paramtable.Get().EtcdCfg.RootPath.GetValue()) // logClient - wpConfig.Woodpecker.Client.Auditor.MaxInterval = cfg.AuditorMaxInterval.GetAsInt() + wpConfig.Woodpecker.Client.Auditor.MaxInterval = int(cfg.AuditorMaxInterval.GetAsDurationByParse().Seconds()) wpConfig.Woodpecker.Client.SegmentAppend.MaxRetries = cfg.AppendMaxRetries.GetAsInt() wpConfig.Woodpecker.Client.SegmentAppend.QueueSize = cfg.AppendQueueSize.GetAsInt() wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxSize = cfg.SegmentRollingMaxSize.GetAsSize() - wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = cfg.SegmentRollingMaxTime.GetAsInt() + wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = int(cfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds()) // logStore - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxInterval = cfg.SyncMaxInterval.GetAsInt() + wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds()) wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxEntries = cfg.SyncMaxEntries.GetAsInt() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxBytes = cfg.SyncMaxBytes.GetAsSize() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushRetries = cfg.FlushMaxRetries.GetAsInt() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushSize = cfg.FlushMaxSize.GetAsSize() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = cfg.RetryInterval.GetAsInt() + wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds()) wpConfig.Woodpecker.Logstore.FragmentManager.MaxBytes = cfg.FragmentCachedMaxBytes.GetAsSize() - wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = cfg.FragmentCachedInterval.GetAsInt() + wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = int(cfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds()) // storage wpConfig.Woodpecker.Storage.Type = cfg.StorageType.GetValue() wpConfig.Woodpecker.Storage.RootPath = cfg.RootPath.GetValue() @@ -166,7 +169,7 @@ func (b *builderImpl) getEtcdClient(ctx context.Context) (*clientv3.Client, erro etcdConfig.EtcdTLSCACert.GetValue(), etcdConfig.EtcdTLSMinVersion.GetValue()) if err != nil { - log.Warn("Woodpecker walimpls connect to etcd failed", zap.Error(err)) + log.Warn("Woodpecker create connection to etcd failed", zap.Error(err)) return nil, err } return etcdCli, nil diff --git a/pkg/streaming/walimpls/impls/wp/wal.go b/pkg/streaming/walimpls/impls/wp/wal.go index 195fa9b31a..1e6dabefd2 100644 --- a/pkg/streaming/walimpls/impls/wp/wal.go +++ b/pkg/streaming/walimpls/impls/wp/wal.go @@ -3,6 +3,8 @@ package wp import ( "context" + "github.com/cockroachdb/errors" + "github.com/zilliztech/woodpecker/common/werr" wp "github.com/zilliztech/woodpecker/woodpecker/log" "go.uber.org/zap" @@ -37,6 +39,10 @@ func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (messa }, ) if r.Err != nil { + if werr.ErrWriterLockLost.Is(r.Err) { + w.Log().RatedWarn(1, "wp writer fenced", zap.Error(r.Err)) + return nil, errors.Mark(r.Err, walimpls.ErrFenced) + } w.Log().RatedWarn(1, "write message to woodpecker failed", zap.Error(r.Err)) return nil, r.Err } diff --git a/pkg/streaming/walimpls/impls/wp/wp_test.go b/pkg/streaming/walimpls/impls/wp/wp_test.go index 096d31cb17..b45d1faadf 100644 --- a/pkg/streaming/walimpls/impls/wp/wp_test.go +++ b/pkg/streaming/walimpls/impls/wp/wp_test.go @@ -1,9 +1,11 @@ package wp import ( + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/zilliztech/woodpecker/woodpecker" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" @@ -27,5 +29,38 @@ func TestRegistry(t *testing.T) { } func TestWAL(t *testing.T) { - walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() + tmpDir := t.TempDir() + rootPath := filepath.Join(tmpDir, "TestWpWAL") + testCases := []struct { + name string + storageType string + rootPath string + }{ + { + name: "LocalFsStorage", + storageType: "local", + rootPath: rootPath, + }, + { + name: "ObjectStorage", + storageType: "minio", // Using default storage type minio-compatible + rootPath: "", // No need to specify path for this storage + }, + } + wpBackendTypeKey := paramtable.Get().WoodpeckerCfg.StorageType.Key + wpBackendRootPathKey := paramtable.Get().WoodpeckerCfg.RootPath.Key + logLevelKey := paramtable.Get().LogCfg.Level.Key + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := paramtable.Get().Save(wpBackendTypeKey, tc.storageType) + assert.NoError(t, err) + err = paramtable.Get().Save(wpBackendRootPathKey, tc.rootPath) + assert.NoError(t, err) + err = paramtable.Get().Save(logLevelKey, "debug") + assert.NoError(t, err) + walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() + stopEmbedLogStoreErr := woodpecker.StopEmbedLogStore() + assert.NoError(t, stopEmbedLogStoreErr) + }) + } } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 721430f0a9..83b5b75797 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -729,7 +729,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SegmentRollingMaxSize = ParamItem{ Key: "woodpecker.client.segmentRollingPolicy.maxSize", Version: "2.6.0", - DefaultValue: "2000000000", // 1 GB + DefaultValue: "2GB", Doc: "Maximum entries count of a segment, default is 2GB", Export: true, } @@ -738,8 +738,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SegmentRollingMaxTime = ParamItem{ Key: "woodpecker.client.segmentRollingPolicy.maxInterval", Version: "2.6.0", - DefaultValue: "600", - Doc: "Maximum interval between two segments in seconds, default is 10 minutes.", + DefaultValue: "10m", + Doc: "Maximum interval between two segments, default is 10 minutes.", Export: true, } p.SegmentRollingMaxTime.Init(base.mgr) @@ -747,8 +747,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.AuditorMaxInterval = ParamItem{ Key: "woodpecker.client.auditor.maxInterval", Version: "2.6.0", - DefaultValue: "10", - Doc: "Maximum interval between two auditing operations in seconds, default is 10 seconds.", + DefaultValue: "10s", + Doc: "Maximum interval between two auditing operations, default is 10 seconds.", Export: true, } p.AuditorMaxInterval.Init(base.mgr) @@ -756,8 +756,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxInterval = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.maxInterval", Version: "2.6.0", - DefaultValue: "1000", - Doc: "Maximum interval between two sync operations in milliseconds.", + DefaultValue: "200ms", + Doc: "Maximum interval between two sync operations, default is 200 milliseconds.", Export: true, } p.SyncMaxInterval.Init(base.mgr) @@ -774,7 +774,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxBytes = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.maxBytes", Version: "2.6.0", - DefaultValue: "64000000", + DefaultValue: "64M", Doc: "Maximum size of write buffer in bytes.", Export: true, } @@ -792,7 +792,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FlushMaxSize = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.maxFlushSize", Version: "2.6.0", - DefaultValue: "8000000", + DefaultValue: "8M", Doc: "Maximum size of a fragment in bytes to flush, default is 8M.", Export: true, } @@ -801,8 +801,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.RetryInterval = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.retryInterval", Version: "2.6.0", - DefaultValue: "1000", - Doc: "Maximum interval between two retries in milliseconds.", + DefaultValue: "1000ms", + Doc: "Maximum interval between two retries. default is 1000 milliseconds.", Export: true, } p.RetryInterval.Init(base.mgr) @@ -819,7 +819,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FragmentCachedMaxBytes = ParamItem{ Key: "woodpecker.logstore.fragmentManager.maxBytes", Version: "2.6.0", - DefaultValue: "1000000000", + DefaultValue: "512M", Doc: "Maximum size of fragment cached data in bytes.", Export: true, } @@ -828,8 +828,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FragmentCachedInterval = ParamItem{ Key: "woodpecker.logstore.fragmentManager.maxInterval", Version: "2.6.0", - DefaultValue: "1000", - Doc: "Maximum interval between two fragment evicts in milliseconds.", + DefaultValue: "1s", + Doc: "Maximum interval between two fragment evicts. default is 1 second", Export: true, } p.FragmentCachedInterval.Init(base.mgr) @@ -838,7 +838,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { Key: "woodpecker.storage.type", Version: "2.6.0", DefaultValue: "minio", - Doc: "The Type of the storage provider. Valid values: [default, minio, local, service], default is minio.", + Doc: "The Type of the storage provider. Valid values: [minio, local]", Export: true, } p.StorageType.Init(base.mgr) diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index ac7e8f7e99..b9e4f0bc37 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -103,19 +103,19 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, wpCfg.AppendQueueSize.GetAsInt(), 10000) assert.Equal(t, wpCfg.AppendMaxRetries.GetAsInt(), 3) - assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsInt(), 2000000000) - assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsInt(), 600) - assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsInt(), 10) + assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(2*1024*1024*1024)) + assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds(), float64(600)) + assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10)) - assert.Equal(t, wpCfg.SyncMaxInterval.GetAsInt(), 1000) + assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200)) assert.Equal(t, wpCfg.SyncMaxEntries.GetAsInt(), 100000) - assert.Equal(t, wpCfg.SyncMaxBytes.GetAsInt(), 64000000) + assert.Equal(t, wpCfg.SyncMaxBytes.GetAsSize(), int64(64*1024*1024)) assert.Equal(t, wpCfg.FlushMaxRetries.GetAsInt(), 5) - assert.Equal(t, wpCfg.FlushMaxSize.GetAsInt(), 8000000) + assert.Equal(t, wpCfg.FlushMaxSize.GetAsSize(), int64(8*1024*1024)) assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 4) - assert.Equal(t, wpCfg.RetryInterval.GetAsInt(), 1000) - assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsInt64(), int64(1000000000)) - assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsInt(), 1000) + assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) + assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsSize(), int64(512*1024*1024)) + assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) assert.Equal(t, wpCfg.StorageType.GetValue(), "minio") assert.Equal(t, wpCfg.RootPath.GetValue(), "/var/lib/milvus/woodpecker")