mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Return error when startup Delete/AddNode fail (#33193)
See also: #33151, #33149 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
f20becb725
commit
819a624753
@ -31,6 +31,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -131,16 +132,19 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode
|
||||
oNodes := m.store.GetNodes()
|
||||
m.mu.Unlock()
|
||||
|
||||
// Add new online nodes to the cluster.
|
||||
offLines, newOnLines := lo.Difference(oNodes, allNodes)
|
||||
lo.ForEach(newOnLines, func(nodeID int64, _ int) {
|
||||
m.AddNode(nodeID)
|
||||
})
|
||||
|
||||
// Delete offlines from the cluster
|
||||
lo.ForEach(offLines, func(nodeID int64, _ int) {
|
||||
m.DeleteNode(nodeID)
|
||||
})
|
||||
for _, nodeID := range offLines {
|
||||
if err := m.DeleteNode(nodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Add new online nodes to the cluster.
|
||||
for _, nodeID := range newOnLines {
|
||||
if err := m.AddNode(nodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
nodeChannels := m.store.GetNodeChannelsBy(
|
||||
@ -654,7 +658,10 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da
|
||||
)
|
||||
resp, err := m.subCluster.CheckChannelOperationProgress(ctx, nodeID, info)
|
||||
if err != nil {
|
||||
log.Warn("Fail to check channel operation progress")
|
||||
log.Warn("Fail to check channel operation progress", zap.Error(err))
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
return false, true
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
log.Info("Got channel operation progress",
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/mock"
|
||||
@ -31,6 +32,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/kv/predicates"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
@ -446,6 +448,29 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
|
||||
s.checkAssignment(m, 1, "ch1", Watching)
|
||||
s.checkAssignment(m, 1, "ch2", Watching)
|
||||
})
|
||||
s.Run("advance watching channels check ErrNodeNotFound", func() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
"ch2": 1,
|
||||
}
|
||||
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
|
||||
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
|
||||
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
|
||||
s.Require().NoError(err)
|
||||
s.checkAssignment(m, 1, "ch1", ToWatch)
|
||||
s.checkAssignment(m, 1, "ch2", ToWatch)
|
||||
|
||||
m.AdvanceChannelState(ctx)
|
||||
s.checkAssignment(m, 1, "ch1", Watching)
|
||||
s.checkAssignment(m, 1, "ch2", Watching)
|
||||
|
||||
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(nil, merr.WrapErrNodeNotFound(1)).Twice()
|
||||
m.AdvanceChannelState(ctx)
|
||||
s.checkAssignment(m, 1, "ch1", Standby)
|
||||
s.checkAssignment(m, 1, "ch2", Standby)
|
||||
})
|
||||
|
||||
s.Run("advance watching channels check watch success", func() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
@ -517,6 +542,28 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
|
||||
s.checkAssignment(m, 1, "ch1", Releasing)
|
||||
s.checkAssignment(m, 1, "ch2", Releasing)
|
||||
})
|
||||
s.Run("advance releasing channels check ErrNodeNotFound", func() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
"ch2": 1,
|
||||
}
|
||||
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
|
||||
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
|
||||
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
|
||||
s.Require().NoError(err)
|
||||
s.checkAssignment(m, 1, "ch1", ToRelease)
|
||||
s.checkAssignment(m, 1, "ch2", ToRelease)
|
||||
|
||||
m.AdvanceChannelState(ctx)
|
||||
s.checkAssignment(m, 1, "ch1", Releasing)
|
||||
s.checkAssignment(m, 1, "ch2", Releasing)
|
||||
|
||||
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(nil, merr.WrapErrNodeNotFound(1)).Twice()
|
||||
m.AdvanceChannelState(ctx)
|
||||
s.checkAssignment(m, 1, "ch1", Standby)
|
||||
s.checkAssignment(m, 1, "ch2", Standby)
|
||||
})
|
||||
s.Run("advance releasing channels check release success", func() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
@ -659,5 +706,26 @@ func (s *ChannelManagerSuite) TestStartup() {
|
||||
s.checkAssignment(m, 2, "ch3", ToWatch)
|
||||
}
|
||||
|
||||
func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
|
||||
chNodes := map[string]int64{
|
||||
"ch1": 1,
|
||||
"ch2": 1,
|
||||
"ch3": 1,
|
||||
"ch4": bufferID,
|
||||
}
|
||||
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
|
||||
|
||||
s.mockAlloc = NewNMockAllocator(s.T())
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure"))
|
||||
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = m.Startup(context.TODO(), nil, []int64{2})
|
||||
s.Error(err)
|
||||
|
||||
err = m.Startup(context.TODO(), nil, []int64{1, 2})
|
||||
s.Error(err)
|
||||
}
|
||||
|
||||
func (s *ChannelManagerSuite) TestCheckLoop() {}
|
||||
func (s *ChannelManagerSuite) TestGet() {}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user