diff --git a/configs/milvus.yaml b/configs/milvus.yaml index bca676e140..3107249261 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -192,19 +192,17 @@ woodpecker: auditor: maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds. logstore: - logFileSyncPolicy: + segmentSyncPolicy: maxInterval: 200ms # Maximum interval between two sync operations, default is 200 milliseconds. + maxIntervalForLocalStorage: 10ms # Maximum interval between two sync operations local storage backend, default is 10 milliseconds. maxBytes: 32M # Maximum size of write buffer in bytes. maxEntries: 10000 # Maximum entries number of write buffer. maxFlushRetries: 5 # Maximum size of write buffer in bytes. retryInterval: 1000ms # Maximum interval between two retries. default is 1000 milliseconds. maxFlushSize: 8M # Maximum size of a fragment in bytes to flush, default is 8M. maxFlushThreads: 4 # Maximum number of threads to flush data - logFileCompactionPolicy: + segmentCompactionPolicy: maxSize: 8M # The maximum size of the merged files, default is 8M. - fragmentManager: - maxBytes: 128M # Maximum size of fragment cached data in bytes. - maxInterval: 1s # Maximum interval between two fragment evicts. default is 1 second storage: type: minio # The Type of the storage provider. Valid values: [minio, local] rootPath: /var/lib/milvus/woodpecker # The root path of the storage provider. diff --git a/go.mod b/go.mod index 52dcd35636..4ae5c10b15 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958 // indirect + github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect diff --git a/go.sum b/go.sum index e1d5235303..6cd6d427e6 100644 --- a/go.sum +++ b/go.sum @@ -1064,8 +1064,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958 h1:TYKnS3kpVOlE6oGI2rrq8yODBcnxUkzocRSPpySMjdo= -github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= +github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254 h1:e0p+viwWPAHLhXqrlwSfqYnN23agGQtvyIW5tWc8kmg= +github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= diff --git a/pkg/go.mod b/pkg/go.mod index 0d27109ea2..2bed5ebf7e 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -39,7 +39,7 @@ require ( github.com/tikv/client-go/v2 v2.0.4 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/x448/float16 v0.8.4 - github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958 + github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254 go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5 go.etcd.io/etcd/server/v3 v3.5.5 diff --git a/pkg/go.sum b/pkg/go.sum index 2267b76f08..02129786e2 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -822,8 +822,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958 h1:TYKnS3kpVOlE6oGI2rrq8yODBcnxUkzocRSPpySMjdo= -github.com/zilliztech/woodpecker v0.0.0-20250607012818-0b658cd0c958/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= +github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254 h1:e0p+viwWPAHLhXqrlwSfqYnN23agGQtvyIW5tWc8kmg= +github.com/zilliztech/woodpecker v0.0.0-20250609083736-62920192a254/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/pkg/streaming/walimpls/impls/wp/builder.go b/pkg/streaming/walimpls/impls/wp/builder.go index 63776c38b6..d4e02ad107 100644 --- a/pkg/streaming/walimpls/impls/wp/builder.go +++ b/pkg/streaming/walimpls/impls/wp/builder.go @@ -97,16 +97,15 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxSize = cfg.SegmentRollingMaxSize.GetAsSize() wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = int(cfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds()) // logStore - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds()) - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxEntries = cfg.SyncMaxEntries.GetAsInt() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxBytes = cfg.SyncMaxBytes.GetAsSize() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushRetries = cfg.FlushMaxRetries.GetAsInt() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushSize = cfg.FlushMaxSize.GetAsSize() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt() - wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds()) - wpConfig.Woodpecker.Logstore.LogFileCompactionPolicy.MaxBytes = cfg.CompactionSize.GetAsSize() - wpConfig.Woodpecker.Logstore.FragmentManager.MaxBytes = cfg.FragmentCachedMaxBytes.GetAsSize() - wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = int(cfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds()) + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds()) + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxIntervalForLocalStorage = int(cfg.SyncMaxIntervalForLocalStorage.GetAsDurationByParse().Milliseconds()) + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxEntries = cfg.SyncMaxEntries.GetAsInt() + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxBytes = cfg.SyncMaxBytes.GetAsSize() + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxFlushRetries = cfg.FlushMaxRetries.GetAsInt() + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxFlushSize = cfg.FlushMaxSize.GetAsSize() + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt() + wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds()) + wpConfig.Woodpecker.Logstore.SegmentCompactionPolicy.MaxBytes = cfg.CompactionSize.GetAsSize() // storage wpConfig.Woodpecker.Storage.Type = cfg.StorageType.GetValue() wpConfig.Woodpecker.Storage.RootPath = cfg.RootPath.GetValue() diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 2be4bd6d83..e44f374fbd 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -674,16 +674,15 @@ type WoodpeckerConfig struct { AuditorMaxInterval ParamItem `refreshable:"true"` // logstore - SyncMaxInterval ParamItem `refreshable:"true"` - SyncMaxBytes ParamItem `refreshable:"true"` - SyncMaxEntries ParamItem `refreshable:"true"` - FlushMaxRetries ParamItem `refreshable:"true"` - RetryInterval ParamItem `refreshable:"true"` - FlushMaxSize ParamItem `refreshable:"true"` - FlushMaxThreads ParamItem `refreshable:"true"` - CompactionSize ParamItem `refreshable:"true"` - FragmentCachedMaxBytes ParamItem `refreshable:"true"` - FragmentCachedInterval ParamItem `refreshable:"true"` + SyncMaxInterval ParamItem `refreshable:"true"` + SyncMaxIntervalForLocalStorage ParamItem `refreshable:"true"` + SyncMaxBytes ParamItem `refreshable:"true"` + SyncMaxEntries ParamItem `refreshable:"true"` + FlushMaxRetries ParamItem `refreshable:"true"` + RetryInterval ParamItem `refreshable:"true"` + FlushMaxSize ParamItem `refreshable:"true"` + FlushMaxThreads ParamItem `refreshable:"true"` + CompactionSize ParamItem `refreshable:"true"` // storage StorageType ParamItem `refreshable:"false"` @@ -755,7 +754,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.AuditorMaxInterval.Init(base.mgr) p.SyncMaxInterval = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxInterval", + Key: "woodpecker.logstore.segmentSyncPolicy.maxInterval", Version: "2.6.0", DefaultValue: "200ms", Doc: "Maximum interval between two sync operations, default is 200 milliseconds.", @@ -763,8 +762,17 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { } p.SyncMaxInterval.Init(base.mgr) + p.SyncMaxIntervalForLocalStorage = ParamItem{ + Key: "woodpecker.logstore.segmentSyncPolicy.maxIntervalForLocalStorage", + Version: "2.6.0", + DefaultValue: "10ms", + Doc: "Maximum interval between two sync operations local storage backend, default is 10 milliseconds.", + Export: true, + } + p.SyncMaxIntervalForLocalStorage.Init(base.mgr) + p.SyncMaxEntries = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxEntries", + Key: "woodpecker.logstore.segmentSyncPolicy.maxEntries", Version: "2.6.0", DefaultValue: "10000", Doc: "Maximum entries number of write buffer.", @@ -773,7 +781,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxEntries.Init(base.mgr) p.SyncMaxBytes = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxBytes", + Key: "woodpecker.logstore.segmentSyncPolicy.maxBytes", Version: "2.6.0", DefaultValue: "32M", Doc: "Maximum size of write buffer in bytes.", @@ -782,7 +790,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxBytes.Init(base.mgr) p.FlushMaxRetries = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxFlushRetries", + Key: "woodpecker.logstore.segmentSyncPolicy.maxFlushRetries", Version: "2.6.0", DefaultValue: "5", Doc: "Maximum size of write buffer in bytes.", @@ -791,7 +799,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FlushMaxRetries.Init(base.mgr) p.FlushMaxSize = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxFlushSize", + Key: "woodpecker.logstore.segmentSyncPolicy.maxFlushSize", Version: "2.6.0", DefaultValue: "8M", Doc: "Maximum size of a fragment in bytes to flush, default is 8M.", @@ -800,7 +808,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FlushMaxSize.Init(base.mgr) p.RetryInterval = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.retryInterval", + Key: "woodpecker.logstore.segmentSyncPolicy.retryInterval", Version: "2.6.0", DefaultValue: "1000ms", Doc: "Maximum interval between two retries. default is 1000 milliseconds.", @@ -809,7 +817,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.RetryInterval.Init(base.mgr) p.FlushMaxThreads = ParamItem{ - Key: "woodpecker.logstore.logFileSyncPolicy.maxFlushThreads", + Key: "woodpecker.logstore.segmentSyncPolicy.maxFlushThreads", Version: "2.6.0", DefaultValue: "4", Doc: "Maximum number of threads to flush data", @@ -818,7 +826,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.FlushMaxThreads.Init(base.mgr) p.CompactionSize = ParamItem{ - Key: "woodpecker.logstore.logFileCompactionPolicy.maxSize", + Key: "woodpecker.logstore.segmentCompactionPolicy.maxSize", Version: "2.6.0", DefaultValue: "8M", Doc: "The maximum size of the merged files, default is 8M.", @@ -826,24 +834,6 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { } p.CompactionSize.Init(base.mgr) - p.FragmentCachedMaxBytes = ParamItem{ - Key: "woodpecker.logstore.fragmentManager.maxBytes", - Version: "2.6.0", - DefaultValue: "128M", - Doc: "Maximum size of fragment cached data in bytes.", - Export: true, - } - p.FragmentCachedMaxBytes.Init(base.mgr) - - p.FragmentCachedInterval = ParamItem{ - Key: "woodpecker.logstore.fragmentManager.maxInterval", - Version: "2.6.0", - DefaultValue: "1s", - Doc: "Maximum interval between two fragment evicts. default is 1 second", - Export: true, - } - p.FragmentCachedInterval.Init(base.mgr) - p.StorageType = ParamItem{ Key: "woodpecker.storage.type", Version: "2.6.0", diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index b28696e1d6..be2ea653a4 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -108,6 +108,7 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10)) assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200)) + assert.Equal(t, wpCfg.SyncMaxIntervalForLocalStorage.GetAsDurationByParse().Milliseconds(), int64(10)) assert.Equal(t, wpCfg.SyncMaxEntries.GetAsInt(), 10000) assert.Equal(t, wpCfg.SyncMaxBytes.GetAsSize(), int64(32*1024*1024)) assert.Equal(t, wpCfg.FlushMaxRetries.GetAsInt(), 5) @@ -115,8 +116,6 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 4) assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) assert.Equal(t, wpCfg.CompactionSize.GetAsSize(), int64(8*1024*1024)) - assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsSize(), int64(128*1024*1024)) - assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) assert.Equal(t, wpCfg.StorageType.GetValue(), "minio") assert.Equal(t, wpCfg.RootPath.GetValue(), "/var/lib/milvus/woodpecker")