From f55f900c8517ca9e69f474e48028b5765184253a Mon Sep 17 00:00:00 2001 From: tinswzy Date: Tue, 3 Jun 2025 09:58:36 +0800 Subject: [PATCH] fix insert hang caused by WAL writer writing to a closing logfile (#42078) related issue #42049 wp commit [94de4](https://github.com/zilliztech/woodpecker/commit/94de4cbc60ebb01536eb2f1ab755da51f2146f64) Signed-off-by: tinswzy --- configs/milvus.yaml | 10 ++++++---- go.mod | 2 +- go.sum | 6 ++---- pkg/go.mod | 2 +- pkg/go.sum | 6 ++---- pkg/streaming/walimpls/impls/wp/builder.go | 1 + pkg/util/paramtable/service_param.go | 20 +++++++++++++++----- pkg/util/paramtable/service_param_test.go | 9 +++++---- 8 files changed, 33 insertions(+), 23 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7ab6b0c75e..a2a5bcfe62 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -187,21 +187,23 @@ woodpecker: queueSize: 10000 # The size of the queue for pending messages to be sent of each log. maxRetries: 3 # Maximum number of retries for segment append operations. segmentRollingPolicy: - maxSize: 2GB # Maximum entries count of a segment, default is 2GB + maxSize: 128M # Maximum entries count of a segment, default is 128M maxInterval: 10m # Maximum interval between two segments, default is 10 minutes. auditor: maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds. logstore: logFileSyncPolicy: maxInterval: 200ms # Maximum interval between two sync operations, default is 200 milliseconds. - maxBytes: 64M # Maximum size of write buffer in bytes. - maxEntries: 100000 # Maximum entries number of write buffer. + maxBytes: 32M # Maximum size of write buffer in bytes. + maxEntries: 10000 # Maximum entries number of write buffer. maxFlushRetries: 5 # Maximum size of write buffer in bytes. retryInterval: 1000ms # Maximum interval between two retries. default is 1000 milliseconds. maxFlushSize: 8M # Maximum size of a fragment in bytes to flush, default is 8M. maxFlushThreads: 4 # Maximum number of threads to flush data + logFileCompactionPolicy: + maxSize: 8M # The maximum size of the merged files, default is 8M. fragmentManager: - maxBytes: 512M # Maximum size of fragment cached data in bytes. + maxBytes: 128M # Maximum size of fragment cached data in bytes. maxInterval: 1s # Maximum interval between two fragment evicts. default is 1 second storage: type: minio # The Type of the storage provider. Valid values: [minio, local] diff --git a/go.mod b/go.mod index cb3f5d1aa4..9ef85beb15 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03 // indirect + github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect diff --git a/go.sum b/go.sum index ab20812e82..a5845a58fb 100644 --- a/go.sum +++ b/go.sum @@ -474,8 +474,6 @@ github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark= -github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -1066,8 +1064,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03 h1:79e9xOq1BU102KutPMQegP3ItGb9BbDbi7BE2ii8Leo= -github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03/go.mod h1:476FWuCIVwvrN/qc+rNxunoMTgC07FW4oc/fMDg4f0A= +github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22 h1:Yb1t/yyELWoFyVde0AWIsjF+fwqYdo1VVv7yqhHhIvM= +github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= diff --git a/pkg/go.mod b/pkg/go.mod index d979a90b19..5a0152d8fa 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -39,7 +39,7 @@ require ( github.com/tikv/client-go/v2 v2.0.4 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/x448/float16 v0.8.4 - github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03 + github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22 go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5 go.etcd.io/etcd/server/v3 v3.5.5 diff --git a/pkg/go.sum b/pkg/go.sum index 067ce2467a..723d12bcea 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -361,8 +361,6 @@ github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark= -github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -824,8 +822,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03 h1:79e9xOq1BU102KutPMQegP3ItGb9BbDbi7BE2ii8Leo= -github.com/zilliztech/woodpecker v0.0.0-20250522011808-4059b68a0e03/go.mod h1:476FWuCIVwvrN/qc+rNxunoMTgC07FW4oc/fMDg4f0A= +github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22 h1:Yb1t/yyELWoFyVde0AWIsjF+fwqYdo1VVv7yqhHhIvM= +github.com/zilliztech/woodpecker v0.0.0-20250602050237-2822b5a11b22/go.mod h1:MeyFx9vsAzxEysO2wzMC5d1QcSknbfVZOAhEmyxgQbo= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/pkg/streaming/walimpls/impls/wp/builder.go b/pkg/streaming/walimpls/impls/wp/builder.go index 3cd47dd522..63776c38b6 100644 --- a/pkg/streaming/walimpls/impls/wp/builder.go +++ b/pkg/streaming/walimpls/impls/wp/builder.go @@ -104,6 +104,7 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushSize = cfg.FlushMaxSize.GetAsSize() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt() wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds()) + wpConfig.Woodpecker.Logstore.LogFileCompactionPolicy.MaxBytes = cfg.CompactionSize.GetAsSize() wpConfig.Woodpecker.Logstore.FragmentManager.MaxBytes = cfg.FragmentCachedMaxBytes.GetAsSize() wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = int(cfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds()) // storage diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 83b5b75797..a0300dd276 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -681,6 +681,7 @@ type WoodpeckerConfig struct { RetryInterval ParamItem `refreshable:"true"` FlushMaxSize ParamItem `refreshable:"true"` FlushMaxThreads ParamItem `refreshable:"true"` + CompactionSize ParamItem `refreshable:"true"` FragmentCachedMaxBytes ParamItem `refreshable:"true"` FragmentCachedInterval ParamItem `refreshable:"true"` @@ -729,8 +730,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SegmentRollingMaxSize = ParamItem{ Key: "woodpecker.client.segmentRollingPolicy.maxSize", Version: "2.6.0", - DefaultValue: "2GB", - Doc: "Maximum entries count of a segment, default is 2GB", + DefaultValue: "128M", + Doc: "Maximum entries count of a segment, default is 128M", Export: true, } p.SegmentRollingMaxSize.Init(base.mgr) @@ -765,7 +766,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxEntries = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.maxEntries", Version: "2.6.0", - DefaultValue: "100000", + DefaultValue: "10000", Doc: "Maximum entries number of write buffer.", Export: true, } @@ -774,7 +775,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { p.SyncMaxBytes = ParamItem{ Key: "woodpecker.logstore.logFileSyncPolicy.maxBytes", Version: "2.6.0", - DefaultValue: "64M", + DefaultValue: "32M", Doc: "Maximum size of write buffer in bytes.", Export: true, } @@ -816,10 +817,19 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) { } p.FlushMaxThreads.Init(base.mgr) + p.CompactionSize = ParamItem{ + Key: "woodpecker.logstore.logFileCompactionPolicy.maxSize", + Version: "2.6.0", + DefaultValue: "8M", + Doc: "The maximum size of the merged files, default is 8M.", + Export: true, + } + p.CompactionSize.Init(base.mgr) + p.FragmentCachedMaxBytes = ParamItem{ Key: "woodpecker.logstore.fragmentManager.maxBytes", Version: "2.6.0", - DefaultValue: "512M", + DefaultValue: "128M", Doc: "Maximum size of fragment cached data in bytes.", Export: true, } diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index b9e4f0bc37..7dfcbffac9 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -103,18 +103,19 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, wpCfg.AppendQueueSize.GetAsInt(), 10000) assert.Equal(t, wpCfg.AppendMaxRetries.GetAsInt(), 3) - assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(2*1024*1024*1024)) + assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(128*1024*1024)) assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds(), float64(600)) assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10)) assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200)) - assert.Equal(t, wpCfg.SyncMaxEntries.GetAsInt(), 100000) - assert.Equal(t, wpCfg.SyncMaxBytes.GetAsSize(), int64(64*1024*1024)) + assert.Equal(t, wpCfg.SyncMaxEntries.GetAsInt(), 10000) + assert.Equal(t, wpCfg.SyncMaxBytes.GetAsSize(), int64(32*1024*1024)) assert.Equal(t, wpCfg.FlushMaxRetries.GetAsInt(), 5) assert.Equal(t, wpCfg.FlushMaxSize.GetAsSize(), int64(8*1024*1024)) assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 4) assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) - assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsSize(), int64(512*1024*1024)) + assert.Equal(t, wpCfg.CompactionSize.GetAsSize(), int64(8*1024*1024)) + assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsSize(), int64(128*1024*1024)) assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds(), int64(1000)) assert.Equal(t, wpCfg.StorageType.GetValue(), "minio")