From 03eaa5d478ac63e8a5e514cd9958a421a2ea147a Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 21 Mar 2024 18:09:07 +0800 Subject: [PATCH] fix: Load segment task promote failed (#31430) issue: #30816 pr #31319 introduce the logic that segment checker need to load level zero segment which only exist in current target. This PR fix load segment task promote failed when segment only belongs to current target --------- Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 2 +- tests/integration/balance/balance_test.go | 51 ++++++++++++++++------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index af342c2c34..55ad90ed52 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -894,7 +894,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { if taskType == TaskTypeMove || taskType == TaskTypeUpdate { segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) } else { - segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst) } if segment == nil { log.Warn("task stale due to the segment to load not exists in targets", diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 893e4644a6..7a8b4f072d 100644 --- a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -18,10 +18,14 @@ package balance import ( "context" + "fmt" + "strconv" + "strings" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -51,7 +55,7 @@ func (s *BalanceTestSuit) SetupSuite() { s.Require().NoError(s.SetupEmbedEtcd()) } -func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int) { +func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -92,6 +96,26 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha s.NoError(err) s.True(merr.Ok(insertResult.Status)) + if segmentDeleteNum > 0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData() + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + log.Info("========================delete expr==================", + zap.String("expr", expr), + ) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + // flush flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, @@ -137,7 +161,7 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 2, 2000) + s.initCollection(name, 1, 2, 2, 2000, 500) ctx := context.Background() // disable compact @@ -161,7 +185,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) s.True(merr.Ok(resp.GetStatus())) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment number @@ -173,7 +197,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 4 + return count == 8 }, 10*time.Second, 1*time.Second) } @@ -193,11 +217,11 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { Command: datapb.GcCommand_Resume, }) - // init collection with 2 channel, each channel has 2 segment, each segment has 2000 row + // init collection with 2 channel, each channel has 4 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 2, 2, 2, 2000) + s.initCollection(name, 2, 2, 2, 2000, 500) resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{CollectionName: name}) s.NoError(err) @@ -211,13 +235,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.Eventually(func() bool { resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment num @@ -229,7 +253,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 8 + return count == 16 }, 10*time.Second, 1*time.Second) } @@ -256,7 +280,7 @@ func (s *BalanceTestSuit) TestNodeDown() { // init collection with 3 channel, each channel has 15 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 15, 2000) + s.initCollection(name, 1, 2, 15, 2000, 500) // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments qn1 := s.Cluster.AddQueryNode() @@ -268,7 +292,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { @@ -276,7 +300,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) // then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2 @@ -298,7 +322,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 1 && len(resp.Segments) == 15 + return len(resp.Channels) == 1 && len(resp.Segments) >= 15 }, 30*time.Second, 1*time.Second) // expect all delegator will recover to healthy @@ -308,7 +332,6 @@ func (s *BalanceTestSuit) TestNodeDown() { CollectionID: collectionID, }) s.NoError(err) - s.True(merr.Ok(resp.GetStatus())) return len(resp.Shards) == 2 }, 30*time.Second, 1*time.Second) }