From d4704ab9b6644bbc3fa216875ee18e576bc27209 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 7 Jul 2023 19:32:25 +0800 Subject: [PATCH] fix set shard unservicable when sync target version (#25418) Signed-off-by: Wei Liu --- internal/querynodev2/delegator/distribution.go | 9 ++++++--- internal/querynodev2/delegator/distribution_test.go | 7 +++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index da064bd96e..73a0f96b96 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -210,24 +210,25 @@ func (d *distribution) AddOfflines(segmentIDs ...int64) { func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64) { d.mut.Lock() defer d.mut.Unlock() + for _, segmentID := range growingInTarget { entry, ok := d.growingSegments[segmentID] if !ok { - log.Error("readable growing segment lost, make it unserviceable", + log.Warn("readable growing segment lost, consume from dml seems too slow", zap.Int64("segmentID", segmentID)) - d.serviceable.Store(false) continue } entry.TargetVersion = newVersion d.growingSegments[segmentID] = entry } + available := true for _, segmentID := range sealedInTarget { entry, ok := d.sealedSegments[segmentID] if !ok { log.Error("readable sealed segment lost, make it unserviceable", zap.Int64("segmentID", segmentID)) - d.serviceable.Store(false) + available = false continue } entry.TargetVersion = newVersion @@ -237,6 +238,8 @@ func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int oldValue := d.targetVersion.Load() d.targetVersion.Store(newVersion) d.genSnapshot() + // if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable + d.serviceable.Store(available) log.Info("Update readable segment version", zap.Int64("oldVersion", oldValue), zap.Int64("newVersion", newVersion), diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index d449b82b4e..58efba7a93 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ -631,6 +631,13 @@ func (s *DistributionSuite) Test_SyncTargetVersion() { s1, s2, _ = s.dist.GetSegments(false) s.Len(s1[0].Segments, 3) s.Len(s2, 3) + + s.dist.serviceable.Store(true) + s.dist.SyncTargetVersion(2, []int64{222}, []int64{}) + s.True(s.dist.Serviceable()) + + s.dist.SyncTargetVersion(2, []int64{}, []int64{333}) + s.False(s.dist.Serviceable()) } func TestDistributionSuite(t *testing.T) {