From d4837307b3750f14d55b40966a27f58517ea69eb Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 14 May 2024 22:09:34 +0800 Subject: [PATCH] fix: Make submit idempotent (#33053) issue: #33054 Signed-off-by: yangxuan --- internal/datanode/channel_manager.go | 21 +++++++++++++++ internal/datanode/channel_manager_test.go | 32 +++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/internal/datanode/channel_manager.go b/internal/datanode/channel_manager.go index 6f6242c726..97ae15e714 100644 --- a/internal/datanode/channel_manager.go +++ b/internal/datanode/channel_manager.go @@ -76,6 +76,27 @@ func NewChannelManager(dn *DataNode) *ChannelManagerImpl { func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error { channel := info.GetVchan().GetChannelName() + + // skip enqueue datacoord re-submit the same operations + if runner, ok := m.opRunners.Get(channel); ok { + if runner.Exist(info.GetOpID()) { + log.Warn("op already exist, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel)) + return nil + } + } + + if info.GetState() == datapb.ChannelWatchState_ToWatch && + m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) { + log.Warn("Watch op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel)) + return nil + } + + if info.GetState() == datapb.ChannelWatchState_ToRelease && + !m.fgManager.HasFlowgraph(channel) { + log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel)) + return nil + } + runner := m.getOrCreateRunner(channel) return runner.Enqueue(info) } diff --git a/internal/datanode/channel_manager_test.go b/internal/datanode/channel_manager_test.go index 8a6b2f441d..85c13d7fe9 100644 --- a/internal/datanode/channel_manager_test.go +++ b/internal/datanode/channel_manager_test.go @@ -207,6 +207,32 @@ func (s *ChannelManagerSuite) TestSubmitIdempotent() { s.Equal(1, runner.UnfinishedOpSize()) } +func (s *ChannelManagerSuite) TestSubmitSkip() { + channel := "by-dev-rootcoord-dml-1" + + info := getWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch) + s.Require().Equal(0, s.manager.opRunners.Len()) + + err := s.manager.Submit(info) + s.NoError(err) + + s.Equal(1, s.manager.opRunners.Len()) + s.True(s.manager.opRunners.Contain(channel)) + opState := <-s.manager.communicateCh + s.NotNil(opState) + s.Equal(datapb.ChannelWatchState_WatchSuccess, opState.state) + s.NotNil(opState.fg) + s.Equal(info.GetOpID(), opState.fg.opID) + s.manager.handleOpState(opState) + + err = s.manager.Submit(info) + s.NoError(err) + + runner, ok := s.manager.opRunners.Get(channel) + s.False(ok) + s.Nil(runner) +} + func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { channel := "by-dev-rootcoord-dml-0" @@ -253,4 +279,10 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { s.Equal(0, s.manager.fgManager.GetFlowgraphCount()) s.False(s.manager.opRunners.Contain(info.GetVchan().GetChannelName())) s.Equal(0, s.manager.opRunners.Len()) + + err = s.manager.Submit(info) + s.NoError(err) + runner, ok := s.manager.opRunners.Get(channel) + s.False(ok) + s.Nil(runner) }