diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index df10e85f75..22d8e4c92a 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -296,6 +296,8 @@ message LoadMetaInfo { int64 collectionID = 2; repeated int64 partitionIDs = 3; string metric_type = 4 [deprecated = true]; + string db_name = 5; // Only used for metrics label. + string resource_group = 6; // Only used for metrics label. } message WatchDmChannelsRequest { diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 5e7b61f56b..ed9568a382 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -26,27 +26,27 @@ import ( ) type SegmentAssignPlan struct { - Segment *meta.Segment - ReplicaID int64 - From int64 // -1 if empty - To int64 + Segment *meta.Segment + Replica *meta.Replica + From int64 // -1 if empty + To int64 } func (segPlan *SegmentAssignPlan) ToString() string { return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d]\n", - segPlan.Segment.CollectionID, segPlan.ReplicaID, segPlan.Segment.ID, segPlan.From, segPlan.To) + segPlan.Segment.CollectionID, segPlan.Replica.GetID(), segPlan.Segment.ID, segPlan.From, segPlan.To) } type ChannelAssignPlan struct { - Channel *meta.DmChannel - ReplicaID int64 - From int64 - To int64 + Channel *meta.DmChannel + Replica *meta.Replica + From int64 + To int64 } func (chanPlan *ChannelAssignPlan) ToString() string { return fmt.Sprintf("ChannelPlan:[collectionID: %d, channel: %s, replicaID: %d, from: %d, to: %d]\n", - chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.ReplicaID, chanPlan.From, chanPlan.To) + chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.Replica.GetID(), chanPlan.From, chanPlan.To) } var ( diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 1dfc1aa115..3225418c40 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -218,7 +218,7 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, on plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes) for i := range plans { plans[i].From = nodeID - plans[i].ReplicaID = replica.ID + plans[i].Replica = replica } segmentPlans = append(segmentPlans, plans...) } @@ -286,7 +286,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, nodesWithLessRow) for i := range segmentPlans { segmentPlans[i].From = segmentPlans[i].Segment.Node - segmentPlans[i].ReplicaID = replica.ID + segmentPlans[i].Replica = replica } return segmentPlans @@ -299,7 +299,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, on plans := b.AssignChannel(dmChannels, onlineNodes) for i := range plans { plans[i].From = nodeID - plans[i].ReplicaID = replica.ID + plans[i].Replica = replica } channelPlans = append(channelPlans, plans...) } @@ -337,7 +337,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel) for i := range channelPlans { channelPlans[i].From = channelPlans[i].Channel.Node - channelPlans[i].ReplicaID = replica.ID + channelPlans[i].Replica = replica } return channelPlans diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 9f4cb73bb4..b2c14246b3 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type RowCountBasedBalancerTestSuite struct { @@ -136,7 +137,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegment() { suite.balancer.nodeManager.Add(nodeInfo) } plans := balancer.AssignSegment(0, c.assignments, c.nodes) - suite.ElementsMatch(c.expectPlans, plans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, plans) }) } } @@ -168,7 +169,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { }, }, expectPlans: []SegmentAssignPlan{ - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: 1, ReplicaID: 1}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: 1, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -248,7 +249,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, }, { @@ -277,8 +278,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { }, }, expectPlans: []SegmentAssignPlan{ - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -298,7 +299,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, Replica: newReplicaDefaultRG(1)}, }, }, { @@ -340,8 +341,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 2, ReplicaID: 1}, - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 3, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 2, Replica: newReplicaDefaultRG(1)}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 3, Replica: newReplicaDefaultRG(1)}, }, multiple: true, }, @@ -426,11 +427,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) if !c.multiple { - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) } else { - suite.Subset(c.expectPlans, segmentPlans) - suite.Subset(c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans, true) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans, true) } // clear distribution @@ -527,8 +528,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { }, }, expectPlans: []SegmentAssignPlan{ - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -591,7 +592,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, }, } @@ -635,8 +636,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i]) } segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) }) } } @@ -681,7 +682,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, }, { @@ -710,8 +711,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { }, }, expectPlans: []SegmentAssignPlan{ - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -780,8 +781,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { err = balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) suite.NoError(err) segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) // clean up distribution for next test for node := range c.distributions { @@ -829,7 +830,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() { } segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.Empty(channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) }) } } @@ -924,7 +925,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() { }, expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{ - {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, ReplicaID: 1}, + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, Replica: newReplicaDefaultRG(1)}, }, enableBalanceChannel: true, }, @@ -1013,11 +1014,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() { defer Params.Reset(Params.QueryCoordCfg.AutoBalanceChannel.Key) segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) if !c.multiple { - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) } else { - suite.Subset(c.expectPlans, segmentPlans) - suite.Subset(c.expectChannelPlans, channelPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans, true) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans, true) } // clear distribution @@ -1161,3 +1162,101 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() { func TestRowCountBasedBalancerSuite(t *testing.T) { suite.Run(t, new(RowCountBasedBalancerTestSuite)) } + +func newReplicaDefaultRG(replicaID int64) *meta.Replica { + return meta.NewReplica( + &querypb.Replica{ + ID: replicaID, + ResourceGroup: meta.DefaultResourceGroupName, + }, + typeutil.NewUniqueSet(), + ) +} + +// remove it after resource group enhancement. +func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssignPlan, right []SegmentAssignPlan, subset ...bool) { + suite.Equal(len(left), len(right)) + + type comparablePlan struct { + Segment *meta.Segment + ReplicaID int64 + From int64 + To int64 + } + + leftPlan := make([]comparablePlan, 0) + for _, p := range left { + replicaID := int64(-1) + if p.Replica != nil { + replicaID = p.Replica.GetID() + } + leftPlan = append(leftPlan, comparablePlan{ + Segment: p.Segment, + ReplicaID: replicaID, + From: p.From, + To: p.To, + }) + } + + rightPlan := make([]comparablePlan, 0) + for _, p := range right { + replicaID := int64(-1) + if p.Replica != nil { + replicaID = p.Replica.GetID() + } + rightPlan = append(rightPlan, comparablePlan{ + Segment: p.Segment, + ReplicaID: replicaID, + From: p.From, + To: p.To, + }) + } + if len(subset) > 0 && subset[0] { + suite.Subset(leftPlan, rightPlan) + } else { + suite.ElementsMatch(leftPlan, rightPlan) + } +} + +// remove it after resource group enhancement. +func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssignPlan, right []ChannelAssignPlan, subset ...bool) { + type comparablePlan struct { + Channel *meta.DmChannel + ReplicaID int64 + From int64 + To int64 + } + + leftPlan := make([]comparablePlan, 0) + for _, p := range left { + replicaID := int64(-1) + if p.Replica != nil { + replicaID = p.Replica.GetID() + } + leftPlan = append(leftPlan, comparablePlan{ + Channel: p.Channel, + ReplicaID: replicaID, + From: p.From, + To: p.To, + }) + } + + rightPlan := make([]comparablePlan, 0) + for _, p := range right { + replicaID := int64(-1) + if p.Replica != nil { + replicaID = p.Replica.GetID() + } + rightPlan = append(rightPlan, comparablePlan{ + Channel: p.Channel, + ReplicaID: replicaID, + From: p.From, + To: p.To, + }) + } + if len(subset) > 0 && subset[0] { + suite.Subset(leftPlan, rightPlan) + } else { + suite.ElementsMatch(leftPlan, rightPlan) + } +} diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 8369728e3d..0cc7adffb5 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -252,7 +252,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes) for i := range plans { plans[i].From = nodeID - plans[i].ReplicaID = replica.ID + plans[i].Replica = replica } segmentPlans = append(segmentPlans, plans...) } @@ -316,7 +316,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes) for i := range segmentPlans { segmentPlans[i].From = segmentPlans[i].Segment.Node - segmentPlans[i].ReplicaID = replica.ID + segmentPlans[i].Replica = replica } return segmentPlans diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 48545e9755..8b62aa3ab8 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -233,7 +233,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { } for i := range c.collectionIDs { plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes) - suite.ElementsMatch(c.expectPlans[i], plans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans) } }) } @@ -316,7 +316,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { }, }, expectPlans: []SegmentAssignPlan{ - {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: 1, ReplicaID: 1}, + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: 1, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -386,8 +386,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { // 4. balance and verify result segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID) - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) }) } } @@ -450,7 +450,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { Segment: &meta.Segment{ SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 2, - }, From: 2, To: 3, ReplicaID: 1, + }, From: 2, To: 3, Replica: newReplicaDefaultRG(1), }, }, {}, @@ -498,7 +498,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { // 4. first round balance segmentPlans, _ := suite.getCollectionBalancePlans(balancer, balanceCase.collectionIDs[0]) - suite.ElementsMatch(balanceCase.expectPlans[0], segmentPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, balanceCase.expectPlans[0], segmentPlans) // 5. update segment distribution to simulate balance effect for node, s := range balanceCase.distributions[1] { @@ -507,7 +507,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { // 6. balance again segmentPlans, _ = suite.getCollectionBalancePlans(balancer, balanceCase.collectionIDs[1]) - suite.ElementsMatch(balanceCase.expectPlans[1], segmentPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, balanceCase.expectPlans[1], segmentPlans) } func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { @@ -548,11 +548,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { {Segment: &meta.Segment{ SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 1, - }, From: 1, To: 3, ReplicaID: 1}, + }, From: 1, To: 3, Replica: newReplicaDefaultRG(1)}, {Segment: &meta.Segment{ SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1, - }, From: 1, To: 3, ReplicaID: 1}, + }, From: 1, To: 3, Replica: newReplicaDefaultRG(1)}, }, expectChannelPlans: []ChannelAssignPlan{}, }, @@ -651,8 +651,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { // 4. balance and verify result segmentPlans, channelPlans := suite.getCollectionBalancePlans(suite.balancer, c.collectionID) - suite.ElementsMatch(c.expectChannelPlans, channelPlans) - suite.ElementsMatch(c.expectPlans, segmentPlans) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) }) } } diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 8ec0104f48..24d9c53fd4 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -51,14 +51,14 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou timeout, source, p.Segment.GetCollectionID(), - p.ReplicaID, + p.Replica, actions..., ) if err != nil { log.Warn("create segment task from plan failed", zap.Int64("collection", p.Segment.GetCollectionID()), zap.Int64("segmentID", p.Segment.GetID()), - zap.Int64("replica", p.ReplicaID), + zap.Int64("replica", p.Replica.GetID()), zap.String("channel", p.Segment.GetInsertChannel()), zap.Int64("from", p.From), zap.Int64("to", p.To), @@ -70,7 +70,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou log.Info("create segment task", zap.Int64("collection", p.Segment.GetCollectionID()), zap.Int64("segmentID", p.Segment.GetID()), - zap.Int64("replica", p.ReplicaID), + zap.Int64("replica", p.Replica.GetID()), zap.String("channel", p.Segment.GetInsertChannel()), zap.Int64("from", p.From), zap.Int64("to", p.To)) @@ -98,11 +98,11 @@ func CreateChannelTasksFromPlans(ctx context.Context, source task.Source, timeou action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName()) actions = append(actions, action) } - t, err := task.NewChannelTask(ctx, timeout, source, p.Channel.GetCollectionID(), p.ReplicaID, actions...) + t, err := task.NewChannelTask(ctx, timeout, source, p.Channel.GetCollectionID(), p.Replica, actions...) if err != nil { log.Warn("create channel task failed", zap.Int64("collection", p.Channel.GetCollectionID()), - zap.Int64("replica", p.ReplicaID), + zap.Int64("replica", p.Replica.GetID()), zap.String("channel", p.Channel.GetChannelName()), zap.Int64("from", p.From), zap.Int64("to", p.To), @@ -113,7 +113,7 @@ func CreateChannelTasksFromPlans(ctx context.Context, source task.Source, timeou log.Info("create channel task", zap.Int64("collection", p.Channel.GetCollectionID()), - zap.Int64("replica", p.ReplicaID), + zap.Int64("replica", p.Replica.GetID()), zap.String("channel", p.Channel.GetChannelName()), zap.Int64("from", p.From), zap.Int64("to", p.To)) diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 249c20b894..d89dcc3e4f 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -286,10 +286,10 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { // checker check segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0) mockPlan := balance.SegmentAssignPlan{ - Segment: utils.CreateTestSegment(1, 1, 1, 1, 1, "1"), - ReplicaID: 1, - From: 1, - To: 2, + Segment: utils.CreateTestSegment(1, 1, 1, 1, 1, "1"), + Replica: meta.NilReplica, + From: 1, + To: 2, } segPlans = append(segPlans, mockPlan) suite.balancer.EXPECT().BalanceReplica(mock.Anything).Return(segPlans, chanPlans) diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 3c23416e71..0217add440 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -93,7 +93,7 @@ func (c *ChannelChecker) Check(ctx context.Context) []task.Task { channels := c.dist.ChannelDistManager.GetAll() released := utils.FilterReleased(channels, collectionIDs) - releaseTasks := c.createChannelReduceTasks(ctx, released, -1) + releaseTasks := c.createChannelReduceTasks(ctx, released, meta.NilReplica) task.SetReason("collection released", releaseTasks...) tasks = append(tasks, releaseTasks...) return tasks @@ -107,12 +107,12 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica task.SetReason("lacks of channel", tasks...) ret = append(ret, tasks...) - tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID()) + tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica) task.SetReason("collection released", tasks...) ret = append(ret, tasks...) repeated := c.findRepeatedChannels(ctx, replica.GetID()) - tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica.GetID()) + tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica) task.SetReason("redundancies of channel", tasks...) ret = append(ret, tasks...) @@ -224,21 +224,21 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []* }) plans := c.balancer.AssignChannel(channels, availableNodes) for i := range plans { - plans[i].ReplicaID = replica.GetID() + plans[i].Replica = replica } return balance.CreateChannelTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), plans) } -func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels []*meta.DmChannel, replicaID int64) []task.Task { +func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task { ret := make([]task.Task, 0, len(channels)) for _, ch := range channels { action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName()) - task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), ch.GetCollectionID(), replicaID, action) + task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), ch.GetCollectionID(), replica, action) if err != nil { log.Warn("create channel reduce task failed", zap.Int64("collection", ch.GetCollectionID()), - zap.Int64("replica", replicaID), + zap.Int64("replica", replica.GetID()), zap.String("channel", ch.GetChannelName()), zap.Int64("from", ch.Node), zap.Error(err), diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 107059a29c..64c28f3830 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -104,10 +104,10 @@ func (suite *ChannelCheckerTestSuite) createMockBalancer() balance.Balance { plans := make([]balance.ChannelAssignPlan, 0, len(channels)) for i, c := range channels { plan := balance.ChannelAssignPlan{ - Channel: c, - From: -1, - To: nodes[i%len(nodes)], - ReplicaID: -1, + Channel: c, + From: -1, + To: nodes[i%len(nodes)], + Replica: meta.NilReplica, } plans = append(plans, plan) } diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 3b67c9f02f..5969f2983d 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -165,7 +165,7 @@ func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *met params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), segment.GetCollectionID(), - replica.GetID(), + replica, action, ) if err != nil { diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index cff6853523..baf94b3df9 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -102,8 +102,8 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task { leaderViews := c.dist.LeaderViewManager.GetByCollectionAndNode(replica.GetCollectionID(), node) for ch, leaderView := range leaderViews { dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(ch), meta.WithReplica(replica)) - tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica.ID, leaderView, dist)...) - tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica.ID, leaderView, dist)...) + tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica, leaderView, dist)...) + tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica, leaderView, dist)...) } } } @@ -112,10 +112,10 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task { return tasks } -func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dist []*meta.Segment) []task.Task { +func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, dist []*meta.Segment) []task.Task { log := log.Ctx(ctx).With( zap.Int64("collectionID", leaderView.CollectionID), - zap.Int64("replica", replica), + zap.Int64("replica", replica.GetID()), zap.String("channel", leaderView.Channel), zap.Int64("leaderViewID", leaderView.ID), ) @@ -154,10 +154,10 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int6 return ret } -func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dists []*meta.Segment) []task.Task { +func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, dists []*meta.Segment) []task.Task { log := log.Ctx(ctx).With( zap.Int64("collectionID", leaderView.CollectionID), - zap.Int64("replica", replica), + zap.Int64("replica", replica.GetID()), zap.String("channel", leaderView.Channel), zap.Int64("leaderViewID", leaderView.ID), ) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index e5ff8ab432..463d37a5f7 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -97,7 +97,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task { // find already released segments which are not contained in target segments := c.dist.SegmentDistManager.GetByFilter(nil) released := utils.FilterReleased(segments, collectionIDs) - reduceTasks := c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_Historical) + reduceTasks := c.createSegmentReduceTasks(ctx, released, meta.NilReplica, querypb.DataScope_Historical) task.SetReason("collection released", reduceTasks...) results = append(results, reduceTasks...) task.SetPriority(task.TaskPriorityNormal, results...) @@ -115,20 +115,20 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica ret = append(ret, tasks...) redundancies = c.filterSegmentInUse(replica, redundancies) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Historical) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical) task.SetReason("segment not exists in target", tasks...) ret = append(ret, tasks...) // compare inner dists to find repeated loaded segments redundancies = c.findRepeatedSealedSegments(replica.GetID()) redundancies = c.filterExistedOnLeader(replica, redundancies) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Historical) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical) task.SetReason("redundancies of segment", tasks...) ret = append(ret, tasks...) // compare with target to find the lack and redundancy of segments _, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID()) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Streaming) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Streaming) task.SetReason("streaming segment not exists in target", tasks...) ret = append(ret, tasks...) @@ -381,7 +381,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] }) shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes) for i := range shardPlans { - shardPlans[i].ReplicaID = replica.GetID() + shardPlans[i].Replica = replica } plans = append(plans, shardPlans...) } @@ -389,7 +389,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), plans) } -func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replicaID int64, scope querypb.DataScope) []task.Task { +func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task { ret := make([]task.Task, 0, len(segments)) for _, s := range segments { action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope) @@ -398,13 +398,13 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), s.GetCollectionID(), - replicaID, + replica, action, ) if err != nil { log.Warn("create segment reduce task failed", zap.Int64("collection", s.GetCollectionID()), - zap.Int64("replica", replicaID), + zap.Int64("replica", replica.GetID()), zap.String("channel", s.GetInsertChannel()), zap.Int64("from", s.Node), zap.Error(err), diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 998a567bf2..54417d8e22 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -91,10 +91,10 @@ func (suite *SegmentCheckerTestSuite) createMockBalancer() balance.Balance { plans := make([]balance.SegmentAssignPlan, 0, len(segments)) for i, s := range segments { plan := balance.SegmentAssignPlan{ - Segment: s, - From: -1, - To: nodes[i%len(nodes)], - ReplicaID: -1, + Segment: s, + From: -1, + To: nodes[i%len(nodes)], + Replica: meta.NilReplica, } plans = append(plans, plan) } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index db93d2804e..8a35b89219 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -144,7 +144,7 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), task.WrapIDSource(req.GetBase().GetMsgID()), req.GetCollectionID(), - replica.GetID(), + replica, task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical), task.NewSegmentActionWithScope(srcNode, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical), ) diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index e788075dfe..34bcbd1c76 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -30,6 +30,11 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +// NilReplica is used to represent a nil replica. +var NilReplica = NewReplica(&querypb.Replica{ + ID: -1, +}, typeutil.NewUniqueSet()) + type Replica struct { *querypb.Replica nodes typeutil.UniqueSet // a helper field for manipulating replica's Nodes slice field diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 7976aa2a0f..6ee75d1b53 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -351,7 +351,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect if updateVersionAction != nil { actions = append(actions, updateVersionAction) } - if !ob.sync(ctx, replica.GetID(), leaderView, actions) { + if !ob.sync(ctx, replica, leaderView, actions) { return false } } @@ -360,10 +360,11 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect return true } -func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool { +func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool { if len(diffs) == 0 { return true } + replicaID := replica.GetID() log := log.With( zap.Int64("leaderID", leaderView.ID), @@ -399,9 +400,11 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView Actions: diffs, Schema: collectionInfo.GetSchema(), LoadMeta: &querypb.LoadMetaInfo{ - LoadType: ob.meta.GetLoadType(leaderView.CollectionID), - CollectionID: leaderView.CollectionID, - PartitionIDs: partitions, + LoadType: ob.meta.GetLoadType(leaderView.CollectionID), + CollectionID: leaderView.CollectionID, + PartitionIDs: partitions, + DbName: collectionInfo.GetDbName(), + ResourceGroup: replica.GetResourceGroup(), }, Version: time.Now().UnixNano(), IndexInfoList: indexInfo, diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index ec69aa9ac6..dbe5710b83 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -320,6 +320,8 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error { loadMeta := packLoadMeta( ex.meta.GetLoadType(task.CollectionID()), task.CollectionID(), + collectionInfo.GetDbName(), + task.ResourceGroup(), partitions..., ) @@ -562,10 +564,13 @@ func (ex *Executor) getMetaInfo(ctx context.Context, task Task) (*milvuspb.Descr } loadMeta := packLoadMeta( - ex.meta.GetLoadType(collectionID), - collectionID, + ex.meta.GetLoadType(task.CollectionID()), + task.CollectionID(), + collectionInfo.GetDbName(), + task.ResourceGroup(), partitions..., ) + // get channel first, in case of target updated after segment info fetched channel := ex.targetMgr.GetDmChannel(collectionID, shard, meta.NextTargetFirst) if channel == nil { diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index 22073d6a3d..6bb1aa6c71 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -71,7 +71,10 @@ type Task interface { Source() Source ID() typeutil.UniqueID CollectionID() typeutil.UniqueID + // Return 0 if the task is a reduce task without given replica. ReplicaID() typeutil.UniqueID + // Return "" if the task is a reduce task without given replica. + ResourceGroup() string Shard() string SetID(id typeutil.UniqueID) Status() Status @@ -106,7 +109,7 @@ type baseTask struct { id typeutil.UniqueID // Set by scheduler collectionID typeutil.UniqueID - replicaID typeutil.UniqueID + replica *meta.Replica shard string loadType querypb.LoadType @@ -125,14 +128,14 @@ type baseTask struct { startTs time.Time } -func newBaseTask(ctx context.Context, source Source, collectionID, replicaID typeutil.UniqueID, shard string, taskTag string) *baseTask { +func newBaseTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, shard string, taskTag string) *baseTask { ctx, cancel := context.WithCancel(ctx) ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag) return &baseTask{ source: source, collectionID: collectionID, - replicaID: replicaID, + replica: replica, shard: shard, status: atomic.NewString(TaskStatusStarted), @@ -167,7 +170,13 @@ func (task *baseTask) CollectionID() typeutil.UniqueID { } func (task *baseTask) ReplicaID() typeutil.UniqueID { - return task.replicaID + // replica may be nil, 0 will be generated. + return task.replica.GetID() +} + +func (task *baseTask) ResourceGroup() string { + // replica may be nil, empty string will be generated. + return task.replica.GetResourceGroup() } func (task *baseTask) Shard() string { @@ -195,7 +204,7 @@ func (task *baseTask) SetPriority(priority Priority) { } func (task *baseTask) Index() string { - return fmt.Sprintf("[replica=%d]", task.replicaID) + return fmt.Sprintf("[replica=%d]", task.ReplicaID()) } func (task *baseTask) RecordStartTs() { @@ -282,13 +291,14 @@ func (task *baseTask) String() string { } } return fmt.Sprintf( - "[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [priority=%s] [actionsCount=%d] [actions=%s]", + "[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [resourceGroup=%s] [priority=%s] [actionsCount=%d] [actions=%s]", task.id, GetTaskType(task).String(), task.source.String(), task.reason, task.collectionID, - task.replicaID, + task.ReplicaID(), + task.ResourceGroup(), task.priority.String(), len(task.actions), actionsStr, @@ -307,8 +317,8 @@ type SegmentTask struct { func NewSegmentTask(ctx context.Context, timeout time.Duration, source Source, - collectionID, - replicaID typeutil.UniqueID, + collectionID typeutil.UniqueID, + replica *meta.Replica, actions ...Action, ) (*SegmentTask, error) { if len(actions) == 0 { @@ -330,7 +340,7 @@ func NewSegmentTask(ctx context.Context, } } - base := newBaseTask(ctx, source, collectionID, replicaID, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID)) + base := newBaseTask(ctx, source, collectionID, replica, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID)) base.actions = actions return &SegmentTask{ baseTask: base, @@ -360,8 +370,8 @@ type ChannelTask struct { func NewChannelTask(ctx context.Context, timeout time.Duration, source Source, - collectionID, - replicaID typeutil.UniqueID, + collectionID typeutil.UniqueID, + replica *meta.Replica, actions ...Action, ) (*ChannelTask, error) { if len(actions) == 0 { @@ -381,7 +391,7 @@ func NewChannelTask(ctx context.Context, } } - base := newBaseTask(ctx, source, collectionID, replicaID, channel, fmt.Sprintf("ChannelTask-%s-%s", actions[0].Type().String(), channel)) + base := newBaseTask(ctx, source, collectionID, replica, channel, fmt.Sprintf("ChannelTask-%s-%s", actions[0].Type().String(), channel)) base.actions = actions return &ChannelTask{ baseTask: base, @@ -410,13 +420,13 @@ type LeaderTask struct { func NewLeaderTask(ctx context.Context, timeout time.Duration, source Source, - collectionID, - replicaID typeutil.UniqueID, + collectionID typeutil.UniqueID, + replica *meta.Replica, leaderID int64, action *LeaderAction, ) *LeaderTask { segmentID := action.SegmentID() - base := newBaseTask(ctx, source, collectionID, replicaID, action.Shard(), fmt.Sprintf("LeaderTask-%s-%d", action.Type().String(), segmentID)) + base := newBaseTask(ctx, source, collectionID, replica, action.Shard(), fmt.Sprintf("LeaderTask-%s-%d", action.Type().String(), segmentID)) base.actions = []Action{action} return &LeaderTask{ baseTask: base, diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 7663bb9d1f..72f84e9a1a 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -59,7 +59,7 @@ type TaskSuite struct { // Data collection int64 - replica int64 + replica *meta.Replica subChannels []string unsubChannels []string moveChannels []string @@ -86,7 +86,7 @@ type TaskSuite struct { func (suite *TaskSuite) SetupSuite() { paramtable.Init() suite.collection = 1000 - suite.replica = 10 + suite.replica = newReplicaDefaultRG(10) suite.subChannels = []string{ "sub-0", "sub-1", @@ -191,8 +191,7 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) { PartitionID: 1, }, }) - suite.meta.ReplicaManager.Put( - utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3})) + suite.meta.ReplicaManager.Put(utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3})) } } @@ -349,7 +348,7 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() { timeout, WrapIDSource(0), suite.collection, - -1, + meta.NilReplica, NewChannelAction(targetNode, ActionTypeReduce, channel), ) @@ -1343,39 +1342,39 @@ func (suite *TaskSuite) TestLeaderTaskSet() { } func (suite *TaskSuite) TestCreateTaskBehavior() { - chanelTask, err := NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0) + chanelTask, err := NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) action := NewSegmentAction(0, 0, "", 0) - chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, action) + chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, action) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) action1 := NewChannelAction(0, 0, "fake-channel1") action2 := NewChannelAction(0, 0, "fake-channel2") - chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, action1, action2) + chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, action1, action2) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) - segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0) + segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) channelAction := NewChannelAction(0, 0, "fake-channel1") - segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, channelAction) + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, channelAction) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) segmentAction1 := NewSegmentAction(0, 0, "", 0) segmentAction2 := NewSegmentAction(0, 0, "", 1) - segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, segmentAction1, segmentAction2) + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, segmentAction1, segmentAction2) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) leaderAction := NewLeaderAction(1, 2, ActionTypeGrow, "fake-channel1", 100, 0) - leaderTask := NewLeaderTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, 0, 1, leaderAction) + leaderTask := NewLeaderTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, 1, leaderAction) suite.NotNil(leaderTask) } @@ -1447,8 +1446,7 @@ func (suite *TaskSuite) TestNoExecutor() { ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-test", } - suite.meta.ReplicaManager.Put( - utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1})) + suite.meta.ReplicaManager.Put(utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3, -1})) // Test load segment task suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ @@ -1666,7 +1664,7 @@ func (suite *TaskSuite) TestBalanceChannelTask() { 10*time.Second, WrapIDSource(2), collectionID, - 1, + meta.NilReplica, NewChannelAction(1, ActionTypeGrow, channel), NewChannelAction(2, ActionTypeReduce, channel), ) @@ -1761,7 +1759,12 @@ func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() { 10*time.Second, WrapIDSource(2), collectionID, - 1, + meta.NewReplica( + &querypb.Replica{ + ID: 1, + }, + typeutil.NewUniqueSet(), + ), NewChannelAction(1, ActionTypeGrow, channel), NewChannelAction(2, ActionTypeReduce, channel), ) @@ -1795,3 +1798,13 @@ func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() { func TestTask(t *testing.T) { suite.Run(t, new(TaskSuite)) } + +func newReplicaDefaultRG(replicaID int64) *meta.Replica { + return meta.NewReplica( + &querypb.Replica{ + ID: replicaID, + ResourceGroup: meta.DefaultResourceGroupName, + }, + typeutil.NewUniqueSet(), + ) +} diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index d96cb60da5..dd77a08bf7 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -180,11 +180,13 @@ func packReleaseSegmentRequest(task *SegmentTask, action *SegmentAction) *queryp } } -func packLoadMeta(loadType querypb.LoadType, collectionID int64, partitions ...int64) *querypb.LoadMetaInfo { +func packLoadMeta(loadType querypb.LoadType, collectionID int64, databaseName string, resourceGroup string, partitions ...int64) *querypb.LoadMetaInfo { return &querypb.LoadMetaInfo{ - LoadType: loadType, - CollectionID: collectionID, - PartitionIDs: partitions, + LoadType: loadType, + CollectionID: collectionID, + PartitionIDs: partitions, + DbName: databaseName, + ResourceGroup: resourceGroup, } } diff --git a/internal/querycoordv2/task/utils_test.go b/internal/querycoordv2/task/utils_test.go index 2504bf9ca4..86302f3859 100644 --- a/internal/querycoordv2/task/utils_test.go +++ b/internal/querycoordv2/task/utils_test.go @@ -43,7 +43,7 @@ func (s *UtilsSuite) TestPackLoadSegmentRequest() { time.Second, nil, 1, - 10, + newReplicaDefaultRG(10), action, ) s.NoError(err) @@ -96,7 +96,7 @@ func (s *UtilsSuite) TestPackLoadSegmentRequestMmap() { time.Second, nil, 1, - 10, + newReplicaDefaultRG(10), action, ) s.NoError(err) diff --git a/internal/querynodev2/local_worker_test.go b/internal/querynodev2/local_worker_test.go index 9410858712..e28799df2a 100644 --- a/internal/querynodev2/local_worker_test.go +++ b/internal/querynodev2/local_worker_test.go @@ -94,7 +94,9 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) { suite.schema = segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true) suite.indexMeta = segments.GenTestIndexMeta(suite.collectionID, suite.schema) - collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) loadMata := &querypb.LoadMetaInfo{ LoadType: querypb.LoadType_LoadCollection, CollectionID: suite.collectionID, diff --git a/internal/querynodev2/pipeline/insert_node_test.go b/internal/querynodev2/pipeline/insert_node_test.go index dac82abbfc..65bf17240f 100644 --- a/internal/querynodev2/pipeline/insert_node_test.go +++ b/internal/querynodev2/pipeline/insert_node_test.go @@ -61,7 +61,9 @@ func (suite *InsertNodeSuite) TestBasic() { schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true) in := suite.buildInsertNodeMsg(schema) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) collection.AddPartition(suite.partitionID) // init mock @@ -95,7 +97,9 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() { schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true) in := suite.buildInsertNodeMsg(schema) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) collection.AddPartition(suite.partitionID) // init mock diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index e5eb0d701c..4d7aa36cf4 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -109,7 +109,9 @@ func (suite *PipelineTestSuite) TestBasic() { // init mock // mock collection manager schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true) - collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + collection := segments.NewCollection(suite.collectionID, schema, segments.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection) // mock mq factory diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index a181734187..6938602f2a 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -93,8 +93,7 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec return } - collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType()) - collection.AddPartition(loadMeta.GetPartitionIDs()...) + collection := NewCollection(collectionID, schema, meta, loadMeta) collection.Ref(1) m.collections[collectionID] = collection } @@ -139,19 +138,36 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool { } // Collection is a wrapper of the underlying C-structure C.CCollection +// In a query node, `Collection` is a replica info of a collection in these query node. type Collection struct { mu sync.RWMutex // protects colllectionPtr collectionPtr C.CCollection id int64 partitions *typeutil.ConcurrentSet[int64] loadType querypb.LoadType - metricType atomic.String // deprecated - schema atomic.Pointer[schemapb.CollectionSchema] - isGpuIndex bool + dbName string + resourceGroup string + // resource group of node may be changed if node transfer, + // but Collection in Manager will be released before assign new replica of new resource group on these node. + // so we don't need to update resource group in Collection. + // if resource group is not updated, the reference count of collection manager works failed. + metricType atomic.String // deprecated + schema atomic.Pointer[schemapb.CollectionSchema] + isGpuIndex bool refCount *atomic.Uint32 } +// GetDBName returns the database name of collection. +func (c *Collection) GetDBName() string { + return c.dbName +} + +// GetResourceGroup returns the resource group of collection. +func (c *Collection) GetResourceGroup() string { + return c.resourceGroup +} + // ID returns collection id func (c *Collection) ID() int64 { return c.id @@ -214,7 +230,7 @@ func (c *Collection) Unref(count uint32) uint32 { } // newCollection returns a new Collection -func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType) *Collection { +func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) *Collection { /* CCollection NewCollection(const char* schema_proto_blob); @@ -250,10 +266,15 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM collectionPtr: collection, id: collectionID, partitions: typeutil.NewConcurrentSet[int64](), - loadType: loadType, + loadType: loadMetaInfo.GetLoadType(), + dbName: loadMetaInfo.GetDbName(), + resourceGroup: loadMetaInfo.GetResourceGroup(), refCount: atomic.NewUint32(0), isGpuIndex: isGpuIndex, } + for _, partitionID := range loadMetaInfo.GetPartitionIDs() { + coll.partitions.Insert(partitionID) + } coll.schema.Store(schema) return coll diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index c046d78ad4..248fbf914e 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -46,7 +46,9 @@ func (s *ManagerSuite) SetupTest() { schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true) segment, err := NewSegment( context.Background(), - NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection), + NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }), s.types[i], 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/segments/plan_test.go b/internal/querynodev2/segments/plan_test.go index d3e9ed178a..6fc6e1f414 100644 --- a/internal/querynodev2/segments/plan_test.go +++ b/internal/querynodev2/segments/plan_test.go @@ -44,7 +44,9 @@ func (suite *PlanSuite) SetupTest() { suite.partitionID = 10 suite.segmentID = 1 schema := GenTestCollectionSchema("plan-suite", schemapb.DataType_Int64, true) - suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), querypb.LoadType_LoadCollection) + suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) suite.collection.AddPartition(suite.partitionID) } diff --git a/internal/querynodev2/segments/reduce_test.go b/internal/querynodev2/segments/reduce_test.go index 0086965715..a310126b9c 100644 --- a/internal/querynodev2/segments/reduce_test.go +++ b/internal/querynodev2/segments/reduce_test.go @@ -71,8 +71,9 @@ func (suite *ReduceSuite) SetupTest() { suite.collection = NewCollection(suite.collectionID, schema, GenTestIndexMeta(suite.collectionID, schema), - querypb.LoadType_LoadCollection, - ) + &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) suite.segment, err = NewSegment(ctx, suite.collection, SegmentTypeSealed, diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 38e101a9d3..3ec906ad49 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -693,7 +693,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() { PartitionIDs: []int64{suite.partitionID}, } - collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta.GetLoadType()) + collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta) suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe() } diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index e83fcb097a..c5a458757f 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -219,7 +219,9 @@ func (suite *QueryNodeSuite) TestStop() { suite.node.manager = segments.NewManager() schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true) - collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection) + collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + }) segment, err := segments.NewSegment( context.Background(), collection,