diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 6243761ce7..ccaed65ea9 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -131,16 +132,19 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode oNodes := m.store.GetNodes() m.mu.Unlock() - // Add new online nodes to the cluster. offLines, newOnLines := lo.Difference(oNodes, allNodes) - lo.ForEach(newOnLines, func(nodeID int64, _ int) { - m.AddNode(nodeID) - }) - // Delete offlines from the cluster - lo.ForEach(offLines, func(nodeID int64, _ int) { - m.DeleteNode(nodeID) - }) + for _, nodeID := range offLines { + if err := m.DeleteNode(nodeID); err != nil { + return err + } + } + // Add new online nodes to the cluster. + for _, nodeID := range newOnLines { + if err := m.AddNode(nodeID); err != nil { + return err + } + } m.mu.Lock() nodeChannels := m.store.GetNodeChannelsBy( @@ -654,7 +658,10 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da ) resp, err := m.subCluster.CheckChannelOperationProgress(ctx, nodeID, info) if err != nil { - log.Warn("Fail to check channel operation progress") + log.Warn("Fail to check channel operation progress", zap.Error(err)) + if errors.Is(err, merr.ErrNodeNotFound) { + return false, true + } return false, false } log.Info("Got channel operation progress", diff --git a/internal/datacoord/channel_manager_v2_test.go b/internal/datacoord/channel_manager_v2_test.go index 4bacd11399..b2093b9881 100644 --- a/internal/datacoord/channel_manager_v2_test.go +++ b/internal/datacoord/channel_manager_v2_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -446,6 +448,29 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() { s.checkAssignment(m, 1, "ch1", Watching) s.checkAssignment(m, 1, "ch2", Watching) }) + s.Run("advance watching channels check ErrNodeNotFound", func() { + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) + s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) + s.Require().NoError(err) + s.checkAssignment(m, 1, "ch1", ToWatch) + s.checkAssignment(m, 1, "ch2", ToWatch) + + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Watching) + s.checkAssignment(m, 1, "ch2", Watching) + + s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything). + Return(nil, merr.WrapErrNodeNotFound(1)).Twice() + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Standby) + s.checkAssignment(m, 1, "ch2", Standby) + }) + s.Run("advance watching channels check watch success", func() { chNodes := map[string]int64{ "ch1": 1, @@ -517,6 +542,28 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() { s.checkAssignment(m, 1, "ch1", Releasing) s.checkAssignment(m, 1, "ch2", Releasing) }) + s.Run("advance releasing channels check ErrNodeNotFound", func() { + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease) + s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) + s.Require().NoError(err) + s.checkAssignment(m, 1, "ch1", ToRelease) + s.checkAssignment(m, 1, "ch2", ToRelease) + + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Releasing) + s.checkAssignment(m, 1, "ch2", Releasing) + + s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything). + Return(nil, merr.WrapErrNodeNotFound(1)).Twice() + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Standby) + s.checkAssignment(m, 1, "ch2", Standby) + }) s.Run("advance releasing channels check release success", func() { chNodes := map[string]int64{ "ch1": 1, @@ -659,5 +706,26 @@ func (s *ChannelManagerSuite) TestStartup() { s.checkAssignment(m, 2, "ch3", ToWatch) } +func (s *ChannelManagerSuite) TestStartupRootCoordFailed() { + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + "ch3": 1, + "ch4": bufferID, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) + + s.mockAlloc = NewNMockAllocator(s.T()) + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure")) + m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) + s.Require().NoError(err) + + err = m.Startup(context.TODO(), nil, []int64{2}) + s.Error(err) + + err = m.Startup(context.TODO(), nil, []int64{1, 2}) + s.Error(err) +} + func (s *ChannelManagerSuite) TestCheckLoop() {} func (s *ChannelManagerSuite) TestGet() {}