From d51a8088514230b18cd5d94cbe70ef54135feec6 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 17 Oct 2024 12:15:25 +0800 Subject: [PATCH] fix: Rootcoord stuck at graceful stop progress (#36880) issue: #34553 When rootcoord triggers its graceful stop process, it blocks until all in-flight RPCs have finished. For a create-collection request, rootcoord must block until datacoord has finished watching all channels, but datacoord needs to call `rootcoord.Alloc` while watching a channel, and rootcoord no longer responds to new requests at that point. As a result, the create-collection request gets stuck, and so does the graceful stop process. This PR removes the `rootcoord.Alloc` call to resolve this logical deadlock during the graceful stop process. Signed-off-by: Wei Liu --- Makefile | 2 +- internal/allocator/global_id_allocator.go | 9 + .../allocator/mock_global_id_allocator.go | 183 ++++++++++++++++++ internal/datacoord/channel_manager.go | 8 +- internal/datacoord/channel_manager_test.go | 14 +- internal/datacoord/server.go | 29 ++- internal/datacoord/server_test.go | 6 +- 7 files changed, 228 insertions(+), 23 deletions(-) create mode 100644 internal/allocator/mock_global_id_allocator.go diff --git a/Makefile b/Makefile index cb5a7001f3..21887856ba 100644 --- a/Makefile +++ b/Makefile @@ -534,7 +534,7 @@ generate-mockery-utils: getdeps $(INSTALL_PATH)/mockery --name=ProxyWatcherInterface --dir=$(PWD)/internal/util/proxyutil --output=$(PWD)/internal/util/proxyutil --filename=mock_proxy_watcher.go --with-expecter --structname=MockProxyWatcher --inpackage # function $(INSTALL_PATH)/mockery --name=FunctionRunner --dir=$(PWD)/internal/util/function --output=$(PWD)/internal/util/function --filename=mock_function.go --with-expecter --structname=MockFunctionRunner --inpackage - + $(INSTALL_PATH)/mockery --name=GlobalIDAllocatorInterface --dir=internal/allocator --output=internal/allocator --filename=mock_global_id_allocator.go --with-expecter --structname=MockGlobalIDAllocator --inpackage generate-mockery-kv: getdeps 
$(INSTALL_PATH)/mockery --name=TxnKV --dir=$(PWD)/pkg/kv --output=$(PWD)/internal/kv/mocks --filename=txn_kv.go --with-expecter diff --git a/internal/allocator/global_id_allocator.go b/internal/allocator/global_id_allocator.go index 12d72cc437..cd95412d5d 100644 --- a/internal/allocator/global_id_allocator.go +++ b/internal/allocator/global_id_allocator.go @@ -22,6 +22,15 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +type GlobalIDAllocatorInterface interface { + // Initialize will initialize the created global TSO allocator. + Initialize() error + // Alloc allocates the id of the count number. + Alloc(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) + // AllocOne allocates one id. + AllocOne() (typeutil.UniqueID, error) +} + // GlobalIDAllocator is the global single point TSO allocator. type GlobalIDAllocator struct { allocator tso.Allocator diff --git a/internal/allocator/mock_global_id_allocator.go b/internal/allocator/mock_global_id_allocator.go new file mode 100644 index 0000000000..b1f0b4d246 --- /dev/null +++ b/internal/allocator/mock_global_id_allocator.go @@ -0,0 +1,183 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. 
+ +package allocator + +import mock "github.com/stretchr/testify/mock" + +// MockGlobalIDAllocator is an autogenerated mock type for the GlobalIDAllocatorInterface type +type MockGlobalIDAllocator struct { + mock.Mock +} + +type MockGlobalIDAllocator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockGlobalIDAllocator) EXPECT() *MockGlobalIDAllocator_Expecter { + return &MockGlobalIDAllocator_Expecter{mock: &_m.Mock} +} + +// Alloc provides a mock function with given fields: count +func (_m *MockGlobalIDAllocator) Alloc(count uint32) (int64, int64, error) { + ret := _m.Called(count) + + var r0 int64 + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(uint32) (int64, int64, error)); ok { + return rf(count) + } + if rf, ok := ret.Get(0).(func(uint32) int64); ok { + r0 = rf(count) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(uint32) int64); ok { + r1 = rf(count) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(uint32) error); ok { + r2 = rf(count) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockGlobalIDAllocator_Alloc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Alloc' +type MockGlobalIDAllocator_Alloc_Call struct { + *mock.Call +} + +// Alloc is a helper method to define mock.On call +// - count uint32 +func (_e *MockGlobalIDAllocator_Expecter) Alloc(count interface{}) *MockGlobalIDAllocator_Alloc_Call { + return &MockGlobalIDAllocator_Alloc_Call{Call: _e.mock.On("Alloc", count)} +} + +func (_c *MockGlobalIDAllocator_Alloc_Call) Run(run func(count uint32)) *MockGlobalIDAllocator_Alloc_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint32)) + }) + return _c +} + +func (_c *MockGlobalIDAllocator_Alloc_Call) Return(_a0 int64, _a1 int64, _a2 error) *MockGlobalIDAllocator_Alloc_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockGlobalIDAllocator_Alloc_Call) RunAndReturn(run func(uint32) (int64, 
int64, error)) *MockGlobalIDAllocator_Alloc_Call { + _c.Call.Return(run) + return _c +} + +// AllocOne provides a mock function with given fields: +func (_m *MockGlobalIDAllocator) AllocOne() (int64, error) { + ret := _m.Called() + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func() (int64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockGlobalIDAllocator_AllocOne_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocOne' +type MockGlobalIDAllocator_AllocOne_Call struct { + *mock.Call +} + +// AllocOne is a helper method to define mock.On call +func (_e *MockGlobalIDAllocator_Expecter) AllocOne() *MockGlobalIDAllocator_AllocOne_Call { + return &MockGlobalIDAllocator_AllocOne_Call{Call: _e.mock.On("AllocOne")} +} + +func (_c *MockGlobalIDAllocator_AllocOne_Call) Run(run func()) *MockGlobalIDAllocator_AllocOne_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockGlobalIDAllocator_AllocOne_Call) Return(_a0 int64, _a1 error) *MockGlobalIDAllocator_AllocOne_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockGlobalIDAllocator_AllocOne_Call) RunAndReturn(run func() (int64, error)) *MockGlobalIDAllocator_AllocOne_Call { + _c.Call.Return(run) + return _c +} + +// Initialize provides a mock function with given fields: +func (_m *MockGlobalIDAllocator) Initialize() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockGlobalIDAllocator_Initialize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Initialize' +type MockGlobalIDAllocator_Initialize_Call struct { + *mock.Call +} + +// Initialize is a helper 
method to define mock.On call +func (_e *MockGlobalIDAllocator_Expecter) Initialize() *MockGlobalIDAllocator_Initialize_Call { + return &MockGlobalIDAllocator_Initialize_Call{Call: _e.mock.On("Initialize")} +} + +func (_c *MockGlobalIDAllocator_Initialize_Call) Run(run func()) *MockGlobalIDAllocator_Initialize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockGlobalIDAllocator_Initialize_Call) Return(_a0 error) *MockGlobalIDAllocator_Initialize_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockGlobalIDAllocator_Initialize_Call) RunAndReturn(run func() error) *MockGlobalIDAllocator_Initialize_Call { + _c.Call.Return(run) + return _c +} + +// NewMockGlobalIDAllocator creates a new instance of MockGlobalIDAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockGlobalIDAllocator(t interface { + mock.TestingT + Cleanup(func()) +}) *MockGlobalIDAllocator { + mock := &MockGlobalIDAllocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 76774ead73..01a9a01625 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -26,7 +26,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/datacoord/allocator" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/kv" @@ -69,7 +69,7 @@ type ChannelManagerImpl struct { h Handler store RWChannelStore subCluster SubCluster // sessionManager - allocator allocator.Allocator + allocator allocator.GlobalIDAllocatorInterface factory ChannelPolicyFactory balancePolicy BalanceChannelPolicy @@ -100,7 
+100,7 @@ func NewChannelManager( kv kv.TxnKV, h Handler, subCluster SubCluster, // sessionManager - alloc allocator.Allocator, + alloc allocator.GlobalIDAllocatorInterface, options ...ChannelmanagerOpt, ) (*ChannelManagerImpl, error) { m := &ChannelManagerImpl{ @@ -702,7 +702,7 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error { startTs := time.Now().Unix() for _, ch := range op.Channels { vcInfo := m.h.GetDataVChanPositions(ch, allPartitionID) - opID, err := m.allocator.AllocID(context.Background()) + opID, err := m.allocator.AllocOne() if err != nil { return err } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 6ed8aaf258..919fbd1831 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -18,10 +18,10 @@ package datacoord import ( "context" + "errors" "fmt" "testing" - "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -29,7 +29,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datacoord/allocator" + globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" kvmock "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/kv/predicates" @@ -47,7 +47,7 @@ type ChannelManagerSuite struct { mockKv *kvmock.MetaKv mockCluster *MockSubCluster - mockAlloc *allocator.MockAllocator + mockAlloc *globalIDAllocator.MockGlobalIDAllocator mockHandler *NMockHandler } @@ -96,7 +96,6 @@ func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImpl, nodeID in func (s *ChannelManagerSuite) SetupTest() { s.mockKv = kvmock.NewMetaKv(s.T()) s.mockCluster = NewMockSubCluster(s.T()) - s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockHandler = NewNMockHandler(s.T()) 
s.mockHandler.EXPECT().GetDataVChanPositions(mock.Anything, mock.Anything). RunAndReturn(func(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo { @@ -105,12 +104,13 @@ func (s *ChannelManagerSuite) SetupTest() { ChannelName: ch.GetName(), } }).Maybe() - s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).RunAndReturn( func(save map[string]string, removals []string, preds ...predicates.Predicate) error { log.Info("test save and remove", zap.Any("save", save), zap.Any("removals", removals)) return nil }).Maybe() + s.mockAlloc = globalIDAllocator.NewMockGlobalIDAllocator(s.T()) + s.mockAlloc.EXPECT().AllocOne().Return(1, nil).Maybe() } func (s *ChannelManagerSuite) TearDownTest() {} @@ -791,8 +791,8 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() { } s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) - s.mockAlloc = allocator.NewMockAllocator(s.T()) - s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(0, errors.New("mock rootcoord failure")) + s.mockAlloc = globalIDAllocator.NewMockGlobalIDAllocator(s.T()) + s.mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock error")).Maybe() m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) s.Require().NoError(err) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ae14db35d9..2cdf364cb5 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/datacoord/session" @@ -102,14 +103,16 @@ type Server struct { quitCh chan struct{} stateCode atomic.Value - etcdCli *clientv3.Client - tikvCli *txnkv.Client - address string - 
watchClient kv.WatchKV - kv kv.MetaKv - meta *meta - segmentManager Manager - allocator allocator.Allocator + etcdCli *clientv3.Client + tikvCli *txnkv.Client + address string + watchClient kv.WatchKV + kv kv.MetaKv + meta *meta + segmentManager Manager + allocator allocator.Allocator + // self host id allocator, to avoid get unique id from rootcoord + idAllocator *globalIDAllocator.GlobalIDAllocator cluster Cluster sessionManager session.DataNodeManager channelManager ChannelManager @@ -346,6 +349,14 @@ func (s *Server) initDataCoord() error { return err } + // init id allocator after init meta + s.idAllocator = globalIDAllocator.NewGlobalIDAllocator("idTimestamp", s.kv) + err = s.idAllocator.Initialize() + if err != nil { + log.Error("data coordinator id allocator initialize failed", zap.Error(err)) + return err + } + // Initialize streaming coordinator. if streamingutil.IsStreamingServiceEnabled() { s.streamingCoord = streamingcoord.NewServerBuilder(). @@ -489,7 +500,7 @@ func (s *Server) initCluster() error { s.sessionManager = session.NewDataNodeManagerImpl(session.WithDataNodeCreator(s.dataNodeCreator)) var err error - s.channelManager, err = NewChannelManager(s.watchClient, s.handler, s.sessionManager, s.allocator, withCheckerV2()) + s.channelManager, err = NewChannelManager(s.watchClient, s.handler, s.sessionManager, s.idAllocator, withCheckerV2()) if err != nil { return err } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 00f10e9480..6a9ab3edd4 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" 
"github.com/milvus-io/milvus/internal/datacoord/session" @@ -1824,7 +1825,8 @@ func TestOptions(t *testing.T) { defer kv.RemoveWithPrefix("") sessionManager := session.NewDataNodeManagerImpl() - channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, allocator.NewMockAllocator(t)) + mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(t) + channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, mockAlloc) assert.NoError(t, err) cluster := NewClusterImpl(sessionManager, channelManager) @@ -1863,9 +1865,9 @@ func TestHandleSessionEvent(t *testing.T) { }() ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - alloc := allocator.NewMockAllocator(t) sessionManager := session.NewDataNodeManagerImpl() + alloc := globalIDAllocator.NewMockGlobalIDAllocator(t) channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, alloc) assert.NoError(t, err)