diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 4a5c83fae2..6b1cbd87f0 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -41,7 +41,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" @@ -87,6 +86,10 @@ type ShardDelegator interface { VerifyExcludedSegments(segmentID int64, ts uint64) bool TryCleanExcludedSegments(ts uint64) + // tsafe + UpdateTSafe(ts uint64) + GetTSafe() uint64 + // control Serviceable() bool Start() @@ -111,7 +114,6 @@ type shardDelegator struct { distribution *distribution segmentManager segments.SegmentManager - tsafeManager tsafe.Manager pkOracle pkoracle.PkOracle level0Mut sync.RWMutex // stream delete buffer @@ -751,36 +753,9 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err } } -// watchTSafe is the worker function to update serviceable timestamp. -func (sd *shardDelegator) watchTSafe() { - defer sd.lifetime.Done() - listener := sd.tsafeManager.WatchChannel(sd.vchannelName) - sd.updateTSafe() - log := sd.getLogger(context.Background()) - for { - select { - case _, ok := <-listener.On(): - if !ok { - // listener close - log.Warn("tsafe listener closed") - return - } - sd.updateTSafe() - case <-sd.lifetime.CloseCh(): - log.Info("updateTSafe quit") - // shard delegator closed - return - } - } -} - // updateTSafe read current tsafe value from tsafeManager. -func (sd *shardDelegator) updateTSafe() { +func (sd *shardDelegator) UpdateTSafe(tsafe uint64) { sd.tsCond.L.Lock() - tsafe, err := sd.tsafeManager.Get(sd.vchannelName) - if err != nil { - log.Warn("tsafeManager failed to get lastest", zap.Error(err)) - } if tsafe > sd.latestTsafe.Load() { sd.latestTsafe.Store(tsafe) sd.tsCond.Broadcast() @@ -788,6 +763,10 @@ func (sd *shardDelegator) updateTSafe() { sd.tsCond.L.Unlock() } +func (sd *shardDelegator) GetTSafe() uint64 { + return sd.latestTsafe.Load() +} + // Close closes the delegator. func (sd *shardDelegator) Close() { sd.lifetime.SetState(lifetime.Stopped) @@ -849,7 +828,7 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi // NewShardDelegator creates a new ShardDelegator instance with all fields initialized. func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID UniqueID, channel string, version int64, - workerManager cluster.Manager, manager *segments.Manager, tsafeManager tsafe.Manager, loader segments.Loader, + workerManager cluster.Manager, manager *segments.Manager, loader segments.Loader, factory msgstream.Factory, startTs uint64, queryHook optimizers.QueryHook, chunkManager storage.ChunkManager, ) (ShardDelegator, error) { log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID), @@ -885,7 +864,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock, []string{fmt.Sprint(paramtable.GetNodeID()), channel}), pkOracle: pkoracle.NewPkOracle(), - tsafeManager: tsafeManager, latestTsafe: atomic.NewUint64(startTs), loader: loader, factory: factory, @@ -898,9 +876,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni } m := sync.Mutex{} sd.tsCond = sync.NewCond(&m) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go sd.watchTSafe() - } log.Info("finish build new shardDelegator") return sd, nil } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 407de0effd..55c9e04fe9 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -39,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/internal/util/initcore" @@ -62,7 +61,6 @@ type DelegatorDataSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream channel metautil.Channel @@ -98,7 +96,6 @@ func (s *DelegatorDataSuite) TearDownSuite() { func (s *DelegatorDataSuite) SetupTest() { s.workerManager = &cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} // init schema @@ -159,7 +156,7 @@ func (s *DelegatorDataSuite) SetupTest() { s.rootPath = s.Suite.T().Name() chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) - delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, @@ -654,7 +651,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() { s.version, s.workerManager, s.manager, - s.tsafeManager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 2dcd9ac5e0..982164e2ff 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -19,17 +19,13 @@ package delegator import ( "context" "io" - "sync" "testing" "time" "github.com/cockroachdb/errors" "github.com/samber/lo" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -39,13 +35,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -61,7 +55,6 @@ type DelegatorSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream @@ -85,7 +78,6 @@ func (s *DelegatorSuite) SetupTest() { s.version = 2000 s.workerManager = &cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} s.loader.EXPECT(). Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). @@ -165,7 +157,7 @@ func (s *DelegatorSuite) SetupTest() { var err error // s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader) - s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, @@ -1120,72 +1112,3 @@ func (s *DelegatorSuite) TestGetStats() { func TestDelegatorSuite(t *testing.T) { suite.Run(t, new(DelegatorSuite)) } - -func TestDelegatorWatchTsafe(t *testing.T) { - channelName := "default_dml_channel" - - tsafeManager := tsafe.NewTSafeReplica() - tsafeManager.Add(context.Background(), channelName, 100) - sd := &shardDelegator{ - tsafeManager: tsafeManager, - vchannelName: channelName, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - latestTsafe: atomic.NewUint64(0), - } - defer sd.Close() - - m := sync.Mutex{} - sd.tsCond = sync.NewCond(&m) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go sd.watchTSafe() - } - - err := tsafeManager.Set(channelName, 200) - require.NoError(t, err) - - assert.Eventually(t, func() bool { - return sd.latestTsafe.Load() == 200 - }, time.Second*10, time.Millisecond*10) -} - -func TestDelegatorTSafeListenerClosed(t *testing.T) { - channelName := "default_dml_channel" - - tsafeManager := tsafe.NewTSafeReplica() - tsafeManager.Add(context.Background(), channelName, 100) - sd := &shardDelegator{ - tsafeManager: tsafeManager, - vchannelName: channelName, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - latestTsafe: atomic.NewUint64(0), - } - defer sd.Close() - - m := sync.Mutex{} - sd.tsCond = sync.NewCond(&m) - signal := make(chan struct{}) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go func() { - sd.watchTSafe() - close(signal) - }() - } - - select { - case <-signal: - assert.FailNow(t, "watchTsafe quit unexpectedly") - case <-time.After(time.Millisecond * 10): - } - - tsafeManager.Remove(context.Background(), channelName) - - select { - case <-signal: - case <-time.After(time.Second): - assert.FailNow(t, "watchTsafe still working after listener closed") - } - - sd.Close() - assert.Equal(t, sd.Serviceable(), false) - assert.Equal(t, sd.Stopped(), true) -} diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index e9661a53bc..99c4822795 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -51,7 +50,6 @@ type StreamingForwardSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream @@ -73,7 +71,6 @@ func (s *StreamingForwardSuite) SetupTest() { s.version = 2000 s.workerManager = &cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} s.loader.EXPECT(). Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). @@ -151,7 +148,7 @@ func (s *StreamingForwardSuite) SetupTest() { chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) - delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index 4cc49950a9..8249a46e59 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package delegator @@ -97,6 +97,10 @@ func (_c *MockShardDelegator_Close_Call) RunAndReturn(run func()) *MockShardDele func (_m *MockShardDelegator) Collection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Collection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -138,6 +142,10 @@ func (_c *MockShardDelegator_Collection_Call) RunAndReturn(run func() int64) *Mo func (_m *MockShardDelegator) GetDeleteBufferSize() (int64, int64) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetDeleteBufferSize") + } + var r0 int64 var r1 int64 if rf, ok := ret.Get(0).(func() (int64, int64)); ok { @@ -189,6 +197,10 @@ func (_c *MockShardDelegator_GetDeleteBufferSize_Call) RunAndReturn(run func() ( func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for GetPartitionStatsVersions") + } + var r0 map[int64]int64 if rf, ok := ret.Get(0).(func(context.Context) map[int64]int64); ok { r0 = rf(ctx) @@ -233,6 +245,10 @@ func (_c *MockShardDelegator_GetPartitionStatsVersions_Call) RunAndReturn(run fu func (_m *MockShardDelegator) GetSegmentInfo(readable bool) ([]SnapshotItem, []SegmentEntry) { ret := _m.Called(readable) + if len(ret) == 0 { + panic("no return value specified for GetSegmentInfo") + } + var r0 []SnapshotItem var r1 []SegmentEntry if rf, ok := ret.Get(0).(func(bool) ([]SnapshotItem, []SegmentEntry)); ok { @@ -289,6 +305,10 @@ func (_c *MockShardDelegator_GetSegmentInfo_Call) RunAndReturn(run func(bool) ([ func (_m *MockShardDelegator) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for GetStatistics") + } + var r0 []*internalpb.GetStatisticsResponse var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error)); ok { @@ -340,10 +360,59 @@ func (_c *MockShardDelegator_GetStatistics_Call) RunAndReturn(run func(context.C return _c } +// GetTSafe provides a mock function with given fields: +func (_m *MockShardDelegator) GetTSafe() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetTSafe") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// MockShardDelegator_GetTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTSafe' +type MockShardDelegator_GetTSafe_Call struct { + *mock.Call +} + +// GetTSafe is a helper method to define mock.On call +func (_e *MockShardDelegator_Expecter) GetTSafe() *MockShardDelegator_GetTSafe_Call { + return &MockShardDelegator_GetTSafe_Call{Call: _e.mock.On("GetTSafe")} +} + +func (_c *MockShardDelegator_GetTSafe_Call) Run(run func()) *MockShardDelegator_GetTSafe_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockShardDelegator_GetTSafe_Call) Return(_a0 uint64) *MockShardDelegator_GetTSafe_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockShardDelegator_GetTSafe_Call) RunAndReturn(run func() uint64) *MockShardDelegator_GetTSafe_Call { + _c.Call.Return(run) + return _c +} + // GetTargetVersion provides a mock function with given fields: func (_m *MockShardDelegator) GetTargetVersion() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetTargetVersion") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -385,6 +454,10 @@ func (_c *MockShardDelegator_GetTargetVersion_Call) RunAndReturn(run func() int6 func (_m *MockShardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error { ret := _m.Called(ctx, infos, version) + if len(ret) == 0 { + panic("no return value specified for LoadGrowing") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []*querypb.SegmentLoadInfo, int64) error); ok { r0 = rf(ctx, infos, version) @@ -429,6 +502,10 @@ func (_c *MockShardDelegator_LoadGrowing_Call) RunAndReturn(run func(context.Con func (_m *MockShardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for LoadSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.LoadSegmentsRequest) error); ok { r0 = rf(ctx, req) @@ -539,6 +616,10 @@ func (_c *MockShardDelegator_ProcessInsert_Call) RunAndReturn(run func(map[int64 func (_m *MockShardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for Query") + } + var r0 []*internalpb.RetrieveResults var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error)); ok { @@ -594,6 +675,10 @@ func (_c *MockShardDelegator_Query_Call) RunAndReturn(run func(context.Context, func (_m *MockShardDelegator) QueryStream(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error { ret := _m.Called(ctx, req, srv) + if len(ret) == 0 { + panic("no return value specified for QueryStream") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.QueryRequest, streamrpc.QueryStreamServer) error); ok { r0 = rf(ctx, req, srv) @@ -638,6 +723,10 @@ func (_c *MockShardDelegator_QueryStream_Call) RunAndReturn(run func(context.Con func (_m *MockShardDelegator) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error { ret := _m.Called(ctx, req, force) + if len(ret) == 0 { + panic("no return value specified for ReleaseSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.ReleaseSegmentsRequest, bool) error); ok { r0 = rf(ctx, req, force) @@ -682,6 +771,10 @@ func (_c *MockShardDelegator_ReleaseSegments_Call) RunAndReturn(run func(context func (_m *MockShardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for Search") + } + var r0 []*internalpb.SearchResults var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.SearchRequest) ([]*internalpb.SearchResults, error)); ok { @@ -737,6 +830,10 @@ func (_c *MockShardDelegator_Search_Call) RunAndReturn(run func(context.Context, func (_m *MockShardDelegator) Serviceable() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Serviceable") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -958,10 +1055,47 @@ func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) RunAndReturn(run fun return _c } +// UpdateTSafe provides a mock function with given fields: ts +func (_m *MockShardDelegator) UpdateTSafe(ts uint64) { + _m.Called(ts) +} + +// MockShardDelegator_UpdateTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTSafe' +type MockShardDelegator_UpdateTSafe_Call struct { + *mock.Call +} + +// UpdateTSafe is a helper method to define mock.On call +// - ts uint64 +func (_e *MockShardDelegator_Expecter) UpdateTSafe(ts interface{}) *MockShardDelegator_UpdateTSafe_Call { + return &MockShardDelegator_UpdateTSafe_Call{Call: _e.mock.On("UpdateTSafe", ts)} +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) Run(run func(ts uint64)) *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) Return() *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Return() + return _c +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) RunAndReturn(run func(uint64)) *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Return(run) + return _c +} + // VerifyExcludedSegments provides a mock function with given fields: segmentID, ts func (_m *MockShardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool { ret := _m.Called(segmentID, ts) + if len(ret) == 0 { + panic("no return value specified for VerifyExcludedSegments") + } + var r0 bool if rf, ok := ret.Get(0).(func(int64, uint64) bool); ok { r0 = rf(segmentID, ts) @@ -1005,6 +1139,10 @@ func (_c *MockShardDelegator_VerifyExcludedSegments_Call) RunAndReturn(run func( func (_m *MockShardDelegator) Version() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Version") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 151b4e03fb..f4f7d11d33 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -19,6 +19,7 @@ package querynodev2 import ( "context" "fmt" + "math" "github.com/samber/lo" @@ -58,7 +59,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error return nil, err } - minTsafeChannel, minTsafe := node.tSafeManager.Min() + minTsafeChannel := "" + minTsafe := uint64(math.MaxUint64) + node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool { + tsafe := delegator.GetTSafe() + if tsafe < minTsafe { + minTsafeChannel = channel + minTsafe = tsafe + } + return true + }) + collections := node.manager.Collection.ListWithName() nodeID := fmt.Sprint(node.GetNodeID()) diff --git a/internal/querynodev2/pipeline/delete_node.go b/internal/querynodev2/pipeline/delete_node.go index 4452da5e64..5607fdd7ec 100644 --- a/internal/querynodev2/pipeline/delete_node.go +++ b/internal/querynodev2/pipeline/delete_node.go @@ -35,9 +35,8 @@ type deleteNode struct { collectionID UniqueID channel string - manager *DataManager - tSafeManager TSafeManager - delegator delegator.ShardDelegator + manager *DataManager + delegator delegator.ShardDelegator } // addDeleteData find the segment of delete column in DeleteMsg and save in deleteData @@ -78,17 +77,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg { } // update tSafe - err := dNode.tSafeManager.Set(dNode.channel, nodeMsg.timeRange.timestampMax) - if err != nil { - // should not happen, QueryNode should addTSafe before start pipeline - panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", dNode.collectionID, err)) - } + dNode.delegator.UpdateTSafe(nodeMsg.timeRange.timestampMax) return nil } func newDeleteNode( collectionID UniqueID, channel string, - manager *DataManager, tSafeManager TSafeManager, delegator delegator.ShardDelegator, + manager *DataManager, delegator delegator.ShardDelegator, maxQueueLength int32, ) *deleteNode { return &deleteNode{ @@ -96,7 +91,6 @@ func newDeleteNode( collectionID: collectionID, channel: channel, manager: manager, - tSafeManager: tSafeManager, delegator: delegator, } } diff --git a/internal/querynodev2/pipeline/delete_node_test.go b/internal/querynodev2/pipeline/delete_node_test.go index 72a8c75813..da3607b848 100644 --- a/internal/querynodev2/pipeline/delete_node_test.go +++ b/internal/querynodev2/pipeline/delete_node_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/samber/lo" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -40,8 +38,6 @@ type DeleteNodeSuite struct { channel string timeRange TimeRange - // dependency - tSafeManager TSafeManager // mocks manager *segments.Manager delegator *delegator.MockShardDelegator @@ -93,18 +89,13 @@ func (suite *DeleteNodeSuite) TestBasic() { } }) // init dependency - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) // build delete node and data - node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.tSafeManager, suite.delegator, 8) + node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8) in := suite.buildDeleteNodeMsg() + suite.delegator.EXPECT().UpdateTSafe(in.timeRange.timestampMax).Return() // run out := node.Operate(in) suite.Nil(out) - // check tsafe - tt, err := suite.tSafeManager.Get(suite.channel) - suite.NoError(err) - suite.Equal(suite.timeRange.timestampMax, tt) } func TestDeleteNode(t *testing.T) { diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 453c963843..9c390b4d2f 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -47,9 +47,8 @@ type manager struct { dataManager *DataManager delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] - tSafeManager TSafeManager - dispatcher msgdispatcher.Client - mu sync.RWMutex + dispatcher msgdispatcher.Client + mu sync.RWMutex } func (m *manager) Num() int { @@ -83,7 +82,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { return nil, merr.WrapErrChannelNotFound(channel, "delegator not found") } - newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.tSafeManager, m.dispatcher, delegator) + newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.dispatcher, delegator) if err != nil { return nil, merr.WrapErrServiceUnavailable(err.Error(), "failed to create new pipeline") } @@ -156,7 +155,6 @@ func (m *manager) Close() { } func NewManager(dataManager *DataManager, - tSafeManager TSafeManager, dispatcher msgdispatcher.Client, delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator], ) Manager { @@ -164,7 +162,6 @@ func NewManager(dataManager *DataManager, channel2Pipeline: make(map[string]Pipeline), dataManager: dataManager, delegators: delegators, - tSafeManager: tSafeManager, dispatcher: dispatcher, } } diff --git a/internal/querynodev2/pipeline/manager_test.go b/internal/querynodev2/pipeline/manager_test.go index 5d306864ab..8d23dd9e0d 100644 --- a/internal/querynodev2/pipeline/manager_test.go +++ b/internal/querynodev2/pipeline/manager_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/stretchr/testify/mock" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -40,8 +38,7 @@ type PipelineManagerTestSuite struct { collectionID int64 channel string // dependencies - tSafeManager TSafeManager - delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] + delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] // mocks segmentManager *segments.MockSegmentManager @@ -59,9 +56,6 @@ func (suite *PipelineManagerTestSuite) SetupSuite() { func (suite *PipelineManagerTestSuite) SetupTest() { paramtable.Init() // init dependency - // init tsafeManager - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) suite.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]() // init mock @@ -88,7 +82,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() { Collection: suite.collectionManager, Segment: suite.segmentManager, } - pipelineManager := NewManager(manager, suite.tSafeManager, suite.msgDispatcher, suite.delegators) + pipelineManager := NewManager(manager, suite.msgDispatcher, suite.delegators) defer pipelineManager.Close() // Add pipeline diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 16b4fb02c3..82702976d8 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -42,7 +42,6 @@ func NewPipeLine( collectionID UniqueID, channel string, manager *DataManager, - tSafeManager TSafeManager, dispatcher msgdispatcher.Client, delegator delegator.ShardDelegator, ) (Pipeline, error) { @@ -55,7 +54,7 @@ func NewPipeLine( filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength) insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength) - deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength) + deleteNode := newDeleteNode(collectionID, channel, manager, delegator, pipelineQueueLength) p.Add(filterNode, insertNode, deleteNode) return p, nil } diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 97afa551e8..01773940e8 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/samber/lo" @@ -29,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -46,9 +44,6 @@ type PipelineTestSuite struct { insertSegmentIDs []int64 deletePKs []int64 - // dependencies - tSafeManager TSafeManager - // mocks segmentManager *segments.MockSegmentManager collectionManager *segments.MockCollectionManager @@ -98,11 +93,6 @@ func (suite *PipelineTestSuite) SetupTest() { suite.delegator = delegator.NewMockShardDelegator(suite.T()) // init mq dispatcher suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T()) - - // init dependency - // init tsafeManager - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) } func (suite *PipelineTestSuite) TestBasic() { @@ -138,12 +128,13 @@ func (suite *PipelineTestSuite) TestBasic() { } } }) + // build pipleine manager := &segments.Manager{ Collection: suite.collectionManager, Segment: suite.segmentManager, } - pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.tSafeManager, suite.msgDispatcher, suite.delegator) + pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.msgDispatcher, suite.delegator) suite.NoError(err) // Init Consumer @@ -154,20 +145,10 @@ func (suite *PipelineTestSuite) TestBasic() { suite.NoError(err) defer pipeline.Close() - // watch tsafe manager - listener := suite.tSafeManager.WatchChannel(suite.channel) - // build input msg in := suite.buildMsgPack(schema) + suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Return() suite.msgChan <- in - - // wait pipeline work - <-listener.On() - - // check tsafe - tsafe, err := suite.tSafeManager.Get(suite.channel) - suite.NoError(err) - suite.Equal(in.EndTs, tsafe) } func TestQueryNodePipeline(t *testing.T) { diff --git a/internal/querynodev2/pipeline/type.go b/internal/querynodev2/pipeline/type.go index ec18bb5191..41de59465e 100644 --- a/internal/querynodev2/pipeline/type.go +++ b/internal/querynodev2/pipeline/type.go @@ -22,7 +22,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -51,8 +50,6 @@ type ( DataManager = segments.Manager Segment = segments.Segment - TSafeManager = tsafe.Manager - BaseNode = base.BaseNode Msg = base.Msg ) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 8a1f1eedcb..b6c806950c 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -51,7 +51,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/pipeline" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/registry" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" @@ -97,7 +96,6 @@ type QueryNode struct { // internal components manager *segments.Manager clusterManager cluster.Manager - tSafeManager tsafe.Manager pipelineManager pipeline.Manager subscribingChannels *typeutil.ConcurrentSet[string] unsubscribingChannels *typeutil.ConcurrentSet[string] @@ -145,7 +143,6 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode { lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal), } - node.tSafeManager = tsafe.NewTSafeReplica() expr.Register("querynode", node) return node } @@ -352,7 +349,7 @@ func (node *QueryNode) Init() error { node.manager.SetLoader(node.loader) node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID()) // init pipeline manager - node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators) + node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators) err = node.InitSegcore() if err != nil { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index cab6f17dde..f53ac5faf8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -253,7 +253,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm req.GetVersion(), node.clusterManager, node.manager, - node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp(), @@ -271,14 +270,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } }() - // create tSafe - node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp()) - defer func() { - if err != nil { - node.tSafeManager.Remove(ctx, channel.ChannelName) - } - }() - pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName()) if err != nil { msg := "failed to create pipeline" @@ -368,7 +359,6 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) _, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) - node.tSafeManager.Remove(ctx, req.GetChannelName()) node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed)) } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 1ab3663b11..c3d44aadce 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1755,11 +1755,13 @@ func (suite *ServiceSuite) TestGetMetric_Normal() { sd1 := delegator.NewMockShardDelegator(suite.T()) sd1.EXPECT().Collection().Return(100) sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000) + sd1.EXPECT().GetTSafe().Return(100) sd1.EXPECT().Close().Maybe() suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1) sd2 := delegator.NewMockShardDelegator(suite.T()) sd2.EXPECT().Collection().Return(100) + sd2.EXPECT().GetTSafe().Return(200) sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000) sd2.EXPECT().Close().Maybe() suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2) diff --git a/internal/querynodev2/tsafe/OWNERS b/internal/querynodev2/tsafe/OWNERS deleted file mode 100644 index b4179850dd..0000000000 --- a/internal/querynodev2/tsafe/OWNERS +++ /dev/null @@ -1,9 +0,0 @@ -reviewers: - - aoiasd - - bigsheeper - - congqixia - - yah01 - -approvers: - - maintainers - diff --git a/internal/querynodev2/tsafe/listener.go b/internal/querynodev2/tsafe/listener.go deleted file mode 100644 index b0abd0a5df..0000000000 --- a/internal/querynodev2/tsafe/listener.go +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -type Listener interface { - On() <-chan struct{} - Close() -} - -type listener struct { - tsafe *tSafeManager - channel string - ch chan struct{} -} - -func (l *listener) On() <-chan struct{} { - return l.ch -} - -func (l *listener) Close() { - l.tsafe.mu.Lock() - defer l.tsafe.mu.Unlock() - l.close() -} - -// close remove the listener from the tSafeReplica without lock -func (l *listener) close() { - for i, listen := range l.tsafe.listeners[l.channel] { - if l == listen { - close(l.ch) - l.tsafe.listeners[l.channel] = append(l.tsafe.listeners[l.channel][:i], l.tsafe.listeners[l.channel][i+1:]...) - break - } - } -} - -func (l *listener) nonBlockingNotify() { - select { - case l.ch <- struct{}{}: - default: - } -} - -func newListener(tsafe *tSafeManager, channel string) *listener { - return &listener{ - tsafe: tsafe, - channel: channel, - ch: make(chan struct{}, 1), - } -} diff --git a/internal/querynodev2/tsafe/manager.go b/internal/querynodev2/tsafe/manager.go deleted file mode 100644 index 6e56e8ae2b..0000000000 --- a/internal/querynodev2/tsafe/manager.go +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "context" - "fmt" - "sync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/tsoutil" - . "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -// Manager is the interface for tsafe manager. -type Manager interface { - Get(vChannel string) (Timestamp, error) - Set(vChannel string, timestamp Timestamp) error - Add(ctx context.Context, vChannel string, timestamp Timestamp) - Remove(ctx context.Context, vChannel string) - Watch() Listener - WatchChannel(channel string) Listener - - Min() (string, Timestamp) -} - -// tSafeManager implements `Manager` interface. -type tSafeManager struct { - mu sync.Mutex // guards tSafes - tSafes map[string]*tSafe // map[DMLChannel]*tSafe - listeners map[string][]*listener // map[DMLChannel][]*listener, key "" means all channels. -} - -func (t *tSafeManager) Watch() Listener { - return t.WatchChannel("") -} - -func (t *tSafeManager) WatchChannel(channel string) Listener { - t.mu.Lock() - defer t.mu.Unlock() - l := newListener(t, channel) - t.listeners[channel] = append(t.listeners[channel], l) - return l -} - -func (t *tSafeManager) Add(ctx context.Context, vChannel string, timestamp uint64) { - ts, _ := tsoutil.ParseTS(timestamp) - t.mu.Lock() - defer t.mu.Unlock() - if _, ok := t.tSafes[vChannel]; !ok { - t.tSafes[vChannel] = newTSafe(vChannel, timestamp) - } - log.Ctx(ctx).Info("add tSafe done", - zap.String("channel", vChannel), zap.Time("timestamp", ts)) -} - -func (t *tSafeManager) Get(vChannel string) (Timestamp, error) { - t.mu.Lock() - defer t.mu.Unlock() - ts, err := t.get(vChannel) - if err != nil { - return 0, err - } - return ts.get(), nil -} - -func (t *tSafeManager) Set(vChannel string, timestamp Timestamp) error { - t.mu.Lock() - defer t.mu.Unlock() - ts, err := t.get(vChannel) - if err != nil { - return fmt.Errorf("set tSafe failed, err = %w", err) - } - ts.set(timestamp) - t.notifyAll(vChannel) - return nil -} - -func (t *tSafeManager) Remove(ctx context.Context, vChannel string) { - t.mu.Lock() - defer t.mu.Unlock() - tsafe, ok := t.tSafes[vChannel] - if ok { - tsafe.close() - } - for _, l := range t.listeners[vChannel] { - l.close() - } - delete(t.tSafes, vChannel) - delete(t.listeners, vChannel) - log.Ctx(ctx).Info("remove tSafe replica", - zap.String("vChannel", vChannel)) -} - -func (t *tSafeManager) Min() (string, Timestamp) { - t.mu.Lock() - defer t.mu.Unlock() - var minChannel string - minTt := MaxTimestamp - for channel, tsafe := range t.tSafes { - t := tsafe.get() - if t < minTt && t != 0 { - minChannel = channel - minTt = t - } - } - return minChannel, minTt -} - -func (t *tSafeManager) get(vChannel string) (*tSafe, error) { - if _, ok := t.tSafes[vChannel]; !ok { - return nil, fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel) - } - return t.tSafes[vChannel], nil -} - -// since notifyAll called by setTSafe, no need to lock -func (t *tSafeManager) notifyAll(channel string) { - for _, l := range t.listeners[""] { - l.nonBlockingNotify() - } - for _, l := range t.listeners[channel] { - l.nonBlockingNotify() - } -} - -func NewTSafeReplica() Manager { - replica := &tSafeManager{ - tSafes: make(map[string]*tSafe), - listeners: make(map[string][]*listener), - } - return replica -} diff --git a/internal/querynodev2/tsafe/tsafe.go b/internal/querynodev2/tsafe/tsafe.go deleted file mode 100644 index 9ec40f1ed8..0000000000 --- a/internal/querynodev2/tsafe/tsafe.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "go.uber.org/atomic" - - . "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type tSafe struct { - channel string - tSafe atomic.Uint64 - closed atomic.Bool -} - -func (ts *tSafe) valid() bool { - return !ts.closed.Load() -} - -func (ts *tSafe) close() { - ts.closed.Store(true) -} - -func (ts *tSafe) get() Timestamp { - return ts.tSafe.Load() -} - -func (ts *tSafe) set(t Timestamp) { - ts.tSafe.Store(t) -} - -func newTSafe(channel string, timestamp uint64) *tSafe { - ts := &tSafe{ - channel: channel, - } - ts.tSafe.Store(timestamp) - ts.closed.Store(false) - - return ts -} diff --git a/internal/querynodev2/tsafe/tsafe_test.go b/internal/querynodev2/tsafe/tsafe_test.go deleted file mode 100644 index 3fd94065a6..0000000000 --- a/internal/querynodev2/tsafe/tsafe_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - . "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type TSafeTestSuite struct { - suite.Suite - tSafeReplica Manager - channel string - time Timestamp -} - -func (suite *TSafeTestSuite) SetupSuite() { - suite.channel = "test-channel" - suite.time = uint64(time.Now().Unix()) -} - -func (suite *TSafeTestSuite) SetupTest() { - suite.tSafeReplica = NewTSafeReplica() -} - -// test Basic use of TSafeReplica -func (suite *TSafeTestSuite) TestBasic() { - suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp) - t, err := suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(ZeroTimestamp, t) - - // Add listener - globalWatcher := suite.tSafeReplica.WatchChannel(suite.channel) - channelWatcher := suite.tSafeReplica.Watch() - defer globalWatcher.Close() - defer channelWatcher.Close() - - // Test Set tSafe - suite.tSafeReplica.Set(suite.channel, suite.time) - t, err = suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(suite.time, t) - - // Test listener - select { - case <-globalWatcher.On(): - default: - suite.Fail("global watcher should be triggered") - } - - select { - case <-channelWatcher.On(): - default: - suite.Fail("channel watcher should be triggered") - } -} - -func (suite *TSafeTestSuite) TestRemoveAndInvalid() { - suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp) - t, err := suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(ZeroTimestamp, t) - - suite.tSafeReplica.Remove(context.Background(), suite.channel) - _, err = suite.tSafeReplica.Get(suite.channel) - suite.Error(err) - - err = suite.tSafeReplica.Set(suite.channel, suite.time) - suite.Error(err) -} - -func TestTSafe(t *testing.T) { - suite.Run(t, new(TSafeTestSuite)) -}