diff --git a/go.mod b/go.mod index 0a4c49b033..b99a957372 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65 // indirect + github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect diff --git a/go.sum b/go.sum index 29c7f8e762..1bfaea7418 100644 --- a/go.sum +++ b/go.sum @@ -1065,8 +1065,8 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65 h1:Te+TxCQVisH/ntsulwDlO77s3PuJhj//ok9xnaGupZo= -github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= +github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0 h1:6B7IUyTRarQVTvusRS0bs6aJn3tUTVTIVqPEOj5IQHM= +github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= diff --git a/pkg/go.mod b/pkg/go.mod index 92eefe7b96..bf1733d8af 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -39,7 +39,7 @@ require ( github.com/tikv/client-go/v2 v2.0.4 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/x448/float16 v0.8.4 - github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65 + github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0 go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5 go.etcd.io/etcd/server/v3 v3.5.5 diff --git a/pkg/go.sum b/pkg/go.sum index f9e4f17b3c..8f4a521647 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -823,8 +823,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65 h1:Te+TxCQVisH/ntsulwDlO77s3PuJhj//ok9xnaGupZo= -github.com/zilliztech/woodpecker v0.0.0-20250418010644-1a9ae136fa65/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= +github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0 h1:6B7IUyTRarQVTvusRS0bs6aJn3tUTVTIVqPEOj5IQHM= +github.com/zilliztech/woodpecker v0.0.0-20250427123625-654f0175eff0/go.mod h1:MLt2hsMXd5bVOykwZyWXYHsy9kN4C2gQEaCrID5rM1w= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go b/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go index 9f627489a2..00c7af4e54 100644 --- a/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go @@ -221,6 +221,53 @@ func (_c *MockWALImpls_Read_Call) RunAndReturn(run func(context.Context, walimpl return _c } +// Truncate provides a mock function with given fields: ctx, id +func (_m *MockWALImpls) Truncate(ctx context.Context, id message.MessageID) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for Truncate") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, message.MessageID) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWALImpls_Truncate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Truncate' +type MockWALImpls_Truncate_Call struct { + *mock.Call +} + +// Truncate is a helper method to define mock.On call +// - ctx context.Context +// - id message.MessageID +func (_e *MockWALImpls_Expecter) Truncate(ctx interface{}, id interface{}) *MockWALImpls_Truncate_Call { + return &MockWALImpls_Truncate_Call{Call: _e.mock.On("Truncate", ctx, id)} +} + +func (_c *MockWALImpls_Truncate_Call) Run(run func(ctx context.Context, id message.MessageID)) *MockWALImpls_Truncate_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MessageID)) + }) + return _c +} + +func (_c *MockWALImpls_Truncate_Call) Return(_a0 error) *MockWALImpls_Truncate_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALImpls_Truncate_Call) RunAndReturn(run func(context.Context, message.MessageID) error) *MockWALImpls_Truncate_Call { + _c.Call.Return(run) + return _c +} + // WALName provides a mock function with no fields func (_m *MockWALImpls) WALName() string { ret := _m.Called() diff --git a/pkg/streaming/walimpls/impls/kafka/wal.go b/pkg/streaming/walimpls/impls/kafka/wal.go index 48e1233fc2..dd63202ff6 100644 --- a/pkg/streaming/walimpls/impls/kafka/wal.go +++ b/pkg/streaming/walimpls/impls/kafka/wal.go @@ -103,6 +103,10 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls return newScanner(opt.Name, exclude, c), nil } +func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { + return nil +} + func (w *walImpl) Close() { // The lifetime control of the producer is delegated to the wal adaptor. // So we just make resource cleanup here. diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index 5aefb712e0..0857932072 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -76,6 +76,10 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls return newScanner(opt.Name, reader), nil } +func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { + return nil +} + func (w *walImpl) Close() { if w.p != nil { w.p.Close() // close producer diff --git a/pkg/streaming/walimpls/impls/rmq/wal.go b/pkg/streaming/walimpls/impls/rmq/wal.go index 0c841baea4..a00e84bfac 100644 --- a/pkg/streaming/walimpls/impls/rmq/wal.go +++ b/pkg/streaming/walimpls/impls/rmq/wal.go @@ -104,6 +104,10 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls return newScanner(scannerName, exclude, consumer), nil } +func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { + return nil +} + // Close closes the wal. func (w *walImpl) Close() { if w.p != nil { diff --git a/pkg/streaming/walimpls/impls/walimplstest/wal.go b/pkg/streaming/walimpls/impls/walimplstest/wal.go index 6a6a7868ef..8b3e85f5e6 100644 --- a/pkg/streaming/walimpls/impls/walimplstest/wal.go +++ b/pkg/streaming/walimpls/impls/walimplstest/wal.go @@ -56,5 +56,9 @@ func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls ), nil } +func (w *walImpls) Truncate(ctx context.Context, id message.MessageID) error { + return nil +} + func (w *walImpls) Close() { } diff --git a/pkg/streaming/walimpls/impls/wp/wal.go b/pkg/streaming/walimpls/impls/wp/wal.go index 65656d9c09..95cd0fdc7c 100644 --- a/pkg/streaming/walimpls/impls/wp/wal.go +++ b/pkg/streaming/walimpls/impls/wp/wal.go @@ -62,13 +62,17 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (walimpls.S from.EntryId = id.logMsgId.EntryId + 1 } - reader, err := w.l.OpenLogReader(ctx, &from) + reader, err := w.l.OpenLogReader(ctx, &from, opt.Name) if err != nil { return nil, err } return newScanner(opt.Name, reader), nil } +func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { + return w.l.Truncate(ctx, id.(*wpID).logMsgId) +} + func (w *walImpl) Close() { closeWriterErr := w.p.Close(context.Background()) if closeWriterErr != nil { diff --git a/pkg/streaming/walimpls/wal.go b/pkg/streaming/walimpls/wal.go index 85bf74494d..236c45cbba 100644 --- a/pkg/streaming/walimpls/wal.go +++ b/pkg/streaming/walimpls/wal.go @@ -30,4 +30,7 @@ type WALImpls interface { // Append writes a record to the log. // Can be only called when the wal is in read-write mode. Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + + // Truncate truncates the wal to the given id (inclusive). + Truncate(ctx context.Context, id message.MessageID) error }