fix: ChannelManager double assignment (#41837)

See also: #41876

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2025-05-23 14:16:29 +08:00 committed by GitHub
parent f71930e8db
commit 252d49d01e
7 changed files with 165 additions and 62 deletions

View File

@@ -22,10 +22,12 @@ import (
     "go.uber.org/zap"
     "google.golang.org/protobuf/proto"
+    "github.com/cockroachdb/errors"
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/pkg/v2/log"
     "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+    "github.com/milvus-io/milvus/pkg/v2/util/merr"
 )

 type ROChannel interface {
@@ -191,15 +193,30 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *StateChannel {
     return c
 }

-func (c *StateChannel) TransitionOnSuccess(opID int64) {
+func (c *StateChannel) TransitionState(err error, opID int64) {
     if opID != c.Info.GetOpID() {
-        log.Warn("Try to transit on success but opID not match, stay original state ",
+        log.Warn("Try to transit state but opID not match, stay original state ",
             zap.Any("currentState", c.currentState),
             zap.String("channel", c.Name),
             zap.Int64("target opID", opID),
             zap.Int64("channel opID", c.Info.GetOpID()))
         return
     }
+
+    if err == nil {
+        c.transitionOnSuccess()
+        return
+    }
+
+    if errors.Is(err, merr.ErrChannelReduplicate) {
+        c.setState(ToRelease)
+        return
+    }
+
+    c.transitionOnFailure()
+}
+
+func (c *StateChannel) transitionOnSuccess() {
     switch c.currentState {
     case Standby:
         c.setState(ToWatch)
@@ -216,21 +233,11 @@ func (c *StateChannel) TransitionOnSuccess(opID int64) {
     }
 }

-func (c *StateChannel) TransitionOnFailure(opID int64) {
-    if opID != c.Info.GetOpID() {
-        log.Warn("Try to transit on failure but opID not match, stay original state",
-            zap.Any("currentState", c.currentState),
-            zap.String("channel", c.Name),
-            zap.Int64("target opID", opID),
-            zap.Int64("channel opID", c.Info.GetOpID()))
-        return
-    }
+func (c *StateChannel) transitionOnFailure() {
     switch c.currentState {
-    case Watching:
+    case Watching, Releasing, ToWatch:
         c.setState(Standby)
-    case Releasing:
-        c.setState(Standby)
-    case Standby, ToWatch, Watched, ToRelease:
+    case Standby, Watched, ToRelease:
         // Stay original state
     }
 }
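Taken together with the tests further down, the split above yields one compact transition table. Below is a minimal, self-contained sketch of that state machine for reference — state names mirror the diff, `errDup` is a local stand-in for `merr.ErrChannelReduplicate`, and only the transitions visible in this commit (diff plus test table) are modeled:

```go
package main

import (
	"errors"
	"fmt"
)

type ChannelState string

const (
	Standby   ChannelState = "Standby"
	ToWatch   ChannelState = "ToWatch"
	Watching  ChannelState = "Watching"
	Watched   ChannelState = "Watched"
	ToRelease ChannelState = "ToRelease"
	Releasing ChannelState = "Releasing"
)

// errDup stands in for merr.ErrChannelReduplicate.
var errDup = errors.New("channel reduplicate")

// transition models TransitionState's dispatch: nil advances the state,
// a reduplicate error forces ToRelease, anything else falls back.
func transition(s ChannelState, err error) ChannelState {
	if err == nil { // transitionOnSuccess, per the diff and test table
		switch s {
		case Standby:
			return ToWatch
		case ToWatch:
			return Watching
		case ToRelease:
			return Releasing
		}
		return s
	}
	if errors.Is(err, errDup) {
		return ToRelease // duplicated channel: release instead of retrying
	}
	// transitionOnFailure, per the diff
	switch s {
	case Watching, Releasing, ToWatch:
		return Standby
	}
	return s // Standby, Watched, ToRelease stay in place
}

func main() {
	fmt.Println(transition(ToWatch, nil))                // Watching
	fmt.Println(transition(ToWatch, errDup))             // ToRelease
	fmt.Println(transition(ToWatch, errors.New("boom"))) // Standby
}
```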

View File

@@ -156,14 +156,14 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
     m.mu.Lock()
     nodeChannels := m.store.GetNodeChannelsBy(
         WithAllNodes(),
-        func(ch *StateChannel) bool {
+        func(ch *StateChannel) bool { // Channel with drop-mark
             return m.h.CheckShouldDropChannel(ch.GetName())
         })
-    m.mu.Unlock()

     for _, info := range nodeChannels {
         m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
     }
+    m.mu.Unlock()

     if m.balanceCheckLoop != nil {
         log.Ctx(ctx).Info("starting channel balance loop")
@@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
             zap.Array("updates", updates), zap.Error(err))
     }

     // Speed up channel assignment
+    // channel already written into meta, try to assign it to the cluster
     // not error is returned if failed, the assignment will retry later
     updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
@@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
     return nil
 }

-// reassign reassigns a channel to another DataNode.
+// inner method, lock before using it, reassign reassigns a channel to another DataNode.
 func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
-    m.mu.Lock()
-    defer m.mu.Unlock()
-
     updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
     if updates != nil {
         return m.execute(updates)
@@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
     }
 }

 func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
-    m.mu.RLock()
+    m.mu.Lock()
     standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
     toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
     toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
-    m.mu.RUnlock()

-    // Processing standby channels
-    updatedStandbys := false
-    updatedStandbys = m.advanceStandbys(ctx, standbys)
+    // Reassigning standby channels in locks to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode
+    updatedStandbys := m.advanceStandbys(ctx, standbys)
+    m.mu.Unlock()

+    // RPCs stays out of locks
     updatedToCheckes := m.advanceToChecks(ctx, toChecks)
     updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)
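The hunk above is the other half of the fix: state reads and the standby reassignment now share one writer-lock critical section (per the added comment, to avoid concurrent assignment with Watch, Remove, AddNode, and DeleteNode), and only the slow RPC-driven steps run unlocked. A stripped-down sketch of that shape — all names here are illustrative, not the production types:

```go
package main

import (
	"fmt"
	"sync"
)

type manager struct {
	mu       sync.Mutex
	standby  []string         // channels awaiting reassignment
	assigned map[string]int64 // channel -> node
}

// advance mirrors the AdvanceChannelState locking pattern: snapshot and
// reassign under one writer lock so no concurrent Watch/DeleteNode can hand
// the same channel to two nodes, then do the slow notify work unlocked.
func (m *manager) advance() {
	m.mu.Lock()
	work := append([]string(nil), m.standby...)
	for _, ch := range m.standby {
		m.assigned[ch] = 1 // trivial stand-in for assignPolicy + execute
	}
	m.standby = nil
	m.mu.Unlock()

	// RPCs stay out of locks.
	for _, ch := range work {
		fmt.Println("notify node about", ch)
	}
}

func main() {
	m := &manager{standby: []string{"ch1", "ch2"}, assigned: map[string]int64{}}
	m.advance()
}
```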
@@ -453,9 +452,8 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
     }
 }

+// inner method need lock
 func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
-    m.mu.Lock()
-    defer m.mu.Unlock()
     for _, ch := range channels {
         if err := m.removeChannel(nodeID, ch); err != nil {
             log.Warn("Failed to remove channel", zap.Any("channel", ch), zap.Error(err))
@@ -469,6 +467,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
     }
 }

+// inner method need locks
 func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
     var advanced bool = false
     for _, nodeAssign := range standbys {
@@ -576,7 +575,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
         }

         m.mu.Lock()
-        m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
+        m.store.UpdateState(err, nodeID, res.ch, res.opID)
         m.mu.Unlock()
     }
@@ -592,9 +591,9 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
 }

 type poolResult struct {
-    successful bool
-    ch         RWChannel
-    opID       int64
+    err  error
+    ch   RWChannel
+    opID int64
 }

 func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
@@ -620,10 +619,14 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
         future := getOrCreateIOPool().Submit(func() (any, error) {
             successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
             if got {
+                var err error
+                if !successful {
+                    err = errors.New("operation in progress")
+                }
                 return poolResult{
-                    successful: successful,
-                    ch:         innerCh,
-                    opID:       tmpWatchInfo.GetOpID(),
+                    err:  err,
+                    ch:   innerCh,
+                    opID: tmpWatchInfo.GetOpID(),
                 }, nil
             }
             return nil, errors.New("Got results with no progress")
@@ -636,7 +639,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
         if err == nil {
             m.mu.Lock()
             result := got.(poolResult)
-            m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
+            m.store.UpdateState(result.err, nodeID, result.ch, result.opID)
             m.mu.Unlock()
             advanced = true
@@ -712,6 +715,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (bool, bool) {
     return false, false
 }

+// inner method need lock
 func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
     for _, op := range updates.ops {
         if op.Type != Delete {

View File

@@ -378,6 +378,31 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
 func (s *ChannelManagerSuite) TestAdvanceChannelState() {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
+
+    s.Run("advance towatch dn watched to torelease", func() {
+        chNodes := map[string]int64{
+            "ch1": 1,
+            "ch2": 1,
+        }
+        s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
+        s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
+            RunAndReturn(func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error {
+                s.Require().Equal(1, len(req.GetInfos()))
+                switch req.GetInfos()[0].GetVchan().GetChannelName() {
+                case "ch2":
+                    return merr.WrapErrChannelReduplicate("ch2")
+                default:
+                    return nil
+                }
+            }).Twice()
+        m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
+        s.Require().NoError(err)
+        s.checkAssignment(m, 1, "ch1", ToWatch)
+        s.checkAssignment(m, 1, "ch2", ToWatch)
+
+        m.AdvanceChannelState(ctx)
+        s.checkAssignment(m, 1, "ch1", Watching)
+        s.checkAssignment(m, 1, "ch2", ToRelease)
+    })
+
     s.Run("advance statndby with no available nodes", func() {
         chNodes := map[string]int64{
             "ch1": bufferID,
@@ -680,8 +705,8 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
         s.checkAssignment(m, 1, "ch2", ToWatch)

         m.AdvanceChannelState(ctx)
-        s.checkAssignment(m, 1, "ch1", ToWatch)
-        s.checkAssignment(m, 1, "ch2", ToWatch)
+        s.checkAssignment(m, 1, "ch1", Standby)
+        s.checkAssignment(m, 1, "ch2", Standby)
     })

     s.Run("advance to release channels notify success", func() {
         chNodes := map[string]int64{

View File

@@ -72,7 +72,7 @@ type RWChannelStore interface {
     Update(op *ChannelOpSet) error
     // UpdateState is used by StateChannelStore only
-    UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
+    UpdateState(err error, nodeID int64, channel RWChannel, opID int64)
     // SegLegacyChannelByNode is used by StateChannelStore only
     SetLegacyChannelByNode(nodeIDs ...int64)
@@ -339,6 +339,8 @@ func (c *StateChannelStore) Reload() error {
     if err != nil {
         return err
     }
+
+    dupChannel := []*StateChannel{}
     for i := 0; i < len(keys); i++ {
         k := keys[i]
         v := values[i]
@@ -353,14 +355,36 @@
         }
         reviseVChannelInfo(info.GetVchan())

-        c.AddNode(nodeID)
+        channelName := info.GetVchan().GetChannelName()
         channel := NewStateChannelByWatchInfo(nodeID, info)
+        if c.HasChannel(channelName) {
+            dupChannel = append(dupChannel, channel)
+            log.Warn("channel store detects duplicated channel, skip recovering it",
+                zap.Int64("nodeID", nodeID),
+                zap.String("channel", channelName))
+            continue
+        }
+
+        c.AddNode(nodeID)
         c.channelsInfo[nodeID].AddChannel(channel)
-        log.Info("channel store reload channel",
-            zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
+        log.Info("channel store reloads channel from meta",
+            zap.Int64("nodeID", nodeID),
+            zap.String("channel", channelName))
         metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
     }

+    for _, channel := range dupChannel {
+        log.Warn("channel store clearing duplicated channel",
+            zap.String("channel", channel.GetName()), zap.Int64("nodeID", channel.assignedNode))
+        chOp := NewChannelOpSet(NewChannelOp(channel.assignedNode, Delete, channel))
+        if err := c.Update(chOp); err != nil {
+            log.Warn("channel store failed to remove duplicated channel, will retry later",
+                zap.String("channel", channel.GetName()),
+                zap.Int64("nodeID", channel.assignedNode),
+                zap.Error(err))
+        }
+    }
+
     log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
     return nil
 }
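The Reload change above is a two-phase dedup: the first copy of a channel found in the metastore wins, later copies are parked, and the parked copies are deleted after the scan. A generic sketch of that shape, with hypothetical types:

```go
package main

import "fmt"

type entry struct {
	nodeID  int64
	channel string
}

// dedupReload keeps the first assignment seen per channel and returns the
// rest, mirroring Reload's collect-then-delete structure for duplicates.
func dedupReload(entries []entry) (kept map[string]int64, dups []entry) {
	kept = map[string]int64{}
	for _, e := range entries {
		if _, ok := kept[e.channel]; ok {
			dups = append(dups, e) // parked; deleted after the scan
			continue
		}
		kept[e.channel] = e.nodeID
	}
	return kept, dups
}

func main() {
	kept, dups := dedupReload([]entry{{1, "ch1"}, {-1, "ch1"}, {2, "ch1"}})
	fmt.Println(kept)      // map[ch1:1] — first copy wins
	fmt.Println(len(dups)) // 2 — cleared via Delete ops, as TestReloadDupCh expects
}
```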
@@ -375,15 +399,11 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
     }
 }

-func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
+func (c *StateChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) {
     channelName := channel.GetName()
     if cInfo, ok := c.channelsInfo[nodeID]; ok {
         if stateChannel, ok := cInfo.Channels[channelName]; ok {
-            if isSuccessful {
-                stateChannel.(*StateChannel).TransitionOnSuccess(opID)
-            } else {
-                stateChannel.(*StateChannel).TransitionOnFailure(opID)
-            }
+            stateChannel.(*StateChannel).TransitionState(err, opID)
         }
     }
 }

View File

@@ -6,6 +6,7 @@ import (
     "strconv"
     "testing"

+    "github.com/cockroachdb/errors"
     "github.com/samber/lo"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"
@@ -18,6 +19,7 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/log"
     "github.com/milvus-io/milvus/pkg/v2/metrics"
     "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+    "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/testutils"
 )
@@ -421,12 +423,18 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
     tests := []struct {
         description     string
-        inSuccess       bool
+        inErr           error
         inChannelState  ChannelState
         outChannelState ChannelState
     }{
-        {"input standby, fail", false, Standby, Standby},
-        {"input standby, success", true, Standby, ToWatch},
+        {"input standby fail", errors.New("fail"), Standby, Standby},
+        {"input standby success", nil, Standby, ToWatch},
+        {"input towatch duplicate", merr.ErrChannelReduplicate, ToWatch, ToRelease},
+        {"input towatch fail", errors.New("fail"), ToWatch, Standby},
+        {"input towatch success", nil, ToWatch, Watching},
+        {"input torelease duplicate", merr.ErrChannelReduplicate, ToRelease, ToRelease},
+        {"input torelease fail", errors.New("fail"), ToRelease, ToRelease},
+        {"input torelease success", nil, ToRelease, Releasing},
     }

     for _, test := range tests {
@@ -443,12 +451,45 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
                 },
             }

-            store.UpdateState(test.inSuccess, bufferID, channel, 0)
+            store.UpdateState(test.inErr, bufferID, channel, 0)

             s.Equal(test.outChannelState, channel.currentState)
         })
     }
 }

+func (s *StateChannelStoreSuite) TestReloadDupCh() {
+    s.mockTxn.ExpectedCalls = nil
+    tests := []struct {
+        channelName string
+        nodeID      int64
+    }{
+        {"ch1", 1},
+        {"ch1", bufferID},
+        {"ch1", 2},
+    }
+
+    var keys, values []string
+    for _, test := range tests {
+        keys = append(keys, fmt.Sprintf("channel_store/%d/%s", test.nodeID, test.channelName))
+        info := generateWatchInfo(test.channelName, datapb.ChannelWatchState_WatchSuccess)
+        bs, err := proto.Marshal(info)
+        s.Require().NoError(err)
+        values = append(values, string(bs))
+    }
+    s.mockTxn.EXPECT().LoadWithPrefix(mock.Anything, mock.AnythingOfType("string")).Return(keys, values, nil)
+    s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(2)
+
+    store := NewStateChannelStore(s.mockTxn)
+    err := store.Reload()
+    s.Require().NoError(err)
+
+    s.True(store.HasChannel("ch1"))
+    s.ElementsMatch([]int64{1}, store.GetNodes())
+    s.EqualValues(1, store.GetNodeChannelCount(1))
+    s.EqualValues(0, store.GetNodeChannelCount(2))
+}

 func (s *StateChannelStoreSuite) TestReload() {
     type item struct {
         nodeID int64

View File

@@ -566,9 +566,9 @@ func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(*ChannelOpSet) error) *MockRWChannelStore_Update_Call {
     return _c
 }

-// UpdateState provides a mock function with given fields: isSuccessful, nodeID, channel, opID
-func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
-    _m.Called(isSuccessful, nodeID, channel, opID)
+// UpdateState provides a mock function with given fields: err, nodeID, channel, opID
+func (_m *MockRWChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) {
+    _m.Called(err, nodeID, channel, opID)
 }

 // MockRWChannelStore_UpdateState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateState'
@@ -577,17 +577,17 @@ type MockRWChannelStore_UpdateState_Call struct {
 }

 // UpdateState is a helper method to define mock.On call
-//   - isSuccessful bool
+//   - err error
 //   - nodeID int64
 //   - channel RWChannel
 //   - opID int64
-func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call {
-    return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", isSuccessful, nodeID, channel, opID)}
+func (_e *MockRWChannelStore_Expecter) UpdateState(err interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call {
+    return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", err, nodeID, channel, opID)}
 }

-func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call {
+func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(err error, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call {
     _c.Call.Run(func(args mock.Arguments) {
-        run(args[0].(bool), args[1].(int64), args[2].(RWChannel), args[3].(int64))
+        run(args[0].(error), args[1].(int64), args[2].(RWChannel), args[3].(int64))
     })
     return _c
 }
@@ -597,7 +597,7 @@ func (_c *MockRWChannelStore_UpdateState_Call) Return() *MockRWChannelStore_UpdateState_Call {
     return _c
 }

-func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call {
+func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(error, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call {
     _c.Run(run)
     return _c
 }
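With the regenerated mock, expectations take an error where they previously took a bool. A hypothetical test snippet (the test name is invented, and `NewMockRWChannelStore` is assumed from mockery's standard expecter output; not part of this diff):

```go
package datacoord

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// Hypothetical usage of the regenerated mock's error-first signature.
func TestUpdateStateSignature(t *testing.T) {
	store := NewMockRWChannelStore(t)
	// nil now means the notify RPC succeeded; non-nil errors pass through.
	store.EXPECT().UpdateState(nil, int64(1), mock.Anything, int64(100)).Return()
	store.UpdateState(nil, 1, nil, 100)
}
```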

View File

@@ -105,6 +105,12 @@ func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
         return nil
     }

+    // DataNode already watched this channel of other OpID
+    if info.GetState() == datapb.ChannelWatchState_ToWatch &&
+        m.fgManager.HasFlowgraph(channel) {
+        return merr.WrapErrChannelReduplicate(channel)
+    }
+
     if info.GetState() == datapb.ChannelWatchState_ToRelease &&
         !m.fgManager.HasFlowgraph(channel) {
         log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))