fix: resolve wp WALImpls concurrent read/write bug (#41763)

#41563 #41579 #41842 #41846 #41758
Upgraded the wp dependency to incorporate recent fixes addressing
multiple concurrency bugs in WALImpls.

Signed-off-by: tinswzy <zhenyuan.wei@zilliz.com>
This commit is contained in:
tinswzy 2025-05-16 12:02:27 +08:00 committed by GitHub
parent d3fff1769e
commit 4edb1bc6f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 91 additions and 47 deletions

View File

@ -187,24 +187,24 @@ woodpecker:
queueSize: 10000 # The size of the queue for pending messages to be sent of each log.
maxRetries: 3 # Maximum number of retries for segment append operations.
segmentRollingPolicy:
maxSize: 2000000000 # Maximum entries count of a segment, default is 2GB
maxInterval: 600 # Maximum interval between two segments in seconds, default is 10 minutes.
maxSize: 2GB # Maximum entries count of a segment, default is 2GB
maxInterval: 10m # Maximum interval between two segments, default is 10 minutes.
auditor:
maxInterval: 10 # Maximum interval between two auditing operations in seconds, default is 10 seconds.
maxInterval: 10s # Maximum interval between two auditing operations, default is 10 seconds.
logstore:
logFileSyncPolicy:
maxInterval: 1000 # Maximum interval between two sync operations in milliseconds.
maxBytes: 64000000 # Maximum size of write buffer in bytes.
maxInterval: 200ms # Maximum interval between two sync operations, default is 200 milliseconds.
maxBytes: 64M # Maximum size of write buffer in bytes.
maxEntries: 100000 # Maximum entries number of write buffer.
maxFlushRetries: 5 # Maximum size of write buffer in bytes.
retryInterval: 1000 # Maximum interval between two retries in milliseconds.
maxFlushSize: 8000000 # Maximum size of a fragment in bytes to flush, default is 8M.
retryInterval: 1000ms # Maximum interval between two retries. default is 1000 milliseconds.
maxFlushSize: 8M # Maximum size of a fragment in bytes to flush, default is 8M.
maxFlushThreads: 4 # Maximum number of threads to flush data
fragmentManager:
maxBytes: 1000000000 # Maximum size of fragment cached data in bytes.
maxInterval: 1000 # Maximum interval between two fragment evicts in milliseconds.
maxBytes: 512M # Maximum size of fragment cached data in bytes.
maxInterval: 1s # Maximum interval between two fragment evicts. default is 1 second
storage:
type: minio # The Type of the storage provider. Valid values: [default, minio, local, service], default is minio.
type: minio # The Type of the storage provider. Valid values: [minio, local]
rootPath: /var/lib/milvus/woodpecker # The root path of the storage provider.
# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.

2
go.mod
View File

@ -243,7 +243,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc // indirect
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v2 v2.305.5 // indirect

4
go.sum
View File

@ -1066,8 +1066,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc h1:9KEOCnDt//GAimP3Z3Qh08VwPY7H9AOOjHx9C9ckMSQ=
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w=
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c h1:OBTsouB6tIUMFQhu4wPS5/HQkgMdaAmS0ZOUoToltXI=
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=

View File

@ -39,7 +39,7 @@ require (
github.com/tikv/client-go/v2 v2.0.4
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/x448/float16 v0.8.4
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/server/v3 v3.5.5

View File

@ -824,8 +824,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc h1:9KEOCnDt//GAimP3Z3Qh08VwPY7H9AOOjHx9C9ckMSQ=
github.com/zilliztech/woodpecker v0.0.0-20250514005855-9467e66ea2bc/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w=
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c h1:OBTsouB6tIUMFQhu4wPS5/HQkgMdaAmS0ZOUoToltXI=
github.com/zilliztech/woodpecker v0.0.0-20250515090344-a7889b2d072c/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=

View File

@ -6,12 +6,14 @@ import (
"github.com/minio/minio-go/v7"
"github.com/zilliztech/woodpecker/common/config"
wpMetrics "github.com/zilliztech/woodpecker/common/metrics"
wpMinioHandler "github.com/zilliztech/woodpecker/common/minio"
"github.com/zilliztech/woodpecker/woodpecker"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/objectstorage"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
@ -67,6 +69,7 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
return nil, err
}
log.Ctx(context.Background()).Info("build wp opener finish", zap.String("wpClientInstance", fmt.Sprintf("%p", wpClient)))
wpMetrics.RegisterWoodpeckerWithRegisterer(metrics.GetRegisterer())
return &openerImpl{
c: wpClient,
}, nil
@ -88,21 +91,21 @@ func (b *builderImpl) setCustomWpConfig(wpConfig *config.Configuration, cfg *par
// set the rootPath as the prefix for wp object storage
wpConfig.Woodpecker.Meta.Prefix = fmt.Sprintf("%s/wp", paramtable.Get().EtcdCfg.RootPath.GetValue())
// logClient
wpConfig.Woodpecker.Client.Auditor.MaxInterval = cfg.AuditorMaxInterval.GetAsInt()
wpConfig.Woodpecker.Client.Auditor.MaxInterval = int(cfg.AuditorMaxInterval.GetAsDurationByParse().Seconds())
wpConfig.Woodpecker.Client.SegmentAppend.MaxRetries = cfg.AppendMaxRetries.GetAsInt()
wpConfig.Woodpecker.Client.SegmentAppend.QueueSize = cfg.AppendQueueSize.GetAsInt()
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxSize = cfg.SegmentRollingMaxSize.GetAsSize()
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = cfg.SegmentRollingMaxTime.GetAsInt()
wpConfig.Woodpecker.Client.SegmentRollingPolicy.MaxInterval = int(cfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds())
// logStore
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxInterval = cfg.SyncMaxInterval.GetAsInt()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxInterval = int(cfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds())
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxEntries = cfg.SyncMaxEntries.GetAsInt()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxBytes = cfg.SyncMaxBytes.GetAsSize()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushRetries = cfg.FlushMaxRetries.GetAsInt()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushSize = cfg.FlushMaxSize.GetAsSize()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.MaxFlushThreads = cfg.FlushMaxThreads.GetAsInt()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = cfg.RetryInterval.GetAsInt()
wpConfig.Woodpecker.Logstore.LogFileSyncPolicy.RetryInterval = int(cfg.RetryInterval.GetAsDurationByParse().Milliseconds())
wpConfig.Woodpecker.Logstore.FragmentManager.MaxBytes = cfg.FragmentCachedMaxBytes.GetAsSize()
wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = cfg.FragmentCachedInterval.GetAsInt()
wpConfig.Woodpecker.Logstore.FragmentManager.MaxInterval = int(cfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds())
// storage
wpConfig.Woodpecker.Storage.Type = cfg.StorageType.GetValue()
wpConfig.Woodpecker.Storage.RootPath = cfg.RootPath.GetValue()
@ -166,7 +169,7 @@ func (b *builderImpl) getEtcdClient(ctx context.Context) (*clientv3.Client, erro
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
log.Warn("Woodpecker walimpls connect to etcd failed", zap.Error(err))
log.Warn("Woodpecker create connection to etcd failed", zap.Error(err))
return nil, err
}
return etcdCli, nil

View File

@ -3,6 +3,8 @@ package wp
import (
"context"
"github.com/cockroachdb/errors"
"github.com/zilliztech/woodpecker/common/werr"
wp "github.com/zilliztech/woodpecker/woodpecker/log"
"go.uber.org/zap"
@ -37,6 +39,10 @@ func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (messa
},
)
if r.Err != nil {
if werr.ErrWriterLockLost.Is(r.Err) {
w.Log().RatedWarn(1, "wp writer fenced", zap.Error(r.Err))
return nil, errors.Mark(r.Err, walimpls.ErrFenced)
}
w.Log().RatedWarn(1, "write message to woodpecker failed", zap.Error(r.Err))
return nil, r.Err
}

View File

@ -1,9 +1,11 @@
package wp
import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/woodpecker/woodpecker"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
@ -27,5 +29,38 @@ func TestRegistry(t *testing.T) {
}
func TestWAL(t *testing.T) {
walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run()
tmpDir := t.TempDir()
rootPath := filepath.Join(tmpDir, "TestWpWAL")
testCases := []struct {
name string
storageType string
rootPath string
}{
{
name: "LocalFsStorage",
storageType: "local",
rootPath: rootPath,
},
{
name: "ObjectStorage",
storageType: "minio", // Using default storage type minio-compatible
rootPath: "", // No need to specify path for this storage
},
}
wpBackendTypeKey := paramtable.Get().WoodpeckerCfg.StorageType.Key
wpBackendRootPathKey := paramtable.Get().WoodpeckerCfg.RootPath.Key
logLevelKey := paramtable.Get().LogCfg.Level.Key
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := paramtable.Get().Save(wpBackendTypeKey, tc.storageType)
assert.NoError(t, err)
err = paramtable.Get().Save(wpBackendRootPathKey, tc.rootPath)
assert.NoError(t, err)
err = paramtable.Get().Save(logLevelKey, "debug")
assert.NoError(t, err)
walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run()
stopEmbedLogStoreErr := woodpecker.StopEmbedLogStore()
assert.NoError(t, stopEmbedLogStoreErr)
})
}
}

View File

@ -729,7 +729,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.SegmentRollingMaxSize = ParamItem{
Key: "woodpecker.client.segmentRollingPolicy.maxSize",
Version: "2.6.0",
DefaultValue: "2000000000", // 1 GB
DefaultValue: "2GB",
Doc: "Maximum entries count of a segment, default is 2GB",
Export: true,
}
@ -738,8 +738,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.SegmentRollingMaxTime = ParamItem{
Key: "woodpecker.client.segmentRollingPolicy.maxInterval",
Version: "2.6.0",
DefaultValue: "600",
Doc: "Maximum interval between two segments in seconds, default is 10 minutes.",
DefaultValue: "10m",
Doc: "Maximum interval between two segments, default is 10 minutes.",
Export: true,
}
p.SegmentRollingMaxTime.Init(base.mgr)
@ -747,8 +747,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.AuditorMaxInterval = ParamItem{
Key: "woodpecker.client.auditor.maxInterval",
Version: "2.6.0",
DefaultValue: "10",
Doc: "Maximum interval between two auditing operations in seconds, default is 10 seconds.",
DefaultValue: "10s",
Doc: "Maximum interval between two auditing operations, default is 10 seconds.",
Export: true,
}
p.AuditorMaxInterval.Init(base.mgr)
@ -756,8 +756,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.SyncMaxInterval = ParamItem{
Key: "woodpecker.logstore.logFileSyncPolicy.maxInterval",
Version: "2.6.0",
DefaultValue: "1000",
Doc: "Maximum interval between two sync operations in milliseconds.",
DefaultValue: "200ms",
Doc: "Maximum interval between two sync operations, default is 200 milliseconds.",
Export: true,
}
p.SyncMaxInterval.Init(base.mgr)
@ -774,7 +774,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.SyncMaxBytes = ParamItem{
Key: "woodpecker.logstore.logFileSyncPolicy.maxBytes",
Version: "2.6.0",
DefaultValue: "64000000",
DefaultValue: "64M",
Doc: "Maximum size of write buffer in bytes.",
Export: true,
}
@ -792,7 +792,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.FlushMaxSize = ParamItem{
Key: "woodpecker.logstore.logFileSyncPolicy.maxFlushSize",
Version: "2.6.0",
DefaultValue: "8000000",
DefaultValue: "8M",
Doc: "Maximum size of a fragment in bytes to flush, default is 8M.",
Export: true,
}
@ -801,8 +801,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.RetryInterval = ParamItem{
Key: "woodpecker.logstore.logFileSyncPolicy.retryInterval",
Version: "2.6.0",
DefaultValue: "1000",
Doc: "Maximum interval between two retries in milliseconds.",
DefaultValue: "1000ms",
Doc: "Maximum interval between two retries. default is 1000 milliseconds.",
Export: true,
}
p.RetryInterval.Init(base.mgr)
@ -819,7 +819,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.FragmentCachedMaxBytes = ParamItem{
Key: "woodpecker.logstore.fragmentManager.maxBytes",
Version: "2.6.0",
DefaultValue: "1000000000",
DefaultValue: "512M",
Doc: "Maximum size of fragment cached data in bytes.",
Export: true,
}
@ -828,8 +828,8 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
p.FragmentCachedInterval = ParamItem{
Key: "woodpecker.logstore.fragmentManager.maxInterval",
Version: "2.6.0",
DefaultValue: "1000",
Doc: "Maximum interval between two fragment evicts in milliseconds.",
DefaultValue: "1s",
Doc: "Maximum interval between two fragment evicts. default is 1 second",
Export: true,
}
p.FragmentCachedInterval.Init(base.mgr)
@ -838,7 +838,7 @@ func (p *WoodpeckerConfig) Init(base *BaseTable) {
Key: "woodpecker.storage.type",
Version: "2.6.0",
DefaultValue: "minio",
Doc: "The Type of the storage provider. Valid values: [default, minio, local, service], default is minio.",
Doc: "The Type of the storage provider. Valid values: [minio, local]",
Export: true,
}
p.StorageType.Init(base.mgr)

View File

@ -103,19 +103,19 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, wpCfg.AppendQueueSize.GetAsInt(), 10000)
assert.Equal(t, wpCfg.AppendMaxRetries.GetAsInt(), 3)
assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsInt(), 2000000000)
assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsInt(), 600)
assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsInt(), 10)
assert.Equal(t, wpCfg.SegmentRollingMaxSize.GetAsSize(), int64(2*1024*1024*1024))
assert.Equal(t, wpCfg.SegmentRollingMaxTime.GetAsDurationByParse().Seconds(), float64(600))
assert.Equal(t, wpCfg.AuditorMaxInterval.GetAsDurationByParse().Seconds(), float64(10))
assert.Equal(t, wpCfg.SyncMaxInterval.GetAsInt(), 1000)
assert.Equal(t, wpCfg.SyncMaxInterval.GetAsDurationByParse().Milliseconds(), int64(200))
assert.Equal(t, wpCfg.SyncMaxEntries.GetAsInt(), 100000)
assert.Equal(t, wpCfg.SyncMaxBytes.GetAsInt(), 64000000)
assert.Equal(t, wpCfg.SyncMaxBytes.GetAsSize(), int64(64*1024*1024))
assert.Equal(t, wpCfg.FlushMaxRetries.GetAsInt(), 5)
assert.Equal(t, wpCfg.FlushMaxSize.GetAsInt(), 8000000)
assert.Equal(t, wpCfg.FlushMaxSize.GetAsSize(), int64(8*1024*1024))
assert.Equal(t, wpCfg.FlushMaxThreads.GetAsInt(), 4)
assert.Equal(t, wpCfg.RetryInterval.GetAsInt(), 1000)
assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsInt64(), int64(1000000000))
assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsInt(), 1000)
assert.Equal(t, wpCfg.RetryInterval.GetAsDurationByParse().Milliseconds(), int64(1000))
assert.Equal(t, wpCfg.FragmentCachedMaxBytes.GetAsSize(), int64(512*1024*1024))
assert.Equal(t, wpCfg.FragmentCachedInterval.GetAsDurationByParse().Milliseconds(), int64(1000))
assert.Equal(t, wpCfg.StorageType.GetValue(), "minio")
assert.Equal(t, wpCfg.RootPath.GetValue(), "/var/lib/milvus/woodpecker")