enhance: wp metrics and update deps to v0.1.0 (#43569)

#43574   #43604 #43431  #43603 
Fix wp metrics not registered bug;
Update the version dependent on wp to v0.1.2-rc1;
improve advanced reader with concurrent prefetch blks;
add the segment rolling policy based on the number of blocks;
improve concurrent compaction
release lock failed bug

Signed-off-by: tinswzy <zhenyuan.wei@zilliz.com>
This commit is contained in:
tinswzy 2025-07-29 14:51:35 +08:00 committed by GitHub
parent 268f1cdace
commit 173efe2b98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 77 additions and 11 deletions

View File

@ -190,6 +190,7 @@ woodpecker:
segmentRollingPolicy: segmentRollingPolicy:
maxSize: 256M # Maximum size of a segment. maxSize: 256M # Maximum size of a segment.
maxInterval: 10m # Maximum interval between two segments, default is 10 minutes. maxInterval: 10m # Maximum interval between two segments, default is 10 minutes.
maxBlocks: 1000 # Maximum number of blocks in a segment
auditor: auditor:
maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds. maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds.
logstore: logstore:
@ -204,6 +205,11 @@ woodpecker:
maxFlushThreads: 32 # Maximum number of threads to flush data maxFlushThreads: 32 # Maximum number of threads to flush data
segmentCompactionPolicy: segmentCompactionPolicy:
maxSize: 2M # The maximum size of the merged files. maxSize: 2M # The maximum size of the merged files.
maxParallelUploads: 4 # The maximum number of parallel upload threads for compaction.
maxParallelReads: 8 # The maximum number of parallel read threads for compaction.
segmentReadPolicy:
maxBatchSize: 16M # Maximum size of a batch in bytes.
maxFetchThreads: 32 # Maximum number of threads to fetch data.
storage: storage:
type: minio # The Type of the storage provider. Valid values: [minio, local] type: minio # The Type of the storage provider. Valid values: [minio, local]
rootPath: /var/lib/milvus/woodpecker # The root path of the storage provider. rootPath: /var/lib/milvus/woodpecker # The root path of the storage provider.

2
go.mod
View File

@ -247,7 +247,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7 // indirect github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96 // indirect
go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v2 v2.305.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect

4
go.sum
View File

@ -1079,8 +1079,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7 h1:DRC/6gvrKJFN1BRN+Dwn9wYQ6WJ/5VCr2Z+sT7tWjVA= github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96 h1:kVmos2GC+4W/F0ORRsGpOODslx9Kpza12WyU9yu5Nrs=
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7/go.mod h1:vEEMNxlYFkNYvaCHcc83+HyVRIeZEjBTSMu+5dypKbo= github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96/go.mod h1:vEEMNxlYFkNYvaCHcc83+HyVRIeZEjBTSMu+5dypKbo=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=

View File

@ -39,7 +39,7 @@ require (
github.com/tikv/client-go/v2 v2.0.4 github.com/tikv/client-go/v2 v2.0.4
github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/x448/float16 v0.8.4 github.com/x448/float16 v0.8.4
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7 github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96
go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/server/v3 v3.5.5 go.etcd.io/etcd/server/v3 v3.5.5

View File

@ -822,8 +822,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7 h1:DRC/6gvrKJFN1BRN+Dwn9wYQ6WJ/5VCr2Z+sT7tWjVA= github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96 h1:kVmos2GC+4W/F0ORRsGpOODslx9Kpza12WyU9yu5Nrs=
github.com/zilliztech/woodpecker v0.1.0-rc2.0.20250724120847-9596b90664c7/go.mod h1:vEEMNxlYFkNYvaCHcc83+HyVRIeZEjBTSMu+5dypKbo= github.com/zilliztech/woodpecker v0.1.2-rc1.0.20250728150756-6584f43e6a96/go.mod h1:vEEMNxlYFkNYvaCHcc83+HyVRIeZEjBTSMu+5dypKbo=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=

View File

@ -96,6 +96,7 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par
wpConfig.Woodpecker.Client.SegmentAppend.QueueSize = cfg.AppendQueueSize.GetAsInt() wpConfig.Woodpecker.Client.SegmentAppend.QueueSize = cfg.AppendQueueSize.GetAsInt()
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxSize = cfg.SegmentRollingMaxSize.GetAsSize() wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxSize = cfg.SegmentRollingMaxSize.GetAsSize()
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = int(cfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds()) wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = int(cfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds())
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxBlocks = cfg.SegmentRollingMaxBlocks.GetAsInt64()
// logStore // logStore
wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds()) wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds())
wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxIntervalForLocalStorage = int(cfg.SyncMaxIntervalForLocalStorage.GetAsDurationByParse().Milliseconds()) wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxIntervalForLocalStorage = int(cfg.SyncMaxIntervalForLocalStorage.GetAsDurationByParse().Milliseconds())
@ -106,6 +107,10 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par
wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt() wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt()
wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds()) wpConfig.Woodpecker.Logstore.SegmentSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds())
wpConfig.Woodpecker.Logstore.SegmentCompactionPolicy.MaxBytes = cfg.CompactionSize.GetAsSize() wpConfig.Woodpecker.Logstore.SegmentCompactionPolicy.MaxBytes = cfg.CompactionSize.GetAsSize()
wpConfig.Woodpecker.Logstore.SegmentCompactionPolicy.MaxParallelUploads = cfg.CompactionMaxParallelUploads.GetAsInt()
wpConfig.Woodpecker.Logstore.SegmentCompactionPolicy.MaxParallelReads = cfg.CompactionMaxParallelReads.GetAsInt()
wpConfig.Woodpecker.Logstore.SegmentReadPolicy.MaxBatchSize = cfg.ReaderMaxBatchSize.GetAsSize()
wpConfig.Woodpecker.Logstore.SegmentReadPolicy.MaxFetchThreads = cfg.ReaderMaxFetchThreads.GetAsInt()
// storage // storage
wpConfig.Woodpecker.Storage.Type = cfg.StorageType.GetValue() wpConfig.Woodpecker.Storage.Type = cfg.StorageType.GetValue()
wpConfig.Woodpecker.Storage.RootPath = cfg.RootPath.GetValue() wpConfig.Woodpecker.Storage.RootPath = cfg.RootPath.GetValue()

View File

@ -671,6 +671,7 @@ type WoodpeckerConfig struct {
AppendMaxRetries ParamItem `refreshable:"true"` AppendMaxRetries ParamItem `refreshable:"true"`
SegmentRollingMaxSize ParamItem `refreshable:"true"` SegmentRollingMaxSize ParamItem `refreshable:"true"`
SegmentRollingMaxTime ParamItem `refreshable:"true"` SegmentRollingMaxTime ParamItem `refreshable:"true"`
SegmentRollingMaxBlocks ParamItem `refreshable:"true"`
AuditorMaxInterval ParamItem `refreshable:"true"` AuditorMaxInterval ParamItem `refreshable:"true"`
// logstore // logstore
@ -683,6 +684,10 @@ type WoodpeckerConfig struct {
FlushMaxSize ParamItem `refreshable:"true"` FlushMaxSize ParamItem `refreshable:"true"`
FlushMaxThreads ParamItem `refreshable:"true"` FlushMaxThreads ParamItem `refreshable:"true"`
CompactionSize ParamItem `refreshable:"true"` CompactionSize ParamItem `refreshable:"true"`
CompactionMaxParallelUploads ParamItem `refreshable:"true"`
CompactionMaxParallelReads ParamItem `refreshable:"true"`
ReaderMaxBatchSize ParamItem `refreshable:"true"`
ReaderMaxFetchThreads ParamItem `refreshable:"true"`
// storage // storage
StorageType ParamItem `refreshable:"false"` StorageType ParamItem `refreshable:"false"`
@ -744,6 +749,15 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
} }
p.SegmentRollingMaxTime.Init(base.mgr) p.SegmentRollingMaxTime.Init(base.mgr)
p.SegmentRollingMaxBlocks = ParamItem{
Key: "woodpecker.client.segmentRollingPolicy.maxBlocks",
Version: "2.6.0",
DefaultValue: "1000",
Doc: "Maximum number of blocks in a segment",
Export: true,
}
p.SegmentRollingMaxBlocks.Init(base.mgr)
p.AuditorMaxInterval = ParamItem{ p.AuditorMaxInterval = ParamItem{
Key: "woodpecker.client.auditor.maxInterval", Key: "woodpecker.client.auditor.maxInterval",
Version: "2.6.0", Version: "2.6.0",
@ -834,6 +848,42 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
} }
p.CompactionSize.Init(base.mgr) p.CompactionSize.Init(base.mgr)
p.CompactionMaxParallelUploads = ParamItem{
Key: "woodpecker.logstore.segmentCompactionPolicy.maxParallelUploads",
Version: "2.6.0",
DefaultValue: "4",
Doc: "The maximum number of parallel upload threads for compaction.",
Export: true,
}
p.CompactionMaxParallelUploads.Init(base.mgr)
p.CompactionMaxParallelReads = ParamItem{
Key: "woodpecker.logstore.segmentCompactionPolicy.maxParallelReads",
Version: "2.6.0",
DefaultValue: "8",
Doc: "The maximum number of parallel read threads for compaction.",
Export: true,
}
p.CompactionMaxParallelReads.Init(base.mgr)
p.ReaderMaxBatchSize = ParamItem{
Key: "woodpecker.logstore.segmentReadPolicy.maxBatchSize",
Version: "2.6.0",
DefaultValue: "16M",
Doc: "Maximum size of a batch in bytes.",
Export: true,
}
p.ReaderMaxBatchSize.Init(base.mgr)
p.ReaderMaxFetchThreads = ParamItem{
Key: "woodpecker.logstore.segmentReadPolicy.maxFetchThreads",
Version: "2.6.0",
DefaultValue: "32",
Doc: "Maximum number of threads to fetch data.",
Export: true,
}
p.ReaderMaxFetchThreads.Init(base.mgr)
p.StorageType = ParamItem{ p.StorageType = ParamItem{
Key: "woodpecker.storage.type", Key: "woodpecker.storage.type",
Version: "2.6.0", Version: "2.6.0",

View File

@ -105,6 +105,7 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, wpCfg.AppendMaxRetries.GetAsInt(), 3) assert.Equal(t, wpCfg.AppendMaxRetries.GetAsInt(), 3)
assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(256*1024*1024)) assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(256*1024*1024))
assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds(), float64(600)) assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds(), float64(600))
assert.Equal(t, wpCfg.SegmentRollingMaxBlocks.GetAsInt64(), int64(1000))
assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10)) assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10))
assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200)) assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200))
@ -116,6 +117,10 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 32) assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 32)
assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000))
assert.Equal(t, wpCfg.CompactionSize.GetAsSize(), int64(2*1024*1024)) assert.Equal(t, wpCfg.CompactionSize.GetAsSize(), int64(2*1024*1024))
assert.Equal(t, wpCfg.CompactionMaxParallelUploads.GetAsInt(), 4)
assert.Equal(t, wpCfg.CompactionMaxParallelReads.GetAsInt(), 8)
assert.Equal(t, wpCfg.ReaderMaxBatchSize.GetAsSize(), int64(16*1024*1024))
assert.Equal(t, wpCfg.ReaderMaxFetchThreads.GetAsInt(), 32)
assert.Equal(t, wpCfg.StorageType.GetValue(), "minio") assert.Equal(t, wpCfg.StorageType.GetValue(), "minio")
assert.Equal(t, wpCfg.RootPath.GetValue(), "/var/lib/milvus/woodpecker") assert.Equal(t, wpCfg.RootPath.GetValue(), "/var/lib/milvus/woodpecker")