fix: Prevent dispatcher merging if curTs is 0 (#34562) (#34626)

When the main dispatcher has not yet consumed data, curTs is 0. During
this time, merging dispatchers should not be allowed; otherwise, the
data of the solo dispatcher will be skipped.

issue: https://github.com/milvus-io/milvus/issues/34255

pr: https://github.com/milvus-io/milvus/pull/34562

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-07-12 15:19:36 +08:00 committed by GitHub
parent 7f3a2a278f
commit c46fccb3e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 13 additions and 1 deletions

View File

@ -183,7 +183,7 @@ func (c *dispatcherManager) tryMerge() {
c.mu.Lock()
defer c.mu.Unlock()
if c.mainDispatcher == nil {
if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 {
return
}
candidates := make(map[string]struct{})

View File

@ -71,6 +71,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()
c.(*dispatcherManager).tryMerge()
assert.Equal(t, 1, c.Num())
@ -96,6 +102,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()
CheckPeriod = 10 * time.Millisecond
go c.Run()