From 28841ebdf9ff59d3cef0cb2337db8e765f81730f Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 13 Dec 2024 21:20:57 +0800
Subject: [PATCH] enhance: [10kcp] Simplify querynode tsafe & reduce goroutine number (#38416) (#38433)

Related to #37630

The TSafe manager is too complex for the current implementation, and each
delegator needs a dedicated goroutine waiting for tsafe update events. Tsafe
updating can be executed in the pipeline instead. This PR removes the tsafe
manager and simplifies the entire logic of tsafe updating.

Signed-off-by: Congqi Xia
---
 internal/querynodev2/delegator/delegator.go   |  45 ++----
 .../delegator/delegator_data_test.go          |   6 +-
 .../querynodev2/delegator/delegator_test.go   |  79 +---------
 .../delegator/delta_forward_test.go           |   5 +-
 .../querynodev2/delegator/mock_delegator.go   | 140 +++++++++++++++-
 internal/querynodev2/metrics_info.go          |  13 +-
 internal/querynodev2/pipeline/delete_node.go  |  14 +-
 .../querynodev2/pipeline/delete_node_test.go  |  13 +-
 internal/querynodev2/pipeline/manager.go      |   9 +-
 internal/querynodev2/pipeline/manager_test.go |  10 +-
 internal/querynodev2/pipeline/pipeline.go     |   3 +-
 .../querynodev2/pipeline/pipeline_test.go     |  25 +--
 internal/querynodev2/pipeline/type.go         |   3 -
 internal/querynodev2/server.go                |   5 +-
 internal/querynodev2/services.go              |  10 --
 internal/querynodev2/services_test.go         |   2 +
 internal/querynodev2/tsafe/OWNERS             |   9 --
 internal/querynodev2/tsafe/listener.go        |  64 --------
 internal/querynodev2/tsafe/manager.go         | 149 ------------------
 internal/querynodev2/tsafe/tsafe.go           |  55 -------
 internal/querynodev2/tsafe/tsafe_test.go      |  94 ----------
 21 files changed, 182 insertions(+), 571 deletions(-)
 delete mode 100644 internal/querynodev2/tsafe/OWNERS
 delete mode 100644 internal/querynodev2/tsafe/listener.go
 delete mode 100644 internal/querynodev2/tsafe/manager.go
 delete mode 100644 internal/querynodev2/tsafe/tsafe.go
 delete mode 100644 internal/querynodev2/tsafe/tsafe_test.go

diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go
index 4a5c83fae2..6b1cbd87f0 100644
--- a/internal/querynodev2/delegator/delegator.go
+++ b/internal/querynodev2/delegator/delegator.go
@@ -41,7 +41,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
 	"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
-	"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/streamrpc"
 	"github.com/milvus-io/milvus/pkg/common"
@@ -87,6 +86,10 @@ type ShardDelegator interface {
 	VerifyExcludedSegments(segmentID int64, ts uint64) bool
 	TryCleanExcludedSegments(ts uint64)
 
+	// tsafe
+	UpdateTSafe(ts uint64)
+	GetTSafe() uint64
+
 	// control
 	Serviceable() bool
 	Start()
@@ -111,7 +114,6 @@ type shardDelegator struct {
 
 	distribution   *distribution
 	segmentManager segments.SegmentManager
-	tsafeManager   tsafe.Manager
 	pkOracle       pkoracle.PkOracle
 	level0Mut      sync.RWMutex
 	// stream delete buffer
@@ -751,36 +753,9 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err
 	}
 }
 
-// watchTSafe is the worker function to update serviceable timestamp.
-func (sd *shardDelegator) watchTSafe() { - defer sd.lifetime.Done() - listener := sd.tsafeManager.WatchChannel(sd.vchannelName) - sd.updateTSafe() - log := sd.getLogger(context.Background()) - for { - select { - case _, ok := <-listener.On(): - if !ok { - // listener close - log.Warn("tsafe listener closed") - return - } - sd.updateTSafe() - case <-sd.lifetime.CloseCh(): - log.Info("updateTSafe quit") - // shard delegator closed - return - } - } -} - // updateTSafe read current tsafe value from tsafeManager. -func (sd *shardDelegator) updateTSafe() { +func (sd *shardDelegator) UpdateTSafe(tsafe uint64) { sd.tsCond.L.Lock() - tsafe, err := sd.tsafeManager.Get(sd.vchannelName) - if err != nil { - log.Warn("tsafeManager failed to get lastest", zap.Error(err)) - } if tsafe > sd.latestTsafe.Load() { sd.latestTsafe.Store(tsafe) sd.tsCond.Broadcast() @@ -788,6 +763,10 @@ func (sd *shardDelegator) updateTSafe() { sd.tsCond.L.Unlock() } +func (sd *shardDelegator) GetTSafe() uint64 { + return sd.latestTsafe.Load() +} + // Close closes the delegator. func (sd *shardDelegator) Close() { sd.lifetime.SetState(lifetime.Stopped) @@ -849,7 +828,7 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi // NewShardDelegator creates a new ShardDelegator instance with all fields initialized. func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID UniqueID, channel string, version int64, - workerManager cluster.Manager, manager *segments.Manager, tsafeManager tsafe.Manager, loader segments.Loader, + workerManager cluster.Manager, manager *segments.Manager, loader segments.Loader, factory msgstream.Factory, startTs uint64, queryHook optimizers.QueryHook, chunkManager storage.ChunkManager, ) (ShardDelegator, error) { log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID), @@ -885,7 +864,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock, []string{fmt.Sprint(paramtable.GetNodeID()), channel}), pkOracle: pkoracle.NewPkOracle(), - tsafeManager: tsafeManager, latestTsafe: atomic.NewUint64(startTs), loader: loader, factory: factory, @@ -898,9 +876,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni } m := sync.Mutex{} sd.tsCond = sync.NewCond(&m) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go sd.watchTSafe() - } log.Info("finish build new shardDelegator") return sd, nil } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 407de0effd..55c9e04fe9 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -39,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/internal/util/initcore" @@ -62,7 +61,6 @@ type DelegatorDataSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream channel metautil.Channel @@ -98,7 +96,6 @@ func (s *DelegatorDataSuite) TearDownSuite() { func (s *DelegatorDataSuite) SetupTest() { 
s.workerManager = &cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} // init schema @@ -159,7 +156,7 @@ func (s *DelegatorDataSuite) SetupTest() { s.rootPath = s.Suite.T().Name() chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) - delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, @@ -654,7 +651,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() { s.version, s.workerManager, s.manager, - s.tsafeManager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 2dcd9ac5e0..982164e2ff 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -19,17 +19,13 @@ package delegator import ( "context" "io" - "sync" "testing" "time" "github.com/cockroachdb/errors" "github.com/samber/lo" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -39,13 +35,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -61,7 +55,6 @@ type DelegatorSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream @@ -85,7 +78,6 @@ func (s *DelegatorSuite) SetupTest() { s.version = 2000 s.workerManager = &cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} s.loader.EXPECT(). Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). 
@@ -165,7 +157,7 @@ func (s *DelegatorSuite) SetupTest() { var err error // s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader) - s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, @@ -1120,72 +1112,3 @@ func (s *DelegatorSuite) TestGetStats() { func TestDelegatorSuite(t *testing.T) { suite.Run(t, new(DelegatorSuite)) } - -func TestDelegatorWatchTsafe(t *testing.T) { - channelName := "default_dml_channel" - - tsafeManager := tsafe.NewTSafeReplica() - tsafeManager.Add(context.Background(), channelName, 100) - sd := &shardDelegator{ - tsafeManager: tsafeManager, - vchannelName: channelName, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - latestTsafe: atomic.NewUint64(0), - } - defer sd.Close() - - m := sync.Mutex{} - sd.tsCond = sync.NewCond(&m) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go sd.watchTSafe() - } - - err := tsafeManager.Set(channelName, 200) - require.NoError(t, err) - - assert.Eventually(t, func() bool { - return sd.latestTsafe.Load() == 200 - }, time.Second*10, time.Millisecond*10) -} - -func TestDelegatorTSafeListenerClosed(t *testing.T) { - channelName := "default_dml_channel" - - tsafeManager := tsafe.NewTSafeReplica() - tsafeManager.Add(context.Background(), channelName, 100) - sd := &shardDelegator{ - tsafeManager: tsafeManager, - vchannelName: channelName, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - latestTsafe: atomic.NewUint64(0), - } - defer sd.Close() - - m := sync.Mutex{} - sd.tsCond = sync.NewCond(&m) - signal := make(chan struct{}) - if sd.lifetime.Add(lifetime.NotStopped) == nil { - go func() { - sd.watchTSafe() - close(signal) - }() - } - - select { - case <-signal: - assert.FailNow(t, "watchTsafe quit unexpectedly") - case <-time.After(time.Millisecond * 10): - } - - tsafeManager.Remove(context.Background(), channelName) - - select { - case <-signal: - case <-time.After(time.Second): - assert.FailNow(t, "watchTsafe still working after listener closed") - } - - sd.Close() - assert.Equal(t, sd.Serviceable(), false) - assert.Equal(t, sd.Stopped(), true) -} diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index e9661a53bc..99c4822795 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -51,7 +50,6 @@ type StreamingForwardSuite struct { version int64 workerManager *cluster.MockManager manager *segments.Manager - tsafeManager tsafe.Manager loader *segments.MockLoader mq *msgstream.MockMsgStream @@ -73,7 +71,6 @@ func (s *StreamingForwardSuite) SetupTest() { s.version = 2000 s.workerManager = 
&cluster.MockManager{} s.manager = segments.NewManager() - s.tsafeManager = tsafe.NewTSafeReplica() s.loader = &segments.MockLoader{} s.loader.EXPECT(). Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). @@ -151,7 +148,7 @@ func (s *StreamingForwardSuite) SetupTest() { chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) - delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index 4cc49950a9..8249a46e59 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package delegator @@ -97,6 +97,10 @@ func (_c *MockShardDelegator_Close_Call) RunAndReturn(run func()) *MockShardDele func (_m *MockShardDelegator) Collection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Collection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -138,6 +142,10 @@ func (_c *MockShardDelegator_Collection_Call) RunAndReturn(run func() int64) *Mo func (_m *MockShardDelegator) GetDeleteBufferSize() (int64, int64) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetDeleteBufferSize") + } + var r0 int64 var r1 int64 if rf, ok := ret.Get(0).(func() (int64, int64)); ok { @@ -189,6 +197,10 @@ func (_c *MockShardDelegator_GetDeleteBufferSize_Call) RunAndReturn(run func() ( func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for GetPartitionStatsVersions") + } + var r0 map[int64]int64 if rf, ok := ret.Get(0).(func(context.Context) map[int64]int64); ok { r0 = rf(ctx) @@ -233,6 +245,10 @@ func (_c *MockShardDelegator_GetPartitionStatsVersions_Call) RunAndReturn(run fu func (_m *MockShardDelegator) GetSegmentInfo(readable bool) ([]SnapshotItem, []SegmentEntry) { ret := _m.Called(readable) + if len(ret) == 0 { + panic("no return value specified for GetSegmentInfo") + } + var r0 []SnapshotItem var r1 []SegmentEntry if rf, ok := ret.Get(0).(func(bool) ([]SnapshotItem, []SegmentEntry)); ok { @@ -289,6 +305,10 @@ func (_c *MockShardDelegator_GetSegmentInfo_Call) RunAndReturn(run func(bool) ([ func (_m *MockShardDelegator) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for GetStatistics") + } + var r0 []*internalpb.GetStatisticsResponse var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetStatisticsRequest) ([]*internalpb.GetStatisticsResponse, error)); ok { @@ -340,10 +360,59 @@ func (_c *MockShardDelegator_GetStatistics_Call) RunAndReturn(run func(context.C return _c } +// GetTSafe 
provides a mock function with given fields: +func (_m *MockShardDelegator) GetTSafe() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetTSafe") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// MockShardDelegator_GetTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTSafe' +type MockShardDelegator_GetTSafe_Call struct { + *mock.Call +} + +// GetTSafe is a helper method to define mock.On call +func (_e *MockShardDelegator_Expecter) GetTSafe() *MockShardDelegator_GetTSafe_Call { + return &MockShardDelegator_GetTSafe_Call{Call: _e.mock.On("GetTSafe")} +} + +func (_c *MockShardDelegator_GetTSafe_Call) Run(run func()) *MockShardDelegator_GetTSafe_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockShardDelegator_GetTSafe_Call) Return(_a0 uint64) *MockShardDelegator_GetTSafe_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockShardDelegator_GetTSafe_Call) RunAndReturn(run func() uint64) *MockShardDelegator_GetTSafe_Call { + _c.Call.Return(run) + return _c +} + // GetTargetVersion provides a mock function with given fields: func (_m *MockShardDelegator) GetTargetVersion() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetTargetVersion") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -385,6 +454,10 @@ func (_c *MockShardDelegator_GetTargetVersion_Call) RunAndReturn(run func() int6 func (_m *MockShardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error { ret := _m.Called(ctx, infos, version) + if len(ret) == 0 { + panic("no return value specified for LoadGrowing") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []*querypb.SegmentLoadInfo, int64) error); ok { r0 = rf(ctx, infos, version) @@ -429,6 +502,10 @@ func (_c *MockShardDelegator_LoadGrowing_Call) RunAndReturn(run func(context.Con func (_m *MockShardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for LoadSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.LoadSegmentsRequest) error); ok { r0 = rf(ctx, req) @@ -539,6 +616,10 @@ func (_c *MockShardDelegator_ProcessInsert_Call) RunAndReturn(run func(map[int64 func (_m *MockShardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for Query") + } + var r0 []*internalpb.RetrieveResults var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error)); ok { @@ -594,6 +675,10 @@ func (_c *MockShardDelegator_Query_Call) RunAndReturn(run func(context.Context, func (_m *MockShardDelegator) QueryStream(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error { ret := _m.Called(ctx, req, srv) + if len(ret) == 0 { + panic("no return value specified for QueryStream") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.QueryRequest, streamrpc.QueryStreamServer) error); ok { r0 = rf(ctx, req, srv) @@ -638,6 +723,10 @@ func (_c *MockShardDelegator_QueryStream_Call) RunAndReturn(run func(context.Con func (_m *MockShardDelegator) 
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error { ret := _m.Called(ctx, req, force) + if len(ret) == 0 { + panic("no return value specified for ReleaseSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.ReleaseSegmentsRequest, bool) error); ok { r0 = rf(ctx, req, force) @@ -682,6 +771,10 @@ func (_c *MockShardDelegator_ReleaseSegments_Call) RunAndReturn(run func(context func (_m *MockShardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for Search") + } + var r0 []*internalpb.SearchResults var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.SearchRequest) ([]*internalpb.SearchResults, error)); ok { @@ -737,6 +830,10 @@ func (_c *MockShardDelegator_Search_Call) RunAndReturn(run func(context.Context, func (_m *MockShardDelegator) Serviceable() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Serviceable") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -958,10 +1055,47 @@ func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) RunAndReturn(run fun return _c } +// UpdateTSafe provides a mock function with given fields: ts +func (_m *MockShardDelegator) UpdateTSafe(ts uint64) { + _m.Called(ts) +} + +// MockShardDelegator_UpdateTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTSafe' +type MockShardDelegator_UpdateTSafe_Call struct { + *mock.Call +} + +// UpdateTSafe is a helper method to define mock.On call +// - ts uint64 +func (_e *MockShardDelegator_Expecter) UpdateTSafe(ts interface{}) *MockShardDelegator_UpdateTSafe_Call { + return &MockShardDelegator_UpdateTSafe_Call{Call: _e.mock.On("UpdateTSafe", ts)} +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) Run(run func(ts uint64)) *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) Return() *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Return() + return _c +} + +func (_c *MockShardDelegator_UpdateTSafe_Call) RunAndReturn(run func(uint64)) *MockShardDelegator_UpdateTSafe_Call { + _c.Call.Return(run) + return _c +} + // VerifyExcludedSegments provides a mock function with given fields: segmentID, ts func (_m *MockShardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool { ret := _m.Called(segmentID, ts) + if len(ret) == 0 { + panic("no return value specified for VerifyExcludedSegments") + } + var r0 bool if rf, ok := ret.Get(0).(func(int64, uint64) bool); ok { r0 = rf(segmentID, ts) @@ -1005,6 +1139,10 @@ func (_c *MockShardDelegator_VerifyExcludedSegments_Call) RunAndReturn(run func( func (_m *MockShardDelegator) Version() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Version") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 151b4e03fb..f4f7d11d33 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -19,6 +19,7 @@ package querynodev2 import ( "context" "fmt" + "math" "github.com/samber/lo" @@ -58,7 +59,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error return nil, err } - minTsafeChannel, minTsafe := 
node.tSafeManager.Min() + minTsafeChannel := "" + minTsafe := uint64(math.MaxUint64) + node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool { + tsafe := delegator.GetTSafe() + if tsafe < minTsafe { + minTsafeChannel = channel + minTsafe = tsafe + } + return true + }) + collections := node.manager.Collection.ListWithName() nodeID := fmt.Sprint(node.GetNodeID()) diff --git a/internal/querynodev2/pipeline/delete_node.go b/internal/querynodev2/pipeline/delete_node.go index 4452da5e64..5607fdd7ec 100644 --- a/internal/querynodev2/pipeline/delete_node.go +++ b/internal/querynodev2/pipeline/delete_node.go @@ -35,9 +35,8 @@ type deleteNode struct { collectionID UniqueID channel string - manager *DataManager - tSafeManager TSafeManager - delegator delegator.ShardDelegator + manager *DataManager + delegator delegator.ShardDelegator } // addDeleteData find the segment of delete column in DeleteMsg and save in deleteData @@ -78,17 +77,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg { } // update tSafe - err := dNode.tSafeManager.Set(dNode.channel, nodeMsg.timeRange.timestampMax) - if err != nil { - // should not happen, QueryNode should addTSafe before start pipeline - panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", dNode.collectionID, err)) - } + dNode.delegator.UpdateTSafe(nodeMsg.timeRange.timestampMax) return nil } func newDeleteNode( collectionID UniqueID, channel string, - manager *DataManager, tSafeManager TSafeManager, delegator delegator.ShardDelegator, + manager *DataManager, delegator delegator.ShardDelegator, maxQueueLength int32, ) *deleteNode { return &deleteNode{ @@ -96,7 +91,6 @@ func newDeleteNode( collectionID: collectionID, channel: channel, manager: manager, - tSafeManager: tSafeManager, delegator: delegator, } } diff --git a/internal/querynodev2/pipeline/delete_node_test.go b/internal/querynodev2/pipeline/delete_node_test.go index 72a8c75813..da3607b848 100644 --- a/internal/querynodev2/pipeline/delete_node_test.go +++ b/internal/querynodev2/pipeline/delete_node_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/samber/lo" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -40,8 +38,6 @@ type DeleteNodeSuite struct { channel string timeRange TimeRange - // dependency - tSafeManager TSafeManager // mocks manager *segments.Manager delegator *delegator.MockShardDelegator @@ -93,18 +89,13 @@ func (suite *DeleteNodeSuite) TestBasic() { } }) // init dependency - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) // build delete node and data - node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.tSafeManager, suite.delegator, 8) + node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8) in := suite.buildDeleteNodeMsg() + suite.delegator.EXPECT().UpdateTSafe(in.timeRange.timestampMax).Return() // run out := node.Operate(in) suite.Nil(out) - // check tsafe - tt, err := suite.tSafeManager.Get(suite.channel) - suite.NoError(err) - suite.Equal(suite.timeRange.timestampMax, tt) } func TestDeleteNode(t *testing.T) { diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 453c963843..9c390b4d2f 100644 --- 
a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -47,9 +47,8 @@ type manager struct { dataManager *DataManager delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] - tSafeManager TSafeManager - dispatcher msgdispatcher.Client - mu sync.RWMutex + dispatcher msgdispatcher.Client + mu sync.RWMutex } func (m *manager) Num() int { @@ -83,7 +82,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { return nil, merr.WrapErrChannelNotFound(channel, "delegator not found") } - newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.tSafeManager, m.dispatcher, delegator) + newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.dispatcher, delegator) if err != nil { return nil, merr.WrapErrServiceUnavailable(err.Error(), "failed to create new pipeline") } @@ -156,7 +155,6 @@ func (m *manager) Close() { } func NewManager(dataManager *DataManager, - tSafeManager TSafeManager, dispatcher msgdispatcher.Client, delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator], ) Manager { @@ -164,7 +162,6 @@ func NewManager(dataManager *DataManager, channel2Pipeline: make(map[string]Pipeline), dataManager: dataManager, delegators: delegators, - tSafeManager: tSafeManager, dispatcher: dispatcher, } } diff --git a/internal/querynodev2/pipeline/manager_test.go b/internal/querynodev2/pipeline/manager_test.go index 5d306864ab..8d23dd9e0d 100644 --- a/internal/querynodev2/pipeline/manager_test.go +++ b/internal/querynodev2/pipeline/manager_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/stretchr/testify/mock" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -40,8 +38,7 @@ type PipelineManagerTestSuite struct { collectionID int64 channel string // dependencies - tSafeManager TSafeManager - delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] + delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator] // mocks segmentManager *segments.MockSegmentManager @@ -59,9 +56,6 @@ func (suite *PipelineManagerTestSuite) SetupSuite() { func (suite *PipelineManagerTestSuite) SetupTest() { paramtable.Init() // init dependency - // init tsafeManager - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) suite.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]() // init mock @@ -88,7 +82,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() { Collection: suite.collectionManager, Segment: suite.segmentManager, } - pipelineManager := NewManager(manager, suite.tSafeManager, suite.msgDispatcher, suite.delegators) + pipelineManager := NewManager(manager, suite.msgDispatcher, suite.delegators) defer pipelineManager.Close() // Add pipeline diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 16b4fb02c3..82702976d8 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -42,7 +42,6 @@ func NewPipeLine( collectionID UniqueID, channel string, manager *DataManager, - tSafeManager TSafeManager, dispatcher 
msgdispatcher.Client, delegator delegator.ShardDelegator, ) (Pipeline, error) { @@ -55,7 +54,7 @@ func NewPipeLine( filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength) insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength) - deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength) + deleteNode := newDeleteNode(collectionID, channel, manager, delegator, pipelineQueueLength) p.Add(filterNode, insertNode, deleteNode) return p, nil } diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 97afa551e8..01773940e8 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -17,7 +17,6 @@ package pipeline import ( - "context" "testing" "github.com/samber/lo" @@ -29,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -46,9 +44,6 @@ type PipelineTestSuite struct { insertSegmentIDs []int64 deletePKs []int64 - // dependencies - tSafeManager TSafeManager - // mocks segmentManager *segments.MockSegmentManager collectionManager *segments.MockCollectionManager @@ -98,11 +93,6 @@ func (suite *PipelineTestSuite) SetupTest() { suite.delegator = delegator.NewMockShardDelegator(suite.T()) // init mq dispatcher suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T()) - - // init dependency - // init tsafeManager - suite.tSafeManager = tsafe.NewTSafeReplica() - suite.tSafeManager.Add(context.Background(), suite.channel, 0) } func (suite *PipelineTestSuite) TestBasic() { @@ -138,12 +128,13 @@ func (suite *PipelineTestSuite) TestBasic() { } } }) + // build pipleine manager := &segments.Manager{ Collection: suite.collectionManager, Segment: suite.segmentManager, } - pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.tSafeManager, suite.msgDispatcher, suite.delegator) + pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.msgDispatcher, suite.delegator) suite.NoError(err) // Init Consumer @@ -154,20 +145,10 @@ func (suite *PipelineTestSuite) TestBasic() { suite.NoError(err) defer pipeline.Close() - // watch tsafe manager - listener := suite.tSafeManager.WatchChannel(suite.channel) - // build input msg in := suite.buildMsgPack(schema) + suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Return() suite.msgChan <- in - - // wait pipeline work - <-listener.On() - - // check tsafe - tsafe, err := suite.tSafeManager.Get(suite.channel) - suite.NoError(err) - suite.Equal(in.EndTs, tsafe) } func TestQueryNodePipeline(t *testing.T) { diff --git a/internal/querynodev2/pipeline/type.go b/internal/querynodev2/pipeline/type.go index ec18bb5191..41de59465e 100644 --- a/internal/querynodev2/pipeline/type.go +++ b/internal/querynodev2/pipeline/type.go @@ -22,7 +22,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" base "github.com/milvus-io/milvus/internal/util/pipeline" 
"github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -51,8 +50,6 @@ type ( DataManager = segments.Manager Segment = segments.Segment - TSafeManager = tsafe.Manager - BaseNode = base.BaseNode Msg = base.Msg ) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 8a1f1eedcb..b6c806950c 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -51,7 +51,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/pipeline" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" - "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/registry" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" @@ -97,7 +96,6 @@ type QueryNode struct { // internal components manager *segments.Manager clusterManager cluster.Manager - tSafeManager tsafe.Manager pipelineManager pipeline.Manager subscribingChannels *typeutil.ConcurrentSet[string] unsubscribingChannels *typeutil.ConcurrentSet[string] @@ -145,7 +143,6 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode { lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal), } - node.tSafeManager = tsafe.NewTSafeReplica() expr.Register("querynode", node) return node } @@ -352,7 +349,7 @@ func (node *QueryNode) Init() error { node.manager.SetLoader(node.loader) node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID()) // init pipeline manager - node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators) + node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators) err = node.InitSegcore() if err != nil { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index cab6f17dde..f53ac5faf8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -253,7 +253,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm req.GetVersion(), node.clusterManager, node.manager, - node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp(), @@ -271,14 +270,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } }() - // create tSafe - node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp()) - defer func() { - if err != nil { - node.tSafeManager.Remove(ctx, channel.ChannelName) - } - }() - pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName()) if err != nil { msg := "failed to create pipeline" @@ -368,7 +359,6 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) _, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) - node.tSafeManager.Remove(ctx, req.GetChannelName()) node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed)) } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 1ab3663b11..c3d44aadce 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1755,11 +1755,13 @@ func (suite *ServiceSuite) TestGetMetric_Normal() { sd1 := 
delegator.NewMockShardDelegator(suite.T()) sd1.EXPECT().Collection().Return(100) sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000) + sd1.EXPECT().GetTSafe().Return(100) sd1.EXPECT().Close().Maybe() suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1) sd2 := delegator.NewMockShardDelegator(suite.T()) sd2.EXPECT().Collection().Return(100) + sd2.EXPECT().GetTSafe().Return(200) sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000) sd2.EXPECT().Close().Maybe() suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2) diff --git a/internal/querynodev2/tsafe/OWNERS b/internal/querynodev2/tsafe/OWNERS deleted file mode 100644 index b4179850dd..0000000000 --- a/internal/querynodev2/tsafe/OWNERS +++ /dev/null @@ -1,9 +0,0 @@ -reviewers: - - aoiasd - - bigsheeper - - congqixia - - yah01 - -approvers: - - maintainers - diff --git a/internal/querynodev2/tsafe/listener.go b/internal/querynodev2/tsafe/listener.go deleted file mode 100644 index b0abd0a5df..0000000000 --- a/internal/querynodev2/tsafe/listener.go +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -type Listener interface { - On() <-chan struct{} - Close() -} - -type listener struct { - tsafe *tSafeManager - channel string - ch chan struct{} -} - -func (l *listener) On() <-chan struct{} { - return l.ch -} - -func (l *listener) Close() { - l.tsafe.mu.Lock() - defer l.tsafe.mu.Unlock() - l.close() -} - -// close remove the listener from the tSafeReplica without lock -func (l *listener) close() { - for i, listen := range l.tsafe.listeners[l.channel] { - if l == listen { - close(l.ch) - l.tsafe.listeners[l.channel] = append(l.tsafe.listeners[l.channel][:i], l.tsafe.listeners[l.channel][i+1:]...) - break - } - } -} - -func (l *listener) nonBlockingNotify() { - select { - case l.ch <- struct{}{}: - default: - } -} - -func newListener(tsafe *tSafeManager, channel string) *listener { - return &listener{ - tsafe: tsafe, - channel: channel, - ch: make(chan struct{}, 1), - } -} diff --git a/internal/querynodev2/tsafe/manager.go b/internal/querynodev2/tsafe/manager.go deleted file mode 100644 index 6e56e8ae2b..0000000000 --- a/internal/querynodev2/tsafe/manager.go +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "context" - "fmt" - "sync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/tsoutil" - . "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -// Manager is the interface for tsafe manager. -type Manager interface { - Get(vChannel string) (Timestamp, error) - Set(vChannel string, timestamp Timestamp) error - Add(ctx context.Context, vChannel string, timestamp Timestamp) - Remove(ctx context.Context, vChannel string) - Watch() Listener - WatchChannel(channel string) Listener - - Min() (string, Timestamp) -} - -// tSafeManager implements `Manager` interface. -type tSafeManager struct { - mu sync.Mutex // guards tSafes - tSafes map[string]*tSafe // map[DMLChannel]*tSafe - listeners map[string][]*listener // map[DMLChannel][]*listener, key "" means all channels. -} - -func (t *tSafeManager) Watch() Listener { - return t.WatchChannel("") -} - -func (t *tSafeManager) WatchChannel(channel string) Listener { - t.mu.Lock() - defer t.mu.Unlock() - l := newListener(t, channel) - t.listeners[channel] = append(t.listeners[channel], l) - return l -} - -func (t *tSafeManager) Add(ctx context.Context, vChannel string, timestamp uint64) { - ts, _ := tsoutil.ParseTS(timestamp) - t.mu.Lock() - defer t.mu.Unlock() - if _, ok := t.tSafes[vChannel]; !ok { - t.tSafes[vChannel] = newTSafe(vChannel, timestamp) - } - log.Ctx(ctx).Info("add tSafe done", - zap.String("channel", vChannel), zap.Time("timestamp", ts)) -} - -func (t *tSafeManager) Get(vChannel string) (Timestamp, error) { - t.mu.Lock() - defer t.mu.Unlock() - ts, err := t.get(vChannel) - if err != nil { - return 0, err - } - return ts.get(), nil -} - -func (t *tSafeManager) Set(vChannel string, timestamp Timestamp) error { - t.mu.Lock() - defer t.mu.Unlock() - ts, err := t.get(vChannel) - if err != nil { - return fmt.Errorf("set tSafe failed, err = %w", err) - } - ts.set(timestamp) - t.notifyAll(vChannel) - return nil -} - -func (t *tSafeManager) Remove(ctx context.Context, vChannel string) { - t.mu.Lock() - defer t.mu.Unlock() - tsafe, ok := t.tSafes[vChannel] - if ok { - tsafe.close() - } - for _, l := range t.listeners[vChannel] { - l.close() - } - delete(t.tSafes, vChannel) - delete(t.listeners, vChannel) - log.Ctx(ctx).Info("remove tSafe replica", - zap.String("vChannel", vChannel)) -} - -func (t *tSafeManager) Min() (string, Timestamp) { - t.mu.Lock() - defer t.mu.Unlock() - var minChannel string - minTt := MaxTimestamp - for channel, tsafe := range t.tSafes { - t := tsafe.get() - if t < minTt && t != 0 { - minChannel = channel - minTt = t - } - } - return minChannel, minTt -} - -func (t *tSafeManager) get(vChannel string) (*tSafe, error) { - if _, ok := t.tSafes[vChannel]; !ok { - return nil, fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel) - } - return t.tSafes[vChannel], nil -} - -// since notifyAll called by setTSafe, no need to lock -func (t *tSafeManager) notifyAll(channel string) { - for _, l := range t.listeners[""] { - l.nonBlockingNotify() - } - for _, l := range t.listeners[channel] { - l.nonBlockingNotify() - } -} - -func 
NewTSafeReplica() Manager { - replica := &tSafeManager{ - tSafes: make(map[string]*tSafe), - listeners: make(map[string][]*listener), - } - return replica -} diff --git a/internal/querynodev2/tsafe/tsafe.go b/internal/querynodev2/tsafe/tsafe.go deleted file mode 100644 index 9ec40f1ed8..0000000000 --- a/internal/querynodev2/tsafe/tsafe.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "go.uber.org/atomic" - - . "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type tSafe struct { - channel string - tSafe atomic.Uint64 - closed atomic.Bool -} - -func (ts *tSafe) valid() bool { - return !ts.closed.Load() -} - -func (ts *tSafe) close() { - ts.closed.Store(true) -} - -func (ts *tSafe) get() Timestamp { - return ts.tSafe.Load() -} - -func (ts *tSafe) set(t Timestamp) { - ts.tSafe.Store(t) -} - -func newTSafe(channel string, timestamp uint64) *tSafe { - ts := &tSafe{ - channel: channel, - } - ts.tSafe.Store(timestamp) - ts.closed.Store(false) - - return ts -} diff --git a/internal/querynodev2/tsafe/tsafe_test.go b/internal/querynodev2/tsafe/tsafe_test.go deleted file mode 100644 index 3fd94065a6..0000000000 --- a/internal/querynodev2/tsafe/tsafe_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsafe - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - . 
"github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type TSafeTestSuite struct { - suite.Suite - tSafeReplica Manager - channel string - time Timestamp -} - -func (suite *TSafeTestSuite) SetupSuite() { - suite.channel = "test-channel" - suite.time = uint64(time.Now().Unix()) -} - -func (suite *TSafeTestSuite) SetupTest() { - suite.tSafeReplica = NewTSafeReplica() -} - -// test Basic use of TSafeReplica -func (suite *TSafeTestSuite) TestBasic() { - suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp) - t, err := suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(ZeroTimestamp, t) - - // Add listener - globalWatcher := suite.tSafeReplica.WatchChannel(suite.channel) - channelWatcher := suite.tSafeReplica.Watch() - defer globalWatcher.Close() - defer channelWatcher.Close() - - // Test Set tSafe - suite.tSafeReplica.Set(suite.channel, suite.time) - t, err = suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(suite.time, t) - - // Test listener - select { - case <-globalWatcher.On(): - default: - suite.Fail("global watcher should be triggered") - } - - select { - case <-channelWatcher.On(): - default: - suite.Fail("channel watcher should be triggered") - } -} - -func (suite *TSafeTestSuite) TestRemoveAndInvalid() { - suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp) - t, err := suite.tSafeReplica.Get(suite.channel) - suite.NoError(err) - suite.Equal(ZeroTimestamp, t) - - suite.tSafeReplica.Remove(context.Background(), suite.channel) - _, err = suite.tSafeReplica.Get(suite.channel) - suite.Error(err) - - err = suite.tSafeReplica.Set(suite.channel, suite.time) - suite.Error(err) -} - -func TestTSafe(t *testing.T) { - suite.Run(t, new(TSafeTestSuite)) -}