From c46fccb3e9bba54f8f30413de1a0d830ac81e5cd Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 12 Jul 2024 15:19:36 +0800 Subject: [PATCH] fix: Prevent dispatcher merging if curTs is 0 (#34562) (#34626) When the main dispatcher has not yet consumed data, curTs is 0. During this time, merging dispatchers should not be allowed; otherwise, the data of the solo dispatcher will be skipped. issue: https://github.com/milvus-io/milvus/issues/34255 pr: https://github.com/milvus-io/milvus/pull/34562 Signed-off-by: bigsheeper --- pkg/mq/msgdispatcher/manager.go | 2 +- pkg/mq/msgdispatcher/manager_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index ecd3d079ae..ddaa1c6427 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -183,7 +183,7 @@ func (c *dispatcherManager) tryMerge() { c.mu.Lock() defer c.mu.Unlock() - if c.mainDispatcher == nil { + if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 { return } candidates := make(map[string]struct{}) diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 621767dd6e..79ee3399a6 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -71,6 +71,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() c.(*dispatcherManager).tryMerge() assert.Equal(t, 1, c.Num()) @@ -96,6 +102,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() CheckPeriod = 10 * time.Millisecond go c.Run()