diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 7141658ef5..dd5d4d36cb 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -84,6 +84,10 @@ type ChannelManagerImpl struct {
 	legacyNodes typeutil.UniqueSet
 
 	lastActiveTimestamp time.Time
+
+	// Idempotency and restart support
+	startupMu sync.Mutex // Protects Startup/Close operations
+	started   bool
 }
 
 // ChannelBGChecker are goroutining running background
@@ -130,7 +134,19 @@ func NewChannelManager(
 }
 
 func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
-	ctx, m.cancel = context.WithCancel(ctx)
+	m.startupMu.Lock()
+	defer m.startupMu.Unlock()
+
+	if m.started {
+		// Already started; close the previous run before restarting.
+		m.doClose()
+	}
+
+	return m.doStartup(ctx, legacyNodes, allNodes)
+}
+
+func (m *ChannelManagerImpl) doStartup(ctx context.Context, legacyNodes, allNodes []int64) error {
+	ctx, cancel := context.WithCancel(ctx)
 
 	m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...)
 
@@ -165,6 +181,10 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
 	}
 	m.mu.Unlock()
 
+	// All operations succeeded; only now record the started state.
+	m.cancel = cancel
+	m.started = true
+
 	if m.balanceCheckLoop != nil {
 		log.Ctx(ctx).Info("starting channel balance loop")
 		m.wg.Add(1)
@@ -184,10 +204,24 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
 }
 
 func (m *ChannelManagerImpl) Close() {
+	m.startupMu.Lock()
+	defer m.startupMu.Unlock()
+	m.doClose()
+}
+
+// doClose is the internal implementation of Close without acquiring startupMu.
+// It should only be called when startupMu is already held.
+func (m *ChannelManagerImpl) doClose() {
+	if !m.started {
+		return
+	}
+
 	if m.cancel != nil {
 		m.cancel()
 		m.wg.Wait()
 	}
+
+	m.started = false
 }
 
 func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index 5bebdfa461..461e86ead5 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -904,3 +904,139 @@ func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
 	infos = cm.GetChannelWatchInfos()
 	s.Equal(0, len(infos))
 }
+
+func (s *ChannelManagerSuite) TestStartupIdempotency() {
+	s.Run("repeated Startup calls should be idempotent", func() {
+		chNodes := map[string]int64{
+			"ch1": 1,
+			"ch2": 1,
+		}
+		s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
+		s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()
+
+		m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
+		s.Require().NoError(err)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		var (
+			legacyNodes = []int64{1}
+			allNodes    = []int64{1, 2}
+		)
+
+		// First Startup
+		err = m.Startup(ctx, legacyNodes, allNodes)
+		s.NoError(err)
+		s.True(m.started)
+		s.checkAssignment(m, 1, "ch1", Legacy)
+		s.checkAssignment(m, 1, "ch2", Legacy)
+
+		// Second Startup - should close and restart
+		err = m.Startup(ctx, legacyNodes, allNodes)
+		s.NoError(err)
+		s.True(m.started)
+		s.checkAssignment(m, 1, "ch1", Legacy)
+		s.checkAssignment(m, 1, "ch2", Legacy)
+
+		// Third Startup - should still work
+		err = m.Startup(ctx, legacyNodes, allNodes)
+		s.NoError(err)
+		s.True(m.started)
+	})
+}
+
+func (s *ChannelManagerSuite) TestStartupAfterClose() {
s.Run("Startup after Close should restart successfully", func() { + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess) + s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe() + + m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2()) + s.Require().NoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + legacyNodes = []int64{1} + allNodes = []int64{1} + ) + + // First Startup + err = m.Startup(ctx, legacyNodes, allNodes) + s.NoError(err) + s.True(m.started) + s.checkAssignment(m, 1, "ch1", Legacy) + s.checkAssignment(m, 1, "ch2", Legacy) + + // Close + m.Close() + s.False(m.started) + + // Startup again after Close + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + + err = m.Startup(ctx2, legacyNodes, allNodes) + s.NoError(err) + s.True(m.started) + s.checkAssignment(m, 1, "ch1", Legacy) + s.checkAssignment(m, 1, "ch2", Legacy) + + // Close again + m.Close() + s.False(m.started) + }) +} + +func (s *ChannelManagerSuite) TestCloseIdempotency() { + s.Run("multiple Close calls should be idempotent", func() { + chNodes := map[string]int64{ + "ch1": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess) + s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe() + + m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2()) + s.Require().NoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Startup first + err = m.Startup(ctx, []int64{1}, []int64{1}) + s.NoError(err) + s.True(m.started) + + // First Close + m.Close() + s.False(m.started) + + // Second Close - should be safe + m.Close() + s.False(m.started) + + // Third Close - should still be safe + m.Close() + s.False(m.started) + }) + + s.Run("Close without Startup should be safe", func() { + s.prepareMeta(nil, 0) + m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) + s.Require().NoError(err) + + s.False(m.started) + + // Close without Startup should not panic + s.NotPanics(func() { + m.Close() + }) + + s.False(m.started) + }) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 1093525b42..3fbb6b9cce 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -649,11 +649,6 @@ func (s *Server) rewatchDataNodes(sessions map[string]*sessionutil.Session) erro datanodes = append(datanodes, info) } - // if err := s.nodeManager.Startup(s.ctx, datanodes); err != nil { - // log.Warn("DataCoord failed to add datanode", zap.Error(err)) - // return err - // } - log.Info("DataCoord Cluster Manager start up") if err := s.cluster.Startup(s.ctx, datanodes); err != nil { log.Warn("DataCoord Cluster Manager failed to start up", zap.Error(err))