From 252d49d01e645ef4520e5ac7f942751fb2495d69 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 23 May 2025 14:16:29 +0800 Subject: [PATCH] fix: ChannelManager double assignment (#41837) See also: #41876 --------- Signed-off-by: yangxuan --- internal/datacoord/channel.go | 37 +++++++++------ internal/datacoord/channel_manager.go | 46 +++++++++--------- internal/datacoord/channel_manager_test.go | 29 +++++++++++- internal/datacoord/channel_store.go | 42 ++++++++++++----- internal/datacoord/channel_store_test.go | 49 ++++++++++++++++++-- internal/datacoord/mock_channel_store.go | 18 +++---- internal/datanode/channel/channel_manager.go | 6 +++ 7 files changed, 165 insertions(+), 62 deletions(-) diff --git a/internal/datacoord/channel.go b/internal/datacoord/channel.go index f85e77702b..e591ef73bf 100644 --- a/internal/datacoord/channel.go +++ b/internal/datacoord/channel.go @@ -22,10 +22,12 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" ) type ROChannel interface { @@ -191,15 +193,30 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St return c } -func (c *StateChannel) TransitionOnSuccess(opID int64) { +func (c *StateChannel) TransitionState(err error, opID int64) { if opID != c.Info.GetOpID() { - log.Warn("Try to transit on success but opID not match, stay original state ", + log.Warn("Try to transit state but opID not match, stay original state ", zap.Any("currentState", c.currentState), zap.String("channel", c.Name), zap.Int64("target opID", opID), zap.Int64("channel opID", c.Info.GetOpID())) return } + + if err == nil { + c.transitionOnSuccess() + return + } + + if errors.Is(err, merr.ErrChannelReduplicate) { + c.setState(ToRelease) 
+ return + } + + c.transitionOnFailure() +} + +func (c *StateChannel) transitionOnSuccess() { switch c.currentState { case Standby: c.setState(ToWatch) @@ -216,21 +233,11 @@ func (c *StateChannel) TransitionOnSuccess(opID int64) { } } -func (c *StateChannel) TransitionOnFailure(opID int64) { - if opID != c.Info.GetOpID() { - log.Warn("Try to transit on failure but opID not match, stay original state", - zap.Any("currentState", c.currentState), - zap.String("channel", c.Name), - zap.Int64("target opID", opID), - zap.Int64("channel opID", c.Info.GetOpID())) - return - } +func (c *StateChannel) transitionOnFailure() { switch c.currentState { - case Watching: + case Watching, Releasing, ToWatch: c.setState(Standby) - case Releasing: - c.setState(Standby) - case Standby, ToWatch, Watched, ToRelease: + case Standby, Watched, ToRelease: // Stay original state } } diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index bac3effc2e..7141658ef5 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -156,14 +156,14 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes m.mu.Lock() nodeChannels := m.store.GetNodeChannelsBy( WithAllNodes(), - func(ch *StateChannel) bool { + func(ch *StateChannel) bool { // Channel with drop-mark return m.h.CheckShouldDropChannel(ch.GetName()) }) - m.mu.Unlock() for _, info := range nodeChannels { m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...) 
} + m.mu.Unlock() if m.balanceCheckLoop != nil { log.Ctx(ctx).Info("starting channel balance loop") @@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error { zap.Array("updates", updates), zap.Error(err)) } + // Speed up channel assignment // channel already written into meta, try to assign it to the cluster // not error is returned if failed, the assignment will retry later updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect()) @@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error { return nil } -// reassign reassigns a channel to another DataNode. +// inner method, lock before using it, reassign reassigns a channel to another DataNode. func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error { - m.mu.Lock() - defer m.mu.Unlock() - updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect()) if updates != nil { return m.execute(updates) @@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) { } func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) { - m.mu.RLock() + m.mu.Lock() standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby)) toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease)) toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing)) - m.mu.RUnlock() - // Processing standby channels - updatedStandbys := false - updatedStandbys = m.advanceStandbys(ctx, standbys) + // Reassigning standby channels in locks to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode + updatedStandbys := m.advanceStandbys(ctx, standbys) + m.mu.Unlock() + + // RPCs stays out of locks updatedToCheckes := m.advanceToChecks(ctx, toChecks) updatedToNotifies := m.advanceToNotifies(ctx, toNotifies) @@ -453,9 +452,8 @@ func (m *ChannelManagerImpl) 
AdvanceChannelState(ctx context.Context) { } } +// inner method need lock func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) { - m.mu.Lock() - defer m.mu.Unlock() for _, ch := range channels { if err := m.removeChannel(nodeID, ch); err != nil { log.Warn("Failed to remove channel", zap.Any("channel", ch), zap.Error(err)) @@ -469,6 +467,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha } } +// inner method need locks func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool { var advanced bool = false for _, nodeAssign := range standbys { @@ -576,7 +575,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [ } m.mu.Lock() - m.store.UpdateState(err == nil, nodeID, res.ch, res.opID) + m.store.UpdateState(err, nodeID, res.ch, res.opID) m.mu.Unlock() } @@ -592,9 +591,9 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [ } type poolResult struct { - successful bool - ch RWChannel - opID int64 + err error + ch RWChannel + opID int64 } func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool { @@ -620,10 +619,14 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No future := getOrCreateIOPool().Submit(func() (any, error) { successful, got := m.Check(ctx, nodeID, tmpWatchInfo) if got { + var err error + if !successful { + err = errors.New("operation in progress") + } return poolResult{ - successful: successful, - ch: innerCh, - opID: tmpWatchInfo.GetOpID(), + err: err, + ch: innerCh, + opID: tmpWatchInfo.GetOpID(), }, nil } return nil, errors.New("Got results with no progress") @@ -636,7 +639,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No if err == nil { m.mu.Lock() result := got.(poolResult) - m.store.UpdateState(result.successful, nodeID, result.ch, result.opID) + 
m.store.UpdateState(result.err, nodeID, result.ch, result.opID) m.mu.Unlock() advanced = true @@ -712,6 +715,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data return false, false } +// inner method need lock func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error { for _, op := range updates.ops { if op.Type != Delete { diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 907c56f8bb..5bebdfa461 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -378,6 +378,31 @@ func (s *ChannelManagerSuite) TestFindWatcher() { func (s *ChannelManagerSuite) TestAdvanceChannelState() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + s.Run("advance towatch dn watched to torelease", func() { + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) + s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything). 
+ RunAndReturn(func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error { + s.Require().Equal(1, len(req.GetInfos())) + switch req.GetInfos()[0].GetVchan().GetChannelName() { + case "ch2": + return merr.WrapErrChannelReduplicate("ch2") + default: + return nil + } + }).Twice() + m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) + s.Require().NoError(err) + s.checkAssignment(m, 1, "ch1", ToWatch) + s.checkAssignment(m, 1, "ch2", ToWatch) + + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Watching) + s.checkAssignment(m, 1, "ch2", ToRelease) + }) s.Run("advance statndby with no available nodes", func() { chNodes := map[string]int64{ "ch1": bufferID, @@ -680,8 +705,8 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() { s.checkAssignment(m, 1, "ch2", ToWatch) m.AdvanceChannelState(ctx) - s.checkAssignment(m, 1, "ch1", ToWatch) - s.checkAssignment(m, 1, "ch2", ToWatch) + s.checkAssignment(m, 1, "ch1", Standby) + s.checkAssignment(m, 1, "ch2", Standby) }) s.Run("advance to release channels notify success", func() { chNodes := map[string]int64{ diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 7d018d15dc..2b6e631c4f 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -72,7 +72,7 @@ type RWChannelStore interface { Update(op *ChannelOpSet) error // UpdateState is used by StateChannelStore only - UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) + UpdateState(err error, nodeID int64, channel RWChannel, opID int64) // SegLegacyChannelByNode is used by StateChannelStore only SetLegacyChannelByNode(nodeIDs ...int64) @@ -339,6 +339,8 @@ func (c *StateChannelStore) Reload() error { if err != nil { return err } + + dupChannel := []*StateChannel{} for i := 0; i < len(keys); i++ { k := keys[i] v := values[i] @@ -353,14 +355,36 @@ func (c *StateChannelStore) Reload() error { } 
reviseVChannelInfo(info.GetVchan()) - c.AddNode(nodeID) - + channelName := info.GetVchan().GetChannelName() channel := NewStateChannelByWatchInfo(nodeID, info) + + if c.HasChannel(channelName) { + dupChannel = append(dupChannel, channel) + log.Warn("channel store detects duplicated channel, skip recovering it", + zap.Int64("nodeID", nodeID), + zap.String("channel", channelName)) + continue + } + + c.AddNode(nodeID) c.channelsInfo[nodeID].AddChannel(channel) - log.Info("channel store reload channel", - zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) + log.Info("channel store reloads channel from meta", + zap.Int64("nodeID", nodeID), + zap.String("channel", channelName)) metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) } + + for _, channel := range dupChannel { + log.Warn("channel store clearing duplicated channel", + zap.String("channel", channel.GetName()), zap.Int64("nodeID", channel.assignedNode)) + chOp := NewChannelOpSet(NewChannelOp(channel.assignedNode, Delete, channel)) + if err := c.Update(chOp); err != nil { + log.Warn("channel store failed to remove duplicated channel, will retry later", + zap.String("channel", channel.GetName()), + zap.Int64("nodeID", channel.assignedNode), + zap.Error(err)) + } + } log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) return nil } @@ -375,15 +399,11 @@ func (c *StateChannelStore) AddNode(nodeID int64) { } } -func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { +func (c *StateChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) { channelName := channel.GetName() if cInfo, ok := c.channelsInfo[nodeID]; ok { if stateChannel, ok := cInfo.Channels[channelName]; ok { - if isSuccessful { - stateChannel.(*StateChannel).TransitionOnSuccess(opID) - } else { - stateChannel.(*StateChannel).TransitionOnFailure(opID) - } + 
stateChannel.(*StateChannel).TransitionState(err, opID) } } } diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index 4300e30e4d..1c76ea49c0 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -6,6 +6,7 @@ import ( "strconv" "testing" + "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -18,6 +19,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/testutils" ) @@ -421,12 +423,18 @@ func (s *StateChannelStoreSuite) TestUpdateState() { tests := []struct { description string - inSuccess bool + inErr error inChannelState ChannelState outChannelState ChannelState }{ - {"input standby, fail", false, Standby, Standby}, - {"input standby, success", true, Standby, ToWatch}, + {"input standby fail", errors.New("fail"), Standby, Standby}, + {"input standby success", nil, Standby, ToWatch}, + {"input towatch duplicate", merr.ErrChannelReduplicate, ToWatch, ToRelease}, + {"input towatch fail", errors.New("fail"), ToWatch, Standby}, + {"input towatch success", nil, ToWatch, Watching}, + {"input torelease duplicate", merr.ErrChannelReduplicate, ToRelease, ToRelease}, + {"input torelease fail", errors.New("fail"), ToRelease, ToRelease}, + {"input torelease success", nil, ToRelease, Releasing}, } for _, test := range tests { @@ -443,12 +451,45 @@ func (s *StateChannelStoreSuite) TestUpdateState() { }, } - store.UpdateState(test.inSuccess, bufferID, channel, 0) + store.UpdateState(test.inErr, bufferID, channel, 0) s.Equal(test.outChannelState, channel.currentState) }) } } +func (s *StateChannelStoreSuite) TestReloadDupCh() { + s.mockTxn.ExpectedCalls = nil + + tests := []struct { + channelName string + nodeID int64 + }{ + 
{"ch1", 1}, + {"ch1", bufferID}, + {"ch1", 2}, + } + + var keys, values []string + for _, test := range tests { + keys = append(keys, fmt.Sprintf("channel_store/%d/%s", test.nodeID, test.channelName)) + info := generateWatchInfo(test.channelName, datapb.ChannelWatchState_WatchSuccess) + bs, err := proto.Marshal(info) + s.Require().NoError(err) + values = append(values, string(bs)) + } + s.mockTxn.EXPECT().LoadWithPrefix(mock.Anything, mock.AnythingOfType("string")).Return(keys, values, nil) + s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(2) + + store := NewStateChannelStore(s.mockTxn) + err := store.Reload() + s.Require().NoError(err) + + s.True(store.HasChannel("ch1")) + s.ElementsMatch([]int64{1}, store.GetNodes()) + s.EqualValues(1, store.GetNodeChannelCount(1)) + s.EqualValues(0, store.GetNodeChannelCount(2)) +} + func (s *StateChannelStoreSuite) TestReload() { type item struct { nodeID int64 diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index acda3845eb..f1245a1863 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -566,9 +566,9 @@ func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(*ChannelOpSet) e return _c } -// UpdateState provides a mock function with given fields: isSuccessful, nodeID, channel, opID -func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { - _m.Called(isSuccessful, nodeID, channel, opID) +// UpdateState provides a mock function with given fields: err, nodeID, channel, opID +func (_m *MockRWChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) { + _m.Called(err, nodeID, channel, opID) } // MockRWChannelStore_UpdateState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateState' @@ -577,17 +577,17 @@ type MockRWChannelStore_UpdateState_Call struct { 
} // UpdateState is a helper method to define mock.On call -// - isSuccessful bool +// - err error // - nodeID int64 // - channel RWChannel // - opID int64 -func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call { - return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", isSuccessful, nodeID, channel, opID)} +func (_e *MockRWChannelStore_Expecter) UpdateState(err interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call { + return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", err, nodeID, channel, opID)} } -func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(err error, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(bool), args[1].(int64), args[2].(RWChannel), args[3].(int64)) + run(args[0].(error), args[1].(int64), args[2].(RWChannel), args[3].(int64)) }) return _c } @@ -597,7 +597,7 @@ func (_c *MockRWChannelStore_UpdateState_Call) Return() *MockRWChannelStore_Upda return _c } -func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(error, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call { _c.Run(run) return _c } diff --git a/internal/datanode/channel/channel_manager.go b/internal/datanode/channel/channel_manager.go index d909f9c230..b7615bb8a6 100644 --- a/internal/datanode/channel/channel_manager.go +++ b/internal/datanode/channel/channel_manager.go @@ -105,6 +105,12 @@ func (m *ChannelManagerImpl) Submit(info 
*datapb.ChannelWatchInfo) error { return nil } + // DataNode is already watching this channel under a different OpID + if info.GetState() == datapb.ChannelWatchState_ToWatch && + m.fgManager.HasFlowgraph(channel) { + return merr.WrapErrChannelReduplicate(channel) + } + if info.GetState() == datapb.ChannelWatchState_ToRelease && !m.fgManager.HasFlowgraph(channel) { log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))