From a2502bde75e1e849253cc1df58b9f766bfd9c685 Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 5 Apr 2024 04:57:16 +0800 Subject: [PATCH] enhance: replica manager enhancement (#31496) issue: #30647 - ReplicaManager manage read only node now, and always do persistent of node distribution of replica. - All segment/channel checker using ReplicaManager to get read-only node or read-write node, but not ResourceManager. - ReplicaManager promise that only apply unique querynode to one replica in same collection now (replicas in same collection never hold same querynode at same time). - ReplicaManager promise that fairly node count assignment policy if multi replicas of collection is assigned to one resource group. - Move some parameters check into ReplicaManager to avoid data race. - Allow transfer replica to resource group that already load replica of same collection - Allow transfer node between resource groups that load replica of same collection --------- Signed-off-by: chyezh --- internal/metastore/catalog.go | 2 +- .../metastore/kv/querycoord/kv_catalog.go | 16 +- .../mocks/mock_querycoord_catalog.go | 35 +- internal/proto/query_coord.proto | 4 +- .../balance/multi_target_balance.go | 30 +- .../balance/rowcount_based_balancer.go | 25 +- .../balance/rowcount_based_balancer_test.go | 1 + .../balance/score_based_balancer.go | 32 +- .../balance/score_based_balancer_test.go | 1 + internal/querycoordv2/balance/utils.go | 2 +- .../querycoordv2/checkers/channel_checker.go | 13 +- .../querycoordv2/checkers/segment_checker.go | 19 +- internal/querycoordv2/handlers.go | 8 +- internal/querycoordv2/job/job_load.go | 2 + internal/querycoordv2/job/job_test.go | 2 +- internal/querycoordv2/meta/replica.go | 169 ++++++ internal/querycoordv2/meta/replica_manager.go | 350 ++++++++----- .../meta/replica_manager_helper.go | 272 ++++++++++ .../meta/replica_manager_helper_test.go | 490 ++++++++++++++++++ .../querycoordv2/meta/replica_manager_test.go | 381 ++++++++++---- internal/querycoordv2/meta/replica_test.go | 175 +++++++ .../querycoordv2/meta/resource_manager.go | 41 +- .../meta/resource_manager_test.go | 19 +- .../meta/segment_dist_manager_test.go | 27 +- .../observers/collection_observer_test.go | 4 +- .../observers/replica_observer.go | 73 ++- .../observers/replica_observer_test.go | 4 +- .../observers/resource_observer.go | 5 +- .../observers/resource_observer_test.go | 6 +- .../observers/target_observer_test.go | 8 +- internal/querycoordv2/ops_services.go | 8 +- internal/querycoordv2/server.go | 7 +- internal/querycoordv2/services.go | 76 +-- internal/querycoordv2/services_test.go | 45 +- internal/querycoordv2/utils/meta.go | 179 +++---- internal/querycoordv2/utils/meta_test.go | 38 +- pkg/util/typeutil/set.go | 18 + pkg/util/typeutil/set_test.go | 13 + 38 files changed, 1962 insertions(+), 638 deletions(-) create mode 100644 internal/querycoordv2/meta/replica.go create mode 100644 internal/querycoordv2/meta/replica_manager_helper.go create mode 100644 internal/querycoordv2/meta/replica_manager_helper_test.go create mode 100644 internal/querycoordv2/meta/replica_test.go diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index ad604a0f5f..9a07c102d9 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -153,7 +153,7 @@ type DataCoordCatalog interface { type QueryCoordCatalog interface { SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error SavePartition(info ...*querypb.PartitionLoadInfo) error - 
SaveReplica(replica *querypb.Replica) error + SaveReplica(replicas ...*querypb.Replica) error GetCollections() ([]*querypb.CollectionLoadInfo, error) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) GetReplicas() ([]*querypb.Replica, error) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 22dcc7f984..5c590cf96e 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -70,13 +70,17 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error { return nil } -func (s Catalog) SaveReplica(replica *querypb.Replica) error { - key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID()) - value, err := proto.Marshal(replica) - if err != nil { - return err +func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error { + kvs := make(map[string]string) + for _, replica := range replicas { + key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID()) + value, err := proto.Marshal(replica) + if err != nil { + return err + } + kvs[key] = string(value) } - return s.cli.Save(key, string(value)) + return s.cli.MultiSave(kvs) } func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error { diff --git a/internal/metastore/mocks/mock_querycoord_catalog.go b/internal/metastore/mocks/mock_querycoord_catalog.go index 06e12d6937..92c1d3efb7 100644 --- a/internal/metastore/mocks/mock_querycoord_catalog.go +++ b/internal/metastore/mocks/mock_querycoord_catalog.go @@ -720,13 +720,19 @@ func (_c *QueryCoordCatalog_SavePartition_Call) RunAndReturn(run func(...*queryp return _c } -// SaveReplica provides a mock function with given fields: replica -func (_m *QueryCoordCatalog) SaveReplica(replica *querypb.Replica) error { - ret := _m.Called(replica) +// SaveReplica provides a mock function with given fields: replicas +func (_m *QueryCoordCatalog) SaveReplica(replicas ...*querypb.Replica) error { + _va := make([]interface{}, len(replicas)) + for _i := range replicas { + _va[_i] = replicas[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(*querypb.Replica) error); ok { - r0 = rf(replica) + if rf, ok := ret.Get(0).(func(...*querypb.Replica) error); ok { + r0 = rf(replicas...) } else { r0 = ret.Error(0) } @@ -740,14 +746,21 @@ type QueryCoordCatalog_SaveReplica_Call struct { } // SaveReplica is a helper method to define mock.On call -// - replica *querypb.Replica -func (_e *QueryCoordCatalog_Expecter) SaveReplica(replica interface{}) *QueryCoordCatalog_SaveReplica_Call { - return &QueryCoordCatalog_SaveReplica_Call{Call: _e.mock.On("SaveReplica", replica)} +// - replicas ...*querypb.Replica +func (_e *QueryCoordCatalog_Expecter) SaveReplica(replicas ...interface{}) *QueryCoordCatalog_SaveReplica_Call { + return &QueryCoordCatalog_SaveReplica_Call{Call: _e.mock.On("SaveReplica", + append([]interface{}{}, replicas...)...)} } -func (_c *QueryCoordCatalog_SaveReplica_Call) Run(run func(replica *querypb.Replica)) *QueryCoordCatalog_SaveReplica_Call { +func (_c *QueryCoordCatalog_SaveReplica_Call) Run(run func(replicas ...*querypb.Replica)) *QueryCoordCatalog_SaveReplica_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*querypb.Replica)) + variadicArgs := make([]*querypb.Replica, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(*querypb.Replica) + } + } + run(variadicArgs...) 
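With `SaveReplica` now variadic, callers can persist a whole batch of replicas through one `MultiSave` request instead of one `Save` per replica. A minimal usage sketch, assuming `catalog` is a `Catalog` backed by a kv store and `replicaA`/`replicaB` are placeholder `*querypb.Replica` values:

```go
// Both replicas are marshalled and handed to the meta store as a single
// MultiSave batch, rather than as N independent Save calls as before.
if err := catalog.SaveReplica(replicaA, replicaB); err != nil {
	return err
}
```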
}) return _c } @@ -757,7 +770,7 @@ func (_c *QueryCoordCatalog_SaveReplica_Call) Return(_a0 error) *QueryCoordCatal return _c } -func (_c *QueryCoordCatalog_SaveReplica_Call) RunAndReturn(run func(*querypb.Replica) error) *QueryCoordCatalog_SaveReplica_Call { +func (_c *QueryCoordCatalog_SaveReplica_Call) RunAndReturn(run func(...*querypb.Replica) error) *QueryCoordCatalog_SaveReplica_Call { _c.Call.Return(run) return _c } diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index f000212fcf..4ee8cfa1d2 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -667,8 +667,10 @@ message PartitionLoadInfo { message Replica { int64 ID = 1; int64 collectionID = 2; - repeated int64 nodes = 3; + repeated int64 nodes = 3; // all (read and write) nodes. mutual exclusive with ro_nodes. string resource_group = 4; + repeated int64 ro_nodes = 5; // the in-using node but should not be assigned to these replica. + // can not load new channel or segment on it anymore. } enum SyncType { diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go index 8ed1973c8c..cdab89ded2 100644 --- a/internal/querycoordv2/balance/multi_target_balance.go +++ b/internal/querycoordv2/balance/multi_target_balance.go @@ -458,34 +458,36 @@ type MultiTargetBalancer struct { func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { log := log.With( - zap.Int64("collection", replica.CollectionID), - zap.Int64("replica id", replica.Replica.GetID()), - zap.String("replica group", replica.Replica.GetResourceGroup()), + zap.Int64("collection", replica.GetCollectionID()), + zap.Int64("replica id", replica.GetID()), + zap.String("replica group", replica.GetResourceGroup()), ) - nodes := replica.GetNodes() - if len(nodes) == 0 { + if replica.NodesCount() == 0 { return nil, nil } - outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica) onlineNodes := make([]int64, 0) offlineNodes := make([]int64, 0) - for _, nid := range nodes { + + // read only nodes is offline in current replica. + if replica.RONodesCount() > 0 { + // if node is stop or transfer to other rg + log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes())) + offlineNodes = append(offlineNodes, replica.GetRONodes()...) 
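All three balancers now follow the same pattern here: every RO node of the replica is drained unconditionally, and only live RW nodes stay eligible for new load. A standalone sketch of that split, where `replicaView` and the `isStopping` callback are illustrative stand-ins rather than the real `meta.Replica` and node manager types:

```go
package main

import "fmt"

// replicaView is a simplified stand-in for meta.Replica: RW nodes may receive
// new segments/channels, RO nodes must only be drained.
type replicaView struct {
	rwNodes []int64
	roNodes []int64
}

// splitNodes mirrors the balancer logic: every RO node is treated as offline,
// and RW nodes are further split by a liveness check.
func splitNodes(r replicaView, isStopping func(int64) (bool, error)) (online, offline []int64) {
	offline = append(offline, r.roNodes...)
	for _, nid := range r.rwNodes {
		stopping, err := isStopping(nid)
		if err != nil {
			continue // unknown node, skip it
		}
		if stopping {
			offline = append(offline, nid)
		} else {
			online = append(online, nid)
		}
	}
	return online, offline
}

func main() {
	r := replicaView{rwNodes: []int64{1, 2, 3}, roNodes: []int64{4}}
	online, offline := splitNodes(r, func(n int64) (bool, error) { return n == 3, nil })
	fmt.Println(online, offline) // [1 2] [4 3]
}
```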
+ } + + for _, nid := range replica.GetNodes() { if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err)) continue } else if isStopping { offlineNodes = append(offlineNodes, nid) - } else if outboundNodes.Contain(nid) { - // if node is stop or transfer to other rg - log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid)) - offlineNodes = append(offlineNodes, nid) } else { onlineNodes = append(onlineNodes, nid) } } - if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 { + if len(onlineNodes) == 0 { // no available nodes to balance return nil, nil } @@ -524,8 +526,8 @@ func (b *MultiTargetBalancer) genSegmentPlan(replica *meta.Replica) []SegmentAss // get segments distribution on replica level and global level nodeSegments := make(map[int64][]*meta.Segment) globalNodeSegments := make(map[int64][]*meta.Segment) - for _, node := range replica.Nodes { - dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.CollectionID), meta.WithNodeID(node)) + for _, node := range replica.GetNodes() { + dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node)) segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 594360dc88..5f389ed93f 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -162,33 +162,34 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment log := log.Ctx(context.TODO()).WithRateGroup("qcv2.RowCountBasedBalancer", 1, 60).With( zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetCollectionID()), - zap.String("resourceGroup", replica.Replica.GetResourceGroup()), + zap.String("resourceGroup", replica.GetResourceGroup()), ) - nodes := replica.GetNodes() - if len(nodes) < 2 { + if replica.NodesCount() == 0 { return nil, nil } - outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica) onlineNodes := make([]int64, 0) offlineNodes := make([]int64, 0) - for _, nid := range nodes { + // read only nodes is offline in current replica. + if replica.RONodesCount() > 0 { + // if node is stop or transfer to other rg + log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes())) + offlineNodes = append(offlineNodes, replica.GetRONodes()...) 
+ } + + for _, nid := range replica.GetNodes() { if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err)) continue } else if isStopping { offlineNodes = append(offlineNodes, nid) - } else if outboundNodes.Contain(nid) { - // if node is stop or transfer to other rg - log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid)) - offlineNodes = append(offlineNodes, nid) } else { onlineNodes = append(onlineNodes, nid) } } - if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 { + if len(onlineNodes) == 0 { // no available nodes to balance return nil, nil } @@ -231,7 +232,7 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, on b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && segment.GetLevel() != datapb.SegmentLevel_L0 }) - plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes, false) + plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false) for i := range plans { plans[i].From = nodeID plans[i].Replica = replica @@ -299,7 +300,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode return nil } - segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, nodesWithLessRow, false) + segmentPlans := b.AssignSegment(replica.GetCollectionID(), segmentsToMove, nodesWithLessRow, false) for i := range segmentPlans { segmentPlans[i].From = segmentPlans[i].Segment.Node segmentPlans[i].Replica = replica diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index ecc283f11a..b2cc58e5b4 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -835,6 +835,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { suite.NoError(err) err = balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) suite.NoError(err) + utils.RecoverAllCollection(balancer.meta) segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 7fec98e8e5..8d84a0490b 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -187,34 +187,36 @@ func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int { func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { log := log.With( - zap.Int64("collection", replica.CollectionID), - zap.Int64("replica id", replica.Replica.GetID()), - zap.String("replica group", replica.Replica.GetResourceGroup()), + zap.Int64("collection", replica.GetCollectionID()), + zap.Int64("replica id", replica.GetID()), + zap.String("replica group", replica.GetResourceGroup()), ) - nodes := replica.GetNodes() - if len(nodes) == 0 { + if replica.NodesCount() == 0 { return nil, nil } - outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica) onlineNodes := make([]int64, 0) offlineNodes := make([]int64, 0) - for _, nid := range nodes { + + // read only nodes is offline in current 
replica. + if replica.RONodesCount() > 0 { + // if node is stop or transfer to other rg + log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes())) + offlineNodes = append(offlineNodes, replica.GetRONodes()...) + } + + for _, nid := range replica.GetNodes() { if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err)) continue } else if isStopping { offlineNodes = append(offlineNodes, nid) - } else if outboundNodes.Contain(nid) { - // if node is stop or transfer to other rg - log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid)) - offlineNodes = append(offlineNodes, nid) } else { onlineNodes = append(onlineNodes, nid) } } - if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 { + if len(onlineNodes) == 0 { // no available nodes to balance return nil, nil } @@ -258,7 +260,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && segment.GetLevel() != datapb.SegmentLevel_L0 }) - plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes, false) + plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false) for i := range plans { plans[i].From = nodeID plans[i].Replica = replica @@ -283,7 +285,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ }) segmentDist[node] = segments - rowCount := b.calculateScore(replica.CollectionID, node) + rowCount := b.calculateScore(replica.GetCollectionID(), node) totalScore += rowCount nodeScore[node] = rowCount } @@ -322,7 +324,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ return nil } - segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes, false) + segmentPlans := b.AssignSegment(replica.GetCollectionID(), segmentsToMove, onlineNodes, false) for i := range segmentPlans { segmentPlans[i].From = segmentPlans[i].Segment.Node segmentPlans[i].Replica = replica diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index a16022de6d..3e88dd8b8e 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -703,6 +703,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { for i := range c.outBoundNodes { suite.balancer.meta.ResourceManager.UnassignNode(meta.DefaultResourceGroupName, c.outBoundNodes[i]) } + utils.RecoverAllCollection(balancer.meta) // 4. 
balance and verify result segmentPlans, channelPlans := suite.getCollectionBalancePlans(suite.balancer, c.collectionID) diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 968a554fa6..858de2ec07 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -141,7 +141,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica, stoppingNodesSegments map[int64][]*meta.Segment, nodeSegments map[int64][]*meta.Segment, channelManager *meta.ChannelDistManager, segmentDistMgr *meta.SegmentDistManager, ) { - distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", DistInfoPrefix, replica.CollectionID, replica.GetID()) + distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", DistInfoPrefix, replica.GetCollectionID(), replica.GetID()) // 1. print stopping nodes segment distribution distInfo += "[stoppingNodesSegmentDist:" for stoppingNodeID, stoppedSegments := range stoppingNodesSegments { diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 95970a72eb..1152a47cfc 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -20,7 +20,6 @@ import ( "context" "time" - "github.com/samber/lo" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -103,16 +102,16 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica ret := make([]task.Task, 0) lacks, redundancies := c.getDmChannelDiff(replica.GetCollectionID(), replica.GetID()) - tasks := c.createChannelLoadTask(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica) + tasks := c.createChannelLoadTask(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica) task.SetReason("lacks of channel", tasks...) ret = append(ret, tasks...) - tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica) + tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica) task.SetReason("collection released", tasks...) ret = append(ret, tasks...) repeated := c.findRepeatedChannels(ctx, replica.GetID()) - tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica) + tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), repeated, replica) task.SetReason("redundancies of channel", tasks...) ret = append(ret, tasks...) 
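The reconciliation in `checkReplica` above reduces to two set differences between the target channels and the currently distributed ones: lacks become load tasks, redundancies become reduce tasks. A simplified sketch using plain maps in place of the real `TargetManager`/`ChannelDistManager` lookups:

```go
// diffChannels is an illustrative reduction of getDmChannelDiff: channels in
// the target but not yet distributed are "lacks", distributed channels that
// have left the target are "redundancies".
func diffChannels(target, dist map[string]bool) (lacks, redundancies []string) {
	for ch := range target {
		if !dist[ch] {
			lacks = append(lacks, ch)
		}
	}
	for ch := range dist {
		if !target[ch] {
			redundancies = append(redundancies, ch)
		}
	}
	return lacks, redundancies
}
```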
@@ -218,11 +217,7 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int } func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task { - outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { - return !outboundNodes.Contain(node) - }) - plans := c.balancer.AssignChannel(channels, availableNodes, false) + plans := c.balancer.AssignChannel(channels, replica.GetNodes(), false) for i := range plans { plans[i].Replica = replica } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 04dd6c0282..cbf1bdbf54 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -110,25 +110,25 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica // compare with targets to find the lack and redundancy of segments lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID()) // loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan) - tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica) + tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica) task.SetReason("lacks of segment", tasks...) ret = append(ret, tasks...) redundancies = c.filterSegmentInUse(replica, redundancies) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("segment not exists in target", tasks...) ret = append(ret, tasks...) // compare inner dists to find repeated loaded segments redundancies = c.findRepeatedSealedSegments(replica.GetID()) redundancies = c.filterExistedOnLeader(replica, redundancies) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical) task.SetReason("redundancies of segment", tasks...) ret = append(ret, tasks...) // compare with target to find the lack and redundancy of segments _, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID()) - tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Streaming) + tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming) task.SetReason("streaming segment not exists in target", tasks...) ret = append(ret, tasks...) 
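Both checkers now build their candidate list straight from the replica instead of consulting `ResourceManager.CheckOutboundNodes`: RO nodes are simply never returned by `GetNodes()`, so the only remaining filter is the stopping check. A reduced sketch of that selection rule, with `isStopping` standing in for the node manager:

```go
// candidateNodes keeps the replica's RW nodes that are not shutting down;
// RO nodes never appear in the input, so no resource-group filtering is needed.
func candidateNodes(rwNodes []int64, isStopping func(int64) (bool, error)) []int64 {
	out := make([]int64, 0, len(rwNodes))
	for _, n := range rwNodes {
		if stop, err := isStopping(n); err == nil && !stop {
			out = append(out, n)
		}
	}
	return out
}
```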
@@ -147,7 +147,7 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64, log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 1, 60).With( zap.Int64("collectionID", collectionID), - zap.Int64("replicaID", replica.ID)) + zap.Int64("replicaID", replica.GetID())) leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica) for channelName, node := range leaders { @@ -362,14 +362,13 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return nil } - // filter out stopping nodes and outbound nodes - outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { + // filter out stopping nodes. + availableNodes := lo.Filter(replica.GetNodes(), func(node int64, _ int) bool { stop, err := c.nodeMgr.IsStoppingNode(node) if err != nil { return false } - return !outboundNodes.Contain(node) && !stop + return !stop }) if len(availableNodes) == 0 { @@ -399,7 +398,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] SegmentInfo: s, } }) - shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes, false) + shardPlans := c.balancer.AssignSegment(replica.GetCollectionID(), segmentInfos, availableNodes, false) for i := range shardPlans { shardPlans[i].Replica = replica } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 0d1a06525d..63251533c9 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -109,7 +109,7 @@ func (s *Server) balanceSegments(ctx context.Context, tasks := make([]task.Task, 0, len(plans)) for _, plan := range plans { log.Info("manually balance segment...", - zap.Int64("replica", plan.Replica.ID), + zap.Int64("replica", plan.Replica.GetID()), zap.String("channel", plan.Segment.InsertChannel), zap.Int64("from", plan.From), zap.Int64("to", plan.To), @@ -133,7 +133,7 @@ func (s *Server) balanceSegments(ctx context.Context, ) if err != nil { log.Warn("create segment task for balance failed", - zap.Int64("replica", plan.Replica.ID), + zap.Int64("replica", plan.Replica.GetID()), zap.String("channel", plan.Segment.InsertChannel), zap.Int64("from", plan.From), zap.Int64("to", plan.To), @@ -186,7 +186,7 @@ func (s *Server) balanceChannels(ctx context.Context, tasks := make([]task.Task, 0, len(plans)) for _, plan := range plans { log.Info("manually balance channel...", - zap.Int64("replica", plan.Replica.ID), + zap.Int64("replica", plan.Replica.GetID()), zap.String("channel", plan.Channel.GetChannelName()), zap.Int64("from", plan.From), zap.Int64("to", plan.To), @@ -209,7 +209,7 @@ func (s *Server) balanceChannels(ctx context.Context, ) if err != nil { log.Warn("create channel task for balance failed", - zap.Int64("replica", plan.Replica.ID), + zap.Int64("replica", plan.Replica.GetID()), zap.String("channel", plan.Channel.GetChannelName()), zap.Int64("from", plan.From), zap.Int64("to", plan.To), diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 05c37c651e..396105e99d 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -149,6 +149,8 @@ func (job *LoadCollectionJob) Execute() error { // 2. 
create replica if not exist replicas := job.meta.ReplicaManager.GetByCollection(req.GetCollectionID()) if len(replicas) == 0 { + // API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API. + // Then we can implement dynamic replica changed in different resource group independently. replicas, err = utils.SpawnReplicasWithRG(job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber()) if err != nil { msg := "failed to spawn replica for collection" diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index cd81e38e50..48aa359a7b 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -1195,7 +1195,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorIs(err, ErrFailedAllocateID) + suite.ErrorIs(err, meta.ErrNodeNotEnough) } } diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go new file mode 100644 index 0000000000..4553bf4453 --- /dev/null +++ b/internal/querycoordv2/meta/replica.go @@ -0,0 +1,169 @@ +package meta + +import ( + "github.com/golang/protobuf/proto" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// NilReplica is used to represent a nil replica. +var NilReplica = newReplica(&querypb.Replica{ + ID: -1, +}) + +// Replica is a immutable type for manipulating replica meta info for replica manager. +// Performed a copy-on-write strategy to keep the consistency of the replica manager. +// So only read only operations are allowed on these type. +type Replica struct { + replicaPB *querypb.Replica + rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field. + // always keep consistent with replicaPB.Nodes. + // mutual exclusive with roNodes. + roNodes typeutil.UniqueSet // a helper field for manipulating replica's RO Nodes slice field. + // always keep consistent with replicaPB.RoNodes. + // node used by replica but cannot add more channel or segment ont it. + // include rebalance node or node out of resource group. +} + +// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead. +func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica { + r := proto.Clone(replica).(*querypb.Replica) + // TODO: nodes is a bad parameter, break the consistency, should be removed in future. + // keep it for old unittest. + if len(nodes) > 0 && len(replica.Nodes) == 0 && nodes[0].Len() > 0 { + r.Nodes = nodes[0].Collect() + } + return newReplica(r) +} + +// newReplica creates a new replica from pb. +func newReplica(replica *querypb.Replica) *Replica { + return &Replica{ + replicaPB: proto.Clone(replica).(*querypb.Replica), + rwNodes: typeutil.NewUniqueSet(replica.Nodes...), + roNodes: typeutil.NewUniqueSet(replica.RoNodes...), + } +} + +// GetID returns the id of the replica. +func (replica *Replica) GetID() typeutil.UniqueID { + return replica.replicaPB.GetID() +} + +// GetCollectionID returns the collection id of the replica. +func (replica *Replica) GetCollectionID() typeutil.UniqueID { + return replica.replicaPB.GetCollectionID() +} + +// GetResourceGroup returns the resource group name of the replica. 
+func (replica *Replica) GetResourceGroup() string { + return replica.replicaPB.GetResourceGroup() +} + +// GetNodes returns the rw nodes of the replica. +// readonly, don't modify the returned slice. +func (replica *Replica) GetNodes() []int64 { + return replica.replicaPB.GetNodes() +} + +// GetRONodes returns the ro nodes of the replica. +// readonly, don't modify the returned slice. +func (replica *Replica) GetRONodes() []int64 { + return replica.replicaPB.GetRoNodes() +} + +// RangeOverRWNodes iterates over the read and write nodes of the replica. +func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) { + replica.rwNodes.Range(f) +} + +// RangeOverRONodes iterates over the ro nodes of the replica. +func (replica *Replica) RangeOverRONodes(f func(node int64) bool) { + replica.roNodes.Range(f) +} + +// RWNodesCount returns the count of rw nodes of the replica. +func (replica *Replica) RWNodesCount() int { + return replica.rwNodes.Len() +} + +// RONodesCount returns the count of ro nodes of the replica. +func (replica *Replica) RONodesCount() int { + return replica.roNodes.Len() +} + +// NodesCount returns the count of rw nodes and ro nodes of the replica. +func (replica *Replica) NodesCount() int { + return replica.rwNodes.Len() + replica.roNodes.Len() +} + +// Contains checks if the node is in rw nodes of the replica. +func (replica *Replica) Contains(node int64) bool { + return replica.rwNodes.Contain(node) +} + +// ContainRONode checks if the node is in ro nodes of the replica. +func (replica *Replica) ContainRONode(node int64) bool { + return replica.roNodes.Contain(node) +} + +// Deprecated: Warning, break the consistency of ReplicaManager, use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead. +// TODO: removed in future, only for old unittest now. +func (replica *Replica) AddRWNode(nodes ...int64) { + replica.roNodes.Remove(nodes...) + replica.replicaPB.RoNodes = replica.roNodes.Collect() + replica.rwNodes.Insert(nodes...) + replica.replicaPB.Nodes = replica.rwNodes.Collect() +} + +// copyForWrite returns a mutable replica for write operations. +func (replica *Replica) copyForWrite() *mutableReplica { + return &mutableReplica{ + &Replica{ + replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica), + rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...), + roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...), + }, + } +} + +// mutableReplica is a mutable type (COW) for manipulating replica meta info for replica manager. +type mutableReplica struct { + *Replica +} + +// SetResourceGroup sets the resource group name of the replica. +func (replica *mutableReplica) SetResourceGroup(resourceGroup string) { + replica.replicaPB.ResourceGroup = resourceGroup +} + +// AddRWNode adds the node to rw nodes of the replica. +func (replica *mutableReplica) AddRWNode(nodes ...int64) { + replica.Replica.AddRWNode(nodes...) +} + +// AddRONode moves the node from rw nodes to ro nodes of the replica. +// only used in replica manager. +func (replica *mutableReplica) AddRONode(nodes ...int64) { + replica.rwNodes.Remove(nodes...) + replica.replicaPB.Nodes = replica.rwNodes.Collect() + replica.roNodes.Insert(nodes...) + replica.replicaPB.RoNodes = replica.roNodes.Collect() +} + +// RemoveNode removes the node from rw nodes and ro nodes of the replica. +// only used in replica manager. +func (replica *mutableReplica) RemoveNode(nodes ...int64) { + replica.roNodes.Remove(nodes...) 
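The copy-on-write contract is easier to see with a toy payload. The sketch below mirrors the `copyForWrite`/`IntoReplica` flow with hypothetical `snapshot`/`draft` types; it is not the Milvus code, only the pattern: readers keep using the published immutable value while a writer edits a private copy and publishes it afterwards.

```go
package main

import "fmt"

// snapshot is an immutable view handed out to readers.
type snapshot struct{ nodes []int64 }

// draft is the mutable copy; edits never touch the published snapshot.
type draft struct{ s *snapshot }

func (s *snapshot) copyForWrite() *draft {
	cp := &snapshot{nodes: append([]int64(nil), s.nodes...)}
	return &draft{s: cp}
}

func (d *draft) addNode(n int64) { d.s.nodes = append(d.s.nodes, n) }

// intoSnapshot publishes the edited copy and invalidates the draft.
func (d *draft) intoSnapshot() *snapshot {
	s := d.s
	d.s = nil
	return s
}

func main() {
	published := &snapshot{nodes: []int64{1, 2}}
	d := published.copyForWrite()
	d.addNode(3)
	updated := d.intoSnapshot()
	fmt.Println(published.nodes, updated.nodes) // [1 2] [1 2 3]: readers of the old snapshot are unaffected
}
```

Because published replicas are never mutated in place, the per-replica `sync.RWMutex` of the old implementation is no longer needed; consistency is guarded by the manager-level lock instead.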
+ replica.replicaPB.RoNodes = replica.roNodes.Collect() + replica.rwNodes.Remove(nodes...) + replica.replicaPB.Nodes = replica.rwNodes.Collect() +} + +// IntoReplica returns the immutable replica, After calling this method, the mutable replica should not be used again. +func (replica *mutableReplica) IntoReplica() *Replica { + r := replica.Replica + replica.Replica = nil + return r +} diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f3c77cdc63..dbe2911579 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -20,7 +20,7 @@ import ( "fmt" "sync" - "github.com/golang/protobuf/proto" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/metastore" @@ -30,89 +30,21 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// NilReplica is used to represent a nil replica. -var NilReplica = NewReplica(&querypb.Replica{ - ID: -1, -}, typeutil.NewUniqueSet()) - -type Replica struct { - *querypb.Replica - nodes typeutil.UniqueSet // a helper field for manipulating replica's Nodes slice field - rwmutex sync.RWMutex -} - -func NewReplica(replica *querypb.Replica, nodes typeutil.UniqueSet) *Replica { - return &Replica{ - Replica: replica, - nodes: nodes, - } -} - -func (replica *Replica) AddNode(nodes ...int64) { - replica.rwmutex.Lock() - defer replica.rwmutex.Unlock() - replica.nodes.Insert(nodes...) - replica.Replica.Nodes = replica.nodes.Collect() -} - -func (replica *Replica) GetNodes() []int64 { - replica.rwmutex.RLock() - defer replica.rwmutex.RUnlock() - if replica.nodes != nil { - return replica.nodes.Collect() - } - return nil -} - -func (replica *Replica) Len() int { - replica.rwmutex.RLock() - defer replica.rwmutex.RUnlock() - if replica.nodes != nil { - return replica.nodes.Len() - } - - return 0 -} - -func (replica *Replica) Contains(node int64) bool { - replica.rwmutex.RLock() - defer replica.rwmutex.RUnlock() - if replica.nodes != nil { - return replica.nodes.Contain(node) - } - - return false -} - -func (replica *Replica) RemoveNode(nodes ...int64) { - replica.rwmutex.Lock() - defer replica.rwmutex.Unlock() - replica.nodes.Remove(nodes...) 
- replica.Replica.Nodes = replica.nodes.Collect() -} - -func (replica *Replica) Clone() *Replica { - replica.rwmutex.RLock() - defer replica.rwmutex.RUnlock() - return &Replica{ - Replica: proto.Clone(replica.Replica).(*querypb.Replica), - nodes: typeutil.NewUniqueSet(replica.Replica.Nodes...), - } -} - type ReplicaManager struct { rwmutex sync.RWMutex - idAllocator func() (int64, error) - replicas map[typeutil.UniqueID]*Replica - catalog metastore.QueryCoordCatalog + idAllocator func() (int64, error) + replicas map[typeutil.UniqueID]*Replica + collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet + catalog metastore.QueryCoordCatalog } func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager { return &ReplicaManager{ - idAllocator: idAllocator, - replicas: make(map[int64]*Replica), - catalog: catalog, + idAllocator: idAllocator, + replicas: make(map[int64]*Replica), + collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet), + catalog: catalog, } } @@ -130,10 +62,7 @@ func (m *ReplicaManager) Recover(collections []int64) error { } if collectionSet.Contain(replica.GetCollectionID()) { - m.replicas[replica.GetID()] = &Replica{ - Replica: replica, - nodes: typeutil.NewUniqueSet(replica.GetNodes()...), - } + m.putReplicaInMemory(newReplica(replica)) log.Info("recover replica", zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetID()), @@ -154,6 +83,8 @@ func (m *ReplicaManager) Recover(collections []int64) error { return nil } +// Get returns the replica by id. +// Replica should be read-only, do not modify it. func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -161,22 +92,36 @@ func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica { return m.replicas[id] } -// Spawn spawns replicas of the given number, for given collection, -// this doesn't store these replicas and assign nodes to them. -func (m *ReplicaManager) Spawn(collection int64, replicaNumber int32, rgName string) ([]*Replica, error) { - var ( - replicas = make([]*Replica, replicaNumber) - err error - ) - for i := range replicas { - replicas[i], err = m.spawn(collection, rgName) - if err != nil { - return nil, err +// Spawn spawns N replicas at resource group for given collection in ReplicaManager. +func (m *ReplicaManager) Spawn(collection int64, replicaNumInRG map[string]int) ([]*Replica, error) { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + if m.collIDToReplicaIDs[collection] != nil { + return nil, fmt.Errorf("replicas of collection %d is already spawned", collection) + } + + replicas := make([]*Replica, 0) + for rgName, replicaNum := range replicaNumInRG { + for ; replicaNum > 0; replicaNum-- { + id, err := m.idAllocator() + if err != nil { + return nil, err + } + replicas = append(replicas, newReplica(&querypb.Replica{ + ID: id, + CollectionID: collection, + ResourceGroup: rgName, + })) } } - return replicas, err + if err := m.put(replicas...); err != nil { + return nil, err + } + return replicas, nil } +// Deprecated: Warning, break the consistency of ReplicaManager, +// never use it in non-test code, use Spawn instead. func (m *ReplicaManager) Put(replicas ...*Replica) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() @@ -184,30 +129,81 @@ func (m *ReplicaManager) Put(replicas ...*Replica) error { return m.put(replicas...) 
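A usage sketch of the new `Spawn` signature, assuming `rm` is a `*meta.ReplicaManager` and the named resource groups exist. The per-resource-group count map replaces the old `(replicaNumber, rgName)` pair, and the call now allocates IDs and persists the replicas in one step, so a second `Spawn` for the same collection fails:

```go
// One replica in the default resource group and two in "rg1" (names are
// illustrative). Nodes are not assigned here; they are filled in later by
// RecoverNodesInCollection.
replicas, err := rm.Spawn(collectionID, map[string]int{
	meta.DefaultResourceGroupName: 1,
	"rg1":                         2,
})
if err != nil {
	return err
}
_ = replicas // e.g. hand off to the load job / observers
```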
} -func (m *ReplicaManager) spawn(collectionID typeutil.UniqueID, rgName string) (*Replica, error) { - id, err := m.idAllocator() - if err != nil { - return nil, err +func (m *ReplicaManager) put(replicas ...*Replica) error { + if len(replicas) == 0 { + return nil } - return &Replica{ - Replica: &querypb.Replica{ - ID: id, - CollectionID: collectionID, - ResourceGroup: rgName, - }, - nodes: make(typeutil.UniqueSet), - }, nil + // Persist replicas into KV. + replicaPBs := make([]*querypb.Replica, 0, len(replicas)) + for _, replica := range replicas { + replicaPBs = append(replicaPBs, replica.replicaPB) + } + if err := m.catalog.SaveReplica(replicaPBs...); err != nil { + return err + } + + m.putReplicaInMemory(replicas...) + return nil } -func (m *ReplicaManager) put(replicas ...*Replica) error { +// putReplicaInMemory puts replicas into in-memory map and collIDToReplicaIDs. +func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) { for _, replica := range replicas { - err := m.catalog.SaveReplica(replica.Replica) - if err != nil { - return err + // update in-memory replicas. + m.replicas[replica.GetID()] = replica + + // update collIDToReplicaIDs. + if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil { + m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet() } - m.replicas[replica.ID] = replica + m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID()) } - return nil +} + +// TransferReplica transfers N replicas from srcRGName to dstRGName. +func (m *ReplicaManager) TransferReplica(collectionID typeutil.UniqueID, srcRGName string, dstRGName string, replicaNum int) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + // Check if replica can be transfer. + srcReplicas, err := m.getSrcReplicasAndCheckIfTransferable(collectionID, srcRGName, replicaNum) + if err != nil { + return err + } + + // Transfer N replicas from srcRGName to dstRGName. + // Node Change will be executed by replica_observer in background. + replicas := make([]*Replica, 0, replicaNum) + for i := 0; i < replicaNum; i++ { + mutableReplica := srcReplicas[i].copyForWrite() + mutableReplica.SetResourceGroup(dstRGName) + replicas = append(replicas, mutableReplica.IntoReplica()) + } + return m.put(replicas...) +} + +// getSrcReplicasAndCheckIfTransferable checks if the collection can be transfer from srcRGName to dstRGName. +func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) { + // Check if collection is loaded. + if m.collIDToReplicaIDs[collectionID] == nil { + return nil, merr.WrapErrParameterInvalid( + "Collection not loaded", + fmt.Sprintf("collectionID %d", collectionID), + ) + } + + // Check if replica in srcRGName is enough. 
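From the caller's side, the whole transfer is now a metadata-only change. A sketch, with `rm` a `*meta.ReplicaManager` and the group names illustrative:

```go
// Move two replicas of the collection from "rg1" to "rg2". Only their
// resource_group field is rewritten and persisted here; the RW/RO node
// hand-off happens asynchronously when the background replica observer calls
// RecoverNodesInCollection with the latest resource-group memberships.
if err := rm.TransferReplica(collectionID, "rg1", "rg2", 2); err != nil {
	return err
}
```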
+ srcReplicas := m.getByCollectionAndRG(collectionID, srcRGName) + if len(srcReplicas) < replicaNum { + err := merr.WrapErrParameterInvalid( + "NumReplica not greater than the number of replica in source resource group", fmt.Sprintf("only found [%d] replicas of collection [%d] in source resource group [%s], but %d require", + len(srcReplicas), + collectionID, + srcRGName, + replicaNum)) + return nil, err + } + return srcReplicas, nil } // RemoveCollection removes replicas of given collection, @@ -220,11 +216,11 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error if err != nil { return err } - for id, replica := range m.replicas { - if replica.CollectionID == collectionID { - delete(m.replicas, id) - } + // Remove all replica of collection and remove collection from collIDToReplicaIDs. + for replicaID := range m.collIDToReplicaIDs[collectionID] { + delete(m.replicas, replicaID) } + delete(m.collIDToReplicaIDs, collectionID) return nil } @@ -233,12 +229,11 @@ func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Repl defer m.rwmutex.RUnlock() replicas := make([]*Replica, 0) - for _, replica := range m.replicas { - if replica.CollectionID == collectionID { - replicas = append(replicas, replica) + if m.collIDToReplicaIDs[collectionID] != nil { + for replicaID := range m.collIDToReplicaIDs[collectionID] { + replicas = append(replicas, m.replicas[replicaID]) } } - return replicas } @@ -247,7 +242,7 @@ func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.Un defer m.rwmutex.RUnlock() for _, replica := range m.replicas { - if replica.CollectionID == collectionID && replica.nodes.Contain(nodeID) { + if replica.GetCollectionID() == collectionID && replica.Contains(nodeID) { return replica } } @@ -261,7 +256,7 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica { replicas := make([]*Replica, 0) for _, replica := range m.replicas { - if replica.nodes.Contain(nodeID) { + if replica.rwNodes.Contain(nodeID) { replicas = append(replicas, replica) } } @@ -269,17 +264,19 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica { return replicas } -func (m *ReplicaManager) GetByCollectionAndRG(collectionID int64, rgName string) []*Replica { - m.rwmutex.RLock() - defer m.rwmutex.RUnlock() - - ret := make([]*Replica, 0) - for _, replica := range m.replicas { - if replica.GetCollectionID() == collectionID && replica.GetResourceGroup() == rgName { - ret = append(ret, replica) - } +func (m *ReplicaManager) getByCollectionAndRG(collectionID int64, rgName string) []*Replica { + replicaIDs, ok := m.collIDToReplicaIDs[collectionID] + if !ok { + return make([]*Replica, 0) } + ret := make([]*Replica, 0) + replicaIDs.Range(func(replicaID typeutil.UniqueID) bool { + if m.replicas[replicaID].GetResourceGroup() == rgName { + ret = append(ret, m.replicas[replicaID]) + } + return true + }) return ret } @@ -297,20 +294,93 @@ func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica { return ret } -func (m *ReplicaManager) AddNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error { +// RecoverNodesInCollection recovers all nodes in collection with latest resource group. +// Promise a node will be only assigned to one replica in same collection at same time. +// 1. Move the rw nodes to ro nodes if they are not in related resource group. +// 2. Add new incoming nodes into the replica if they are not in-used by other replicas of same collection. +// 3. 
replicas in same resource group will shared the nodes in resource group fairly. +func (m *ReplicaManager) RecoverNodesInCollection(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) error { + if err := m.validateResourceGroups(rgs); err != nil { + return err + } + m.rwmutex.Lock() defer m.rwmutex.Unlock() - replica, ok := m.replicas[replicaID] - if !ok { - return merr.WrapErrReplicaNotFound(replicaID) + // create a helper to do the recover. + helper, err := m.getCollectionAssignmentHelper(collectionID, rgs) + if err != nil { + return err } - replica = replica.Clone() - replica.AddNode(nodes...) - return m.put(replica) + modifiedReplicas := make([]*Replica, 0) + // recover node by resource group. + helper.RangeOverResourceGroup(func(replicaHelper *replicasInSameRGAssignmentHelper) { + replicaHelper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) { + roNodes := assignment.GetNewRONodes() + recoverableNodes, incomingNodeCount := assignment.GetRecoverNodesAndIncomingNodeCount() + // There may be not enough incoming nodes for current replica, + // Even we filtering the nodes that are used by other replica of same collection in other resource group, + // current replica's expected node may be still used by other replica of same collection in same resource group. + incomingNode := replicaHelper.AllocateIncomingNodes(incomingNodeCount) + if len(roNodes) == 0 && len(recoverableNodes) == 0 && len(incomingNode) == 0 { + // nothing to do. + return + } + mutableReplica := m.replicas[assignment.GetReplicaID()].copyForWrite() + mutableReplica.AddRONode(roNodes...) // rw -> ro + mutableReplica.AddRWNode(recoverableNodes...) // ro -> rw + mutableReplica.AddRWNode(incomingNode...) // unused -> rw + log.Info( + "new replica recovery found", + zap.Int64s("newRONodes", roNodes), + zap.Int64s("roToRWNodes", recoverableNodes), + zap.Int64s("newIncomingNodes", incomingNode)) + modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica()) + }) + }) + return m.put(modifiedReplicas...) } +// validateResourceGroups checks if the resource groups are valid. +func (m *ReplicaManager) validateResourceGroups(rgs map[string]typeutil.UniqueSet) error { + // make sure that node in resource group is mutual exclusive. + node := typeutil.NewUniqueSet() + for _, rg := range rgs { + for id := range rg { + if node.Contain(id) { + return errors.New("node in resource group is not mutual exclusive") + } + node.Insert(id) + } + } + return nil +} + +// getCollectionAssignmentHelper checks if the collection is recoverable and group replicas by resource group. +func (m *ReplicaManager) getCollectionAssignmentHelper(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) (*collectionAssignmentHelper, error) { + // check if the collection is exist. 
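For orientation, this is roughly what the recovery caller is expected to pass in: the current node membership of every resource group that hosts replicas of the collection. The `rgs` literal below is illustrative; in the real code the map comes from the resource manager:

```go
rgs := map[string]typeutil.UniqueSet{
	"rg1": typeutil.NewUniqueSet(1, 2, 3),
	"rg2": typeutil.NewUniqueSet(4, 5),
}
// The call is rejected if any node appears in two groups (validateResourceGroups)
// or if a replica of the collection references a group missing from rgs.
if err := rm.RecoverNodesInCollection(collectionID, rgs); err != nil {
	log.Warn("failed to recover replica nodes", zap.Error(err))
}
```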
+ replicaIDs, ok := m.collIDToReplicaIDs[collectionID] + if !ok { + return nil, errors.Errorf("collection %d not loaded", collectionID) + } + + rgToReplicas := make(map[string][]*Replica) + for replicaID := range replicaIDs { + replica := m.replicas[replicaID] + rgName := replica.GetResourceGroup() + if _, ok := rgs[rgName]; !ok { + return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replicaID, rgName) + } + if _, ok := rgToReplicas[rgName]; !ok { + rgToReplicas[rgName] = make([]*Replica, 0) + } + rgToReplicas[rgName] = append(rgToReplicas[rgName], replica) + } + return newCollectionAssignmentHelper(collectionID, rgToReplicas, rgs), nil +} + +// RemoveNode removes the node from all replicas of given collection. func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() @@ -320,9 +390,9 @@ func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeut return merr.WrapErrReplicaNotFound(replicaID) } - replica = replica.Clone() - replica.RemoveNode(nodes...) - return m.put(replica) + mutableReplica := replica.copyForWrite() + mutableReplica.RemoveNode(nodes...) // ro -> unused + return m.put(mutableReplica.IntoReplica()) } func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.UniqueID) typeutil.Set[string] { diff --git a/internal/querycoordv2/meta/replica_manager_helper.go b/internal/querycoordv2/meta/replica_manager_helper.go new file mode 100644 index 0000000000..1bf6f8e8fb --- /dev/null +++ b/internal/querycoordv2/meta/replica_manager_helper.go @@ -0,0 +1,272 @@ +package meta + +import ( + "sort" + + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// collectionAssignmentHelper is a helper to manage the replica assignment in same collection. +type collectionAssignmentHelper struct { + collectionID typeutil.UniqueID + resourceGroupToReplicas map[string]*replicasInSameRGAssignmentHelper +} + +// newCollectionAssignmentHelper creates a new collectionAssignmentHelper. +func newCollectionAssignmentHelper( + collectionID typeutil.UniqueID, + rgToReplicas map[string][]*Replica, + rgs map[string]typeutil.UniqueSet, +) *collectionAssignmentHelper { + resourceGroupToReplicas := make(map[string]*replicasInSameRGAssignmentHelper) + for rgName, replicas := range rgToReplicas { + resourceGroupToReplicas[rgName] = newReplicaAssignmentHelper(rgName, replicas, rgs[rgName]) + } + + helper := &collectionAssignmentHelper{ + collectionID: collectionID, + resourceGroupToReplicas: resourceGroupToReplicas, + } + helper.updateIncomingNodesAndExpectedNode() + return helper +} + +// updateIncomingNodesAndExpectedNode updates the incoming nodes for all resource groups. +// An incoming node is a node that not used by current collection but in resource group. +func (h *collectionAssignmentHelper) updateIncomingNodesAndExpectedNode() { + // incoming nodes should be compared with all node of replica in same collection, even not in same resource group. + for _, helper := range h.resourceGroupToReplicas { + // some node in current resource group may load other replica data of same collection in other resource group. + // those node cannot be used right now. 
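The rule applied in the loop that follows can be reduced to: a node of the resource group only becomes "incoming" for this group's replicas if no replica of the same collection, in any resource group, is still using it. A simplified, standalone version of that filter, with plain maps instead of `typeutil.UniqueSet`:

```go
// incomingNodes keeps the nodes of a resource group that no replica of the
// collection currently references, i.e. the only nodes that may be handed out
// as new RW nodes right away.
func incomingNodes(nodesInRG, usedByCollection map[int64]bool) []int64 {
	out := make([]int64, 0, len(nodesInRG))
	for n := range nodesInRG {
		if !usedByCollection[n] {
			out = append(out, n)
		}
	}
	return out
}
```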
+ newIncomingNodes := helper.nodesInRG.Clone() + currentUsedNodeCount := newIncomingNodes.Len() + h.RangeOverReplicas(func(rgName string, assignment *replicaAssignmentInfo) { + assignment.RangeOverAllNodes(func(nodeID int64) { + if newIncomingNodes.Contain(nodeID) { + newIncomingNodes.Remove(nodeID) + if rgName != helper.rgName { + // Node is still used by other replica of same collection in other resource group, cannot be used right now. + // filter it out to calculate the expected node count to avoid node starve of some replica in same resource group. + currentUsedNodeCount-- + } + } + }) + }) + helper.incomingNodes = newIncomingNodes + helper.updateExpectedNodeCountForReplicas(currentUsedNodeCount) + } +} + +// RangeOverResourceGroup iterate resource groups +func (h *collectionAssignmentHelper) RangeOverResourceGroup(f func(helper *replicasInSameRGAssignmentHelper)) { + for _, helper := range h.resourceGroupToReplicas { + f(helper) + } +} + +// RangeOverReplicas iterate replicas +func (h *collectionAssignmentHelper) RangeOverReplicas(f func(rgName string, assignment *replicaAssignmentInfo)) { + for _, helper := range h.resourceGroupToReplicas { + for _, assignment := range helper.replicas { + f(helper.rgName, assignment) + } + } +} + +// newReplicaAssignmentHelper creates a new replicaAssignmentHelper. +func newReplicaAssignmentHelper(rgName string, replicas []*Replica, nodeInRG typeutil.UniqueSet) *replicasInSameRGAssignmentHelper { + assignmentInfos := make([]*replicaAssignmentInfo, 0, len(replicas)) + for _, replica := range replicas { + assignmentInfos = append(assignmentInfos, newReplicaAssignmentInfo(replica, nodeInRG)) + } + h := &replicasInSameRGAssignmentHelper{ + rgName: rgName, + nodesInRG: nodeInRG, + replicas: assignmentInfos, + } + return h +} + +// replicasInSameRGAssignmentHelper is a helper to manage the replica assignment in same rg. +type replicasInSameRGAssignmentHelper struct { + rgName string + nodesInRG typeutil.UniqueSet + incomingNodes typeutil.UniqueSet // nodes that not used by current replicas in resource group. + replicas []*replicaAssignmentInfo +} + +func (h *replicasInSameRGAssignmentHelper) AllocateIncomingNodes(n int) []int64 { + nodeIDs := make([]int64, 0, n) + h.incomingNodes.Range(func(nodeID int64) bool { + if n > 0 { + nodeIDs = append(nodeIDs, nodeID) + n-- + } else { + return false + } + return true + }) + h.incomingNodes.Remove(nodeIDs...) + return nodeIDs +} + +// RangeOverReplicas iterate replicas. +func (h *replicasInSameRGAssignmentHelper) RangeOverReplicas(f func(*replicaAssignmentInfo)) { + for _, info := range h.replicas { + f(info) + } +} + +// updateExpectedNodeCountForReplicas updates the expected node count for all replicas in same resource group. +func (h *replicasInSameRGAssignmentHelper) updateExpectedNodeCountForReplicas(currentUsageNodesCount int) { + minimumNodeCount := currentUsageNodesCount / len(h.replicas) + maximumNodeCount := minimumNodeCount + remainder := currentUsageNodesCount % len(h.replicas) + if remainder > 0 { + maximumNodeCount += 1 + } + + // rule: + // 1. make minimumNodeCount <= expectedNodeCount <= maximumNodeCount + // 2. expectedNodeCount should be closed to len(assignedNodes) for each replica as much as possible to avoid unnecessary node transfer. 
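The fairness rule stated above is a floor/ceil split, with the remainder biased toward the replicas that already hold the most usable nodes so that fewer segments have to move. A standalone sketch of that arithmetic (not the production sorter, which also counts recoverable RO nodes and breaks ties by replica ID for stability):

```go
package main

import (
	"fmt"
	"sort"
)

// expectedCounts splits n nodes across k replicas: every replica gets either
// floor(n/k) or ceil(n/k), and the larger share goes to the replicas that
// already hold the most nodes.
func expectedCounts(n int, alreadyHeld []int) []int {
	k := len(alreadyHeld)
	base, remainder := n/k, n%k
	order := make([]int, k)
	for i := range order {
		order[i] = i
	}
	sort.Slice(order, func(a, b int) bool { return alreadyHeld[order[a]] > alreadyHeld[order[b]] })
	counts := make([]int, k)
	for _, idx := range order {
		counts[idx] = base
		if remainder > 0 {
			counts[idx]++
			remainder--
		}
	}
	return counts
}

func main() {
	fmt.Println(expectedCounts(7, []int{3, 2, 2})) // [3 2 2]
}
```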
+ sorter := make(replicaAssignmentInfoSorter, 0, len(h.replicas)) + for _, info := range h.replicas { + sorter = append(sorter, info) + } + sort.Sort(sort.Reverse(replicaAssignmentInfoSortByAvailableAndRecoverable{sorter})) + for _, info := range sorter { + if remainder > 0 { + info.expectedNodeCount = maximumNodeCount + remainder-- + } else { + info.expectedNodeCount = minimumNodeCount + } + } +} + +// newReplicaAssignmentInfo creates a new replicaAssignmentInfo. +func newReplicaAssignmentInfo(replica *Replica, nodeInRG typeutil.UniqueSet) *replicaAssignmentInfo { + // node in replica can be split into 3 part. + rwNodes := make(typeutil.UniqueSet, replica.RWNodesCount()) + newRONodes := make(typeutil.UniqueSet, replica.RONodesCount()) + unrecoverableRONodes := make(typeutil.UniqueSet, replica.RONodesCount()) + recoverableRONodes := make(typeutil.UniqueSet, replica.RONodesCount()) + + replica.RangeOverRWNodes(func(nodeID int64) bool { + if nodeInRG.Contain(nodeID) { + rwNodes.Insert(nodeID) + } else { + newRONodes.Insert(nodeID) + } + return true + }) + + replica.RangeOverRONodes(func(nodeID int64) bool { + if nodeInRG.Contain(nodeID) { + recoverableRONodes.Insert(nodeID) + } else { + unrecoverableRONodes.Insert(nodeID) + } + return true + }) + return &replicaAssignmentInfo{ + replicaID: replica.GetID(), + expectedNodeCount: 0, + rwNodes: rwNodes, + newRONodes: newRONodes, + recoverableRONodes: recoverableRONodes, + unrecoverableRONodes: unrecoverableRONodes, + } +} + +type replicaAssignmentInfo struct { + replicaID typeutil.UniqueID + expectedNodeCount int // expected node count for each replica. + rwNodes typeutil.UniqueSet // rw nodes is used by current replica. (rw -> rw) + newRONodes typeutil.UniqueSet // new ro nodes for these replica. (rw -> ro) + recoverableRONodes typeutil.UniqueSet // recoverable ro nodes for these replica (ro node can be put back to rw node if it's in current resource group). (may ro -> rw) + unrecoverableRONodes typeutil.UniqueSet // unrecoverable ro nodes for these replica (ro node can't be put back to rw node if it's not in current resource group). (ro -> ro) +} + +// GetReplicaID returns the replica id for these replica. +func (s *replicaAssignmentInfo) GetReplicaID() typeutil.UniqueID { + return s.replicaID +} + +// GetNewRONodes returns the new ro nodes for these replica. +func (s *replicaAssignmentInfo) GetNewRONodes() []int64 { + newRONodes := make([]int64, 0, s.newRONodes.Len()) + // not in current resource group must be set ro. + for nodeID := range s.newRONodes { + newRONodes = append(newRONodes, nodeID) + } + + // too much node is occupied by current replica, then set some node to ro. + if s.rwNodes.Len() > s.expectedNodeCount { + cnt := s.rwNodes.Len() - s.expectedNodeCount + s.rwNodes.Range(func(node int64) bool { + if cnt > 0 { + newRONodes = append(newRONodes, node) + cnt-- + } else { + return false + } + return true + }) + } + return newRONodes +} + +// GetRecoverNodesAndIncomingNodeCount returns the recoverable ro nodes and incoming node count for these replica. 
+func (s *replicaAssignmentInfo) GetRecoverNodesAndIncomingNodeCount() (recoverNodes []int64, incomingNodeCount int) { + recoverNodes = make([]int64, 0, s.recoverableRONodes.Len()) + incomingNodeCount = 0 + if s.rwNodes.Len() < s.expectedNodeCount { + incomingNodeCount = s.expectedNodeCount - s.rwNodes.Len() + s.recoverableRONodes.Range(func(node int64) bool { + if incomingNodeCount > 0 { + recoverNodes = append(recoverNodes, node) + incomingNodeCount-- + } else { + return false + } + return true + }) + } + return recoverNodes, incomingNodeCount +} + +// RangeOverAllNodes iterate all nodes in replica. +func (s *replicaAssignmentInfo) RangeOverAllNodes(f func(nodeID int64)) { + ff := func(nodeID int64) bool { + f(nodeID) + return true + } + s.rwNodes.Range(ff) + s.newRONodes.Range(ff) + s.recoverableRONodes.Range(ff) + s.unrecoverableRONodes.Range(ff) +} + +type replicaAssignmentInfoSorter []*replicaAssignmentInfo + +func (s replicaAssignmentInfoSorter) Len() int { + return len(s) +} + +func (s replicaAssignmentInfoSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +type replicaAssignmentInfoSortByAvailableAndRecoverable struct { + replicaAssignmentInfoSorter +} + +func (s replicaAssignmentInfoSortByAvailableAndRecoverable) Less(i, j int) bool { + left := s.replicaAssignmentInfoSorter[i].rwNodes.Len() + s.replicaAssignmentInfoSorter[i].recoverableRONodes.Len() + right := s.replicaAssignmentInfoSorter[j].rwNodes.Len() + s.replicaAssignmentInfoSorter[j].recoverableRONodes.Len() + + // Reach stable sort result by replica id. + // Otherwise unstable assignment may cause unnecessary node transfer. + return left < right || (left == right && s.replicaAssignmentInfoSorter[i].replicaID < s.replicaAssignmentInfoSorter[j].replicaID) +} diff --git a/internal/querycoordv2/meta/replica_manager_helper_test.go b/internal/querycoordv2/meta/replica_manager_helper_test.go new file mode 100644 index 0000000000..6ed9c36920 --- /dev/null +++ b/internal/querycoordv2/meta/replica_manager_helper_test.go @@ -0,0 +1,490 @@ +package meta + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type expectedReplicaPlan struct { + newRONodes int + recoverNodes int + incomingNodeCount int + expectedNodeCount int +} +type testCase struct { + collectionID typeutil.UniqueID // collection id + rgToReplicas map[string][]*Replica // from resource group to replicas + rgs map[string]typeutil.UniqueSet // from resource group to nodes + expectedPlan map[typeutil.UniqueID]expectedReplicaPlan // from replica id to expected plan + expectedNewIncomingNodes map[string]typeutil.UniqueSet // from resource group to incoming nodes +} + +type CollectionAssignmentHelperSuite struct { + suite.Suite +} + +func (s *CollectionAssignmentHelperSuite) TestNoModificationCase() { + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2, 3, 4}, + RoNodes: []int64{}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{5, 6}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{7, 8}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7, 8), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + 
newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + 3: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(), + "rg2": typeutil.NewUniqueSet(), + }, + }) + + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2, 3, 4}, + RoNodes: []int64{}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{5}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{6, 7}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + 3: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(), + "rg2": typeutil.NewUniqueSet(), + }, + }) +} + +func (s *CollectionAssignmentHelperSuite) TestRO() { + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2, 3, 4, 5}, + RoNodes: []int64{}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{6}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{7, 8}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7, 8), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 1, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + 3: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(), + "rg2": typeutil.NewUniqueSet(), // 5 is still used rg1 of replica 1. 
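+ // i.e. node 5 now belongs to rg2, but replica 1 (living in rg1) still holds it as an rw node,
+ // so it cannot be offered as an incoming node until that replica releases it.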
+ }, + }) + + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2, 3, 4, 5}, + RoNodes: []int64{}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{6}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{7, 8}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 7, 8), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 1, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 1, + recoverNodes: 0, + incomingNodeCount: 1, + expectedNodeCount: 1, + }, + 3: { + newRONodes: 1, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(), + "rg2": typeutil.NewUniqueSet(), // 5 is still used rg1 of replica 1. + }, + }) +} + +func (s *CollectionAssignmentHelperSuite) TestIncomingNode() { + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2}, + RoNodes: []int64{5}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{6}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{7}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7, 8), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 2, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + 3: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 1, + expectedNodeCount: 2, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(3, 4), + "rg2": typeutil.NewUniqueSet(8), + }, + }) +} + +func (s *CollectionAssignmentHelperSuite) TestRecoverNode() { + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + "rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2}, + RoNodes: []int64{3}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{6}, + RoNodes: []int64{7}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{8}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7, 8), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 0, + recoverNodes: 1, + incomingNodeCount: 1, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 1, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + 3: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 1, + expectedNodeCount: 2, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(4), + "rg2": typeutil.NewUniqueSet(5), + }, + }) +} + +func (s *CollectionAssignmentHelperSuite) TestMixRecoverNode() { + s.runCase(testCase{ + collectionID: 1, + rgToReplicas: map[string][]*Replica{ + 
"rg1": { + newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 1, + Nodes: []int64{1, 2}, + RoNodes: []int64{3}, + }), + }, + "rg2": { + newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 1, + Nodes: []int64{6}, + RoNodes: []int64{7}, + }), + newReplica(&querypb.Replica{ + ID: 3, + CollectionID: 1, + Nodes: []int64{8}, + RoNodes: []int64{}, + }), + }, + "rg3": { + newReplica(&querypb.Replica{ + ID: 4, + CollectionID: 1, + Nodes: []int64{9}, + RoNodes: []int64{}, + }), + newReplica(&querypb.Replica{ + ID: 5, + CollectionID: 1, + Nodes: []int64{10}, + RoNodes: []int64{}, + }), + }, + }, + rgs: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(1, 2, 3, 4), + "rg2": typeutil.NewUniqueSet(5, 6, 7), + "rg3": typeutil.NewUniqueSet(8, 9, 10), + }, + expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{ + 1: { + newRONodes: 0, + recoverNodes: 1, + incomingNodeCount: 1, + expectedNodeCount: 4, + }, + 2: { + newRONodes: 0, + recoverNodes: 1, + incomingNodeCount: 0, + expectedNodeCount: 2, + }, + 3: { + newRONodes: 1, + recoverNodes: 0, + incomingNodeCount: 1, + expectedNodeCount: 1, + }, + 4: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + 5: { + newRONodes: 0, + recoverNodes: 0, + incomingNodeCount: 0, + expectedNodeCount: 1, + }, + }, + expectedNewIncomingNodes: map[string]typeutil.UniqueSet{ + "rg1": typeutil.NewUniqueSet(4), + "rg2": typeutil.NewUniqueSet(5), + "rg3": typeutil.NewUniqueSet(), + }, + }) +} + +func (s *CollectionAssignmentHelperSuite) runCase(c testCase) { + cHelper := newCollectionAssignmentHelper(c.collectionID, c.rgToReplicas, c.rgs) + cHelper.RangeOverResourceGroup(func(rHelper *replicasInSameRGAssignmentHelper) { + s.ElementsMatch(c.expectedNewIncomingNodes[rHelper.rgName].Collect(), rHelper.incomingNodes.Collect()) + rHelper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) { + roNodes := assignment.GetNewRONodes() + recoverNodes, incomingNodes := assignment.GetRecoverNodesAndIncomingNodeCount() + plan := c.expectedPlan[assignment.GetReplicaID()] + s.Equal( + plan.newRONodes, + len(roNodes), + ) + s.Equal( + plan.incomingNodeCount, + incomingNodes, + ) + s.Equal( + plan.recoverNodes, + len(recoverNodes), + ) + s.Equal( + plan.expectedNodeCount, + assignment.expectedNodeCount, + ) + }) + }) +} + +func TestCollectionAssignmentHelper(t *testing.T) { + suite.Run(t, new(CollectionAssignmentHelperSuite)) +} diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index 579eaaac8d..e45689332e 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -27,29 +28,59 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + "github.com/milvus-io/milvus/internal/proto/querypb" . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) +type collectionLoadConfig struct { + spawnConfig map[string]int +} + +func (c *collectionLoadConfig) getTotalSpawn() int { + totalSpawn := 0 + for _, spawnNum := range c.spawnConfig { + totalSpawn += spawnNum + } + return totalSpawn +} + +// Old replica manager test suite. type ReplicaManagerSuite struct { suite.Suite - nodes []int64 - collections []int64 - replicaNumbers []int32 - idAllocator func() (int64, error) - kv kv.MetaKv - catalog metastore.QueryCoordCatalog - mgr *ReplicaManager + rgs map[string]typeutil.UniqueSet + collections map[int64]collectionLoadConfig + idAllocator func() (int64, error) + kv kv.MetaKv + catalog metastore.QueryCoordCatalog + mgr *ReplicaManager } func (suite *ReplicaManagerSuite) SetupSuite() { paramtable.Init() - suite.nodes = []int64{1, 2, 3} - suite.collections = []int64{100, 101, 102} - suite.replicaNumbers = []int32{1, 2, 3} + suite.rgs = map[string]typeutil.UniqueSet{ + "RG1": typeutil.NewUniqueSet(1), + "RG2": typeutil.NewUniqueSet(2, 3), + "RG3": typeutil.NewUniqueSet(4, 5, 6), + } + suite.collections = map[int64]collectionLoadConfig{ + 100: { + spawnConfig: map[string]int{"RG1": 1}, + }, + 101: { + spawnConfig: map[string]int{"RG2": 2}, + }, + 102: { + spawnConfig: map[string]int{"RG3": 2}, + }, + 103: { + spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1}, + }, + } } func (suite *ReplicaManagerSuite) SetupTest() { @@ -69,7 +100,7 @@ func (suite *ReplicaManagerSuite) SetupTest() { suite.idAllocator = RandomIncrementIDAllocator() suite.mgr = NewReplicaManager(suite.idAllocator, suite.catalog) - suite.spawnAndPutAll() + suite.spawnAll() } func (suite *ReplicaManagerSuite) TearDownTest() { @@ -79,39 +110,39 @@ func (suite *ReplicaManagerSuite) TearDownTest() { func (suite *ReplicaManagerSuite) TestSpawn() { mgr := suite.mgr - for i, collection := range suite.collections { - replicas, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName) - suite.NoError(err) - suite.Len(replicas, int(suite.replicaNumbers[i])) - } - mgr.idAllocator = ErrorIDAllocator() - for i, collection := range suite.collections { - _, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName) - suite.Error(err) - } + _, err := mgr.Spawn(1, map[string]int{DefaultResourceGroupName: 1}) + suite.Error(err) + + replicas := mgr.GetByCollection(1) + suite.Len(replicas, 0) } func (suite *ReplicaManagerSuite) TestGet() { mgr := suite.mgr - for i, collection := range suite.collections { - replicas := mgr.GetByCollection(collection) + for collectionID, collectionCfg := range suite.collections { + replicas := mgr.GetByCollection(collectionID) replicaNodes := make(map[int64][]int64) nodes := make([]int64, 0) for _, replica := range replicas { - suite.Equal(collection, replica.GetCollectionID()) + suite.Equal(collectionID, replica.GetCollectionID()) suite.Equal(replica, mgr.Get(replica.GetID())) - suite.Equal(len(replica.Replica.GetNodes()), replica.Len()) - suite.Equal(replica.Replica.GetNodes(), replica.GetNodes()) - replicaNodes[replica.GetID()] = replica.Replica.GetNodes() - nodes = append(nodes, replica.Replica.Nodes...) 
+ suite.Equal(len(replica.replicaPB.GetNodes()), replica.RWNodesCount()) + suite.Equal(replica.replicaPB.GetNodes(), replica.GetNodes()) + replicaNodes[replica.GetID()] = replica.GetNodes() + nodes = append(nodes, replica.GetNodes()...) } - suite.Len(nodes, int(suite.replicaNumbers[i])) + + expectedNodes := make([]int64, 0) + for rg := range collectionCfg.spawnConfig { + expectedNodes = append(expectedNodes, suite.rgs[rg].Collect()...) + } + suite.ElementsMatch(nodes, expectedNodes) for replicaID, nodes := range replicaNodes { for _, node := range nodes { - replica := mgr.GetByCollectionAndNode(collection, node) + replica := mgr.GetByCollectionAndNode(collectionID, node) suite.Equal(replicaID, replica.GetID()) } } @@ -122,14 +153,18 @@ func (suite *ReplicaManagerSuite) TestGetByNode() { mgr := suite.mgr randomNodeID := int64(11111) - testReplica1, err := mgr.spawn(3002, DefaultResourceGroupName) - suite.NoError(err) - testReplica1.AddNode(randomNodeID) - - testReplica2, err := mgr.spawn(3002, DefaultResourceGroupName) - suite.NoError(err) - testReplica2.AddNode(randomNodeID) - + testReplica1 := newReplica(&querypb.Replica{ + CollectionID: 3002, + ID: 10086, + Nodes: []int64{randomNodeID}, + ResourceGroup: DefaultResourceGroupName, + }) + testReplica2 := newReplica(&querypb.Replica{ + CollectionID: 3002, + ID: 10087, + Nodes: []int64{randomNodeID}, + ResourceGroup: DefaultResourceGroupName, + }) mgr.Put(testReplica1, testReplica2) replicas := mgr.GetByNode(randomNodeID) @@ -141,7 +176,7 @@ func (suite *ReplicaManagerSuite) TestRecover() { // Clear data in memory, and then recover from meta store suite.clearMemory() - mgr.Recover(suite.collections) + mgr.Recover(lo.Keys(suite.collections)) suite.TestGet() // Test recover from 2.1 meta store @@ -155,13 +190,13 @@ func (suite *ReplicaManagerSuite) TestRecover() { suite.kv.Save(querycoord.ReplicaMetaPrefixV1+"/2100", string(value)) suite.clearMemory() - mgr.Recover(append(suite.collections, 1000)) + mgr.Recover(append(lo.Keys(suite.collections), 1000)) replica := mgr.Get(2100) suite.NotNil(replica) - suite.EqualValues(1000, replica.CollectionID) - suite.EqualValues([]int64{1, 2, 3}, replica.Replica.Nodes) - suite.Len(replica.GetNodes(), len(replica.Replica.GetNodes())) - for _, node := range replica.Replica.GetNodes() { + suite.EqualValues(1000, replica.GetCollectionID()) + suite.EqualValues([]int64{1, 2, 3}, replica.GetNodes()) + suite.Len(replica.GetNodes(), len(replica.GetNodes())) + for _, node := range replica.GetNodes() { suite.True(replica.Contains(node)) } } @@ -169,7 +204,7 @@ func (suite *ReplicaManagerSuite) TestRecover() { func (suite *ReplicaManagerSuite) TestRemove() { mgr := suite.mgr - for _, collection := range suite.collections { + for collection := range suite.collections { err := mgr.RemoveCollection(collection) suite.NoError(err) @@ -178,8 +213,8 @@ func (suite *ReplicaManagerSuite) TestRemove() { } // Check whether the replicas are also removed from meta store - mgr.Recover(suite.collections) - for _, collection := range suite.collections { + mgr.Recover(lo.Keys(suite.collections)) + for collection := range suite.collections { replicas := mgr.GetByCollection(collection) suite.Empty(replicas) } @@ -188,69 +223,72 @@ func (suite *ReplicaManagerSuite) TestRemove() { func (suite *ReplicaManagerSuite) TestNodeManipulate() { mgr := suite.mgr - firstNode := suite.nodes[0] - newNode := suite.nodes[len(suite.nodes)-1] + 1 - // Add a new node for the replica with node 1 of all collections, - // then remove the node 1 - for _, 
collection := range suite.collections { - replica := mgr.GetByCollectionAndNode(collection, firstNode) - err := mgr.AddNode(replica.GetID(), newNode) - suite.NoError(err) + // add node into rg. + rgs := map[string]typeutil.UniqueSet{ + "RG1": typeutil.NewUniqueSet(1, 7), + "RG2": typeutil.NewUniqueSet(2, 3, 8), + "RG3": typeutil.NewUniqueSet(4, 5, 6, 9), + } - replica = mgr.GetByCollectionAndNode(collection, newNode) - suite.Contains(replica.GetNodes(), newNode) - suite.Contains(replica.Replica.GetNodes(), newNode) - - err = mgr.RemoveNode(replica.GetID(), firstNode) - suite.NoError(err) - replica = mgr.GetByCollectionAndNode(collection, firstNode) - suite.Nil(replica) + // Add node into rg. + for collectionID, cfg := range suite.collections { + rgsOfCollection := make(map[string]typeutil.UniqueSet) + for rg := range cfg.spawnConfig { + rgsOfCollection[rg] = rgs[rg] + } + mgr.RecoverNodesInCollection(collectionID, rgsOfCollection) + for rg := range cfg.spawnConfig { + for _, node := range rgs[rg].Collect() { + replica := mgr.GetByCollectionAndNode(collectionID, node) + suite.Contains(replica.GetNodes(), node) + } + } } // Check these modifications are applied to meta store suite.clearMemory() - mgr.Recover(suite.collections) - for _, collection := range suite.collections { - replica := mgr.GetByCollectionAndNode(collection, firstNode) - suite.Nil(replica) - - replica = mgr.GetByCollectionAndNode(collection, newNode) - suite.Contains(replica.GetNodes(), newNode) - suite.Contains(replica.Replica.GetNodes(), newNode) + mgr.Recover(lo.Keys(suite.collections)) + for collectionID, cfg := range suite.collections { + for rg := range cfg.spawnConfig { + for _, node := range rgs[rg].Collect() { + replica := mgr.GetByCollectionAndNode(collectionID, node) + suite.Contains(replica.GetNodes(), node) + } + } } } -func (suite *ReplicaManagerSuite) spawnAndPutAll() { +func (suite *ReplicaManagerSuite) spawnAll() { mgr := suite.mgr - for i, collection := range suite.collections { - replicas, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName) + for id, cfg := range suite.collections { + replicas, err := mgr.Spawn(id, cfg.spawnConfig) suite.NoError(err) - suite.Len(replicas, int(suite.replicaNumbers[i])) - for j, replica := range replicas { - replica.AddNode(suite.nodes[j]) + totalSpawn := 0 + rgsOfCollection := make(map[string]typeutil.UniqueSet) + for rg, spawnNum := range cfg.spawnConfig { + totalSpawn += spawnNum + rgsOfCollection[rg] = suite.rgs[rg] } - err = mgr.Put(replicas...) 
- suite.NoError(err) + mgr.RecoverNodesInCollection(id, rgsOfCollection) + suite.Len(replicas, totalSpawn) } } func (suite *ReplicaManagerSuite) TestResourceGroup() { mgr := NewReplicaManager(suite.idAllocator, suite.catalog) - replica1, err := mgr.spawn(int64(1000), DefaultResourceGroupName) - replica1.AddNode(1) + replicas1, err := mgr.Spawn(int64(1000), map[string]int{DefaultResourceGroupName: 1}) suite.NoError(err) - mgr.Put(replica1) + suite.NotNil(replicas1) + suite.Len(replicas1, 1) - replica2, err := mgr.spawn(int64(2000), DefaultResourceGroupName) - replica2.AddNode(1) + replica2, err := mgr.Spawn(int64(2000), map[string]int{DefaultResourceGroupName: 1}) suite.NoError(err) - mgr.Put(replica2) + suite.NotNil(replica2) + suite.Len(replica2, 1) replicas := mgr.GetByResourceGroup(DefaultResourceGroupName) suite.Len(replicas, 2) - replicas = mgr.GetByCollectionAndRG(int64(1000), DefaultResourceGroupName) - suite.Len(replicas, 1) rgNames := mgr.GetResourceGroupByCollection(int64(1000)) suite.Len(rgNames, 1) suite.True(rgNames.Contain(DefaultResourceGroupName)) @@ -260,6 +298,175 @@ func (suite *ReplicaManagerSuite) clearMemory() { suite.mgr.replicas = make(map[int64]*Replica) } +type ReplicaManagerV2Suite struct { + suite.Suite + + rgs map[string]typeutil.UniqueSet + collections map[int64]collectionLoadConfig + kv kv.MetaKv + catalog metastore.QueryCoordCatalog + mgr *ReplicaManager +} + +func (suite *ReplicaManagerV2Suite) SetupSuite() { + paramtable.Init() + + suite.rgs = map[string]typeutil.UniqueSet{ + "RG1": typeutil.NewUniqueSet(1), + "RG2": typeutil.NewUniqueSet(2, 3), + "RG3": typeutil.NewUniqueSet(4, 5, 6), + "RG4": typeutil.NewUniqueSet(7, 8, 9, 10), + "RG5": typeutil.NewUniqueSet(11, 12, 13, 14, 15), + } + suite.collections = map[int64]collectionLoadConfig{ + // 1000: { + // spawnConfig: map[string]int{"RG1": 1}, + // }, + // 1001: { + // spawnConfig: map[string]int{"RG2": 2}, + // }, + // 1002: { + // spawnConfig: map[string]int{"RG3": 2}, + // }, + // 1003: { + // spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1}, + // }, + // 1004: { + // spawnConfig: map[string]int{"RG4": 2, "RG5": 3}, + // }, + 1005: { + spawnConfig: map[string]int{"RG4": 3, "RG5": 2}, + }, + } + + var err error + config := GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + suite.catalog = querycoord.NewCatalog(suite.kv) + + idAllocator := RandomIncrementIDAllocator() + suite.mgr = NewReplicaManager(idAllocator, suite.catalog) +} + +func (suite *ReplicaManagerV2Suite) TearDownSuite() { + suite.kv.Close() +} + +func (suite *ReplicaManagerV2Suite) TestSpawn() { + mgr := suite.mgr + + for id, cfg := range suite.collections { + replicas, err := mgr.Spawn(id, cfg.spawnConfig) + suite.NoError(err) + rgsOfCollection := make(map[string]typeutil.UniqueSet) + for rg := range cfg.spawnConfig { + rgsOfCollection[rg] = suite.rgs[rg] + } + mgr.RecoverNodesInCollection(id, rgsOfCollection) + for rg := range cfg.spawnConfig { + for _, node := range suite.rgs[rg].Collect() { + replica := mgr.GetByCollectionAndNode(id, node) + suite.Contains(replica.GetNodes(), node) + } + } + suite.Len(replicas, cfg.getTotalSpawn()) + replicas = mgr.GetByCollection(id) + suite.Len(replicas, 
cfg.getTotalSpawn()) + } + suite.testIfBalanced() +} + +func (suite *ReplicaManagerV2Suite) testIfBalanced() { + // If balanced + for id := range suite.collections { + replicas := suite.mgr.GetByCollection(id) + rgToReplica := make(map[string][]*Replica, 0) + for _, r := range replicas { + rgToReplica[r.GetResourceGroup()] = append(rgToReplica[r.GetResourceGroup()], r) + } + for _, replicas := range rgToReplica { + maximumNodes := -1 + minimumNodes := -1 + nodes := make([]int64, 0) + for _, r := range replicas { + availableNodes := suite.rgs[r.GetResourceGroup()] + if maximumNodes == -1 || r.RWNodesCount() > maximumNodes { + maximumNodes = r.RWNodesCount() + } + if minimumNodes == -1 || r.RWNodesCount() < minimumNodes { + minimumNodes = r.RWNodesCount() + } + nodes = append(nodes, r.GetNodes()...) + r.RangeOverRONodes(func(node int64) bool { + if availableNodes.Contain(node) { + nodes = append(nodes, node) + } + return true + }) + } + suite.ElementsMatch(nodes, suite.rgs[replicas[0].GetResourceGroup()].Collect()) + suite.True(maximumNodes-minimumNodes <= 1) + } + } +} + +func (suite *ReplicaManagerV2Suite) TestTransferReplica() { + suite.mgr.TransferReplica(1005, "RG4", "RG5", 1) + suite.recoverReplica(2, true) + suite.testIfBalanced() +} + +func (suite *ReplicaManagerV2Suite) TestTransferReplicaAndAddNode() { + suite.mgr.TransferReplica(1005, "RG4", "RG5", 1) + suite.recoverReplica(1, false) + suite.rgs["RG5"].Insert(16, 17, 18) + suite.recoverReplica(2, true) + suite.testIfBalanced() +} + +func (suite *ReplicaManagerV2Suite) TestTransferNode() { + suite.rgs["RG4"].Remove(7) + suite.rgs["RG5"].Insert(7) + suite.recoverReplica(2, true) + suite.testIfBalanced() +} + +func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) { + // need at least two times to recover the replicas. + // transfer node between replicas need set to outbound and then set to incoming. + for i := 0; i < k; i++ { + // do a recover + for id, cfg := range suite.collections { + rgsOfCollection := make(map[string]typeutil.UniqueSet) + for rg := range cfg.spawnConfig { + rgsOfCollection[rg] = suite.rgs[rg] + } + suite.mgr.RecoverNodesInCollection(id, rgsOfCollection) + } + + // clear all outbound nodes + if clearOutbound { + for id := range suite.collections { + replicas := suite.mgr.GetByCollection(id) + for _, r := range replicas { + outboundNodes := r.GetRONodes() + suite.mgr.RemoveNode(r.GetID(), outboundNodes...) + } + } + } + } +} + func TestReplicaManager(t *testing.T) { suite.Run(t, new(ReplicaManagerSuite)) + suite.Run(t, new(ReplicaManagerV2Suite)) } diff --git a/internal/querycoordv2/meta/replica_test.go b/internal/querycoordv2/meta/replica_test.go new file mode 100644 index 0000000000..d7fa955ce7 --- /dev/null +++ b/internal/querycoordv2/meta/replica_test.go @@ -0,0 +1,175 @@ +package meta + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/proto/querypb" +) + +type ReplicaSuite struct { + suite.Suite + + replicaPB *querypb.Replica +} + +func (suite *ReplicaSuite) SetupSuite() { + suite.replicaPB = &querypb.Replica{ + ID: 1, + CollectionID: 2, + Nodes: []int64{1, 2, 3}, + ResourceGroup: DefaultResourceGroupName, + RoNodes: []int64{4}, + } +} + +func (suite *ReplicaSuite) TestReadOperations() { + r := newReplica(suite.replicaPB) + suite.testRead(r) + // keep same after clone. 
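+ // copyForWrite returns a mutable view; until a write is applied it must expose exactly the same data.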
+ mutableReplica := r.copyForWrite() + suite.testRead(mutableReplica.IntoReplica()) +} + +func (suite *ReplicaSuite) TestClone() { + r := newReplica(suite.replicaPB) + r2 := r.copyForWrite() + suite.testRead(r) + + // after apply write operation on copy, the original should not be affected. + r2.AddRWNode(5, 6) + r2.AddRONode(1, 2) + r2.RemoveNode(3) + suite.testRead(r) +} + +func (suite *ReplicaSuite) TestRange() { + count := 0 + r := newReplica(suite.replicaPB) + r.RangeOverRWNodes(func(nodeID int64) bool { + count++ + return true + }) + suite.Equal(3, count) + count = 0 + r.RangeOverRONodes(func(nodeID int64) bool { + count++ + return true + }) + suite.Equal(1, count) + + count = 0 + r.RangeOverRWNodes(func(nodeID int64) bool { + count++ + return false + }) + suite.Equal(1, count) + + mr := r.copyForWrite() + mr.AddRONode(1) + + count = 0 + mr.RangeOverRWNodes(func(nodeID int64) bool { + count++ + return false + }) + suite.Equal(1, count) +} + +func (suite *ReplicaSuite) TestWriteOperation() { + r := newReplica(suite.replicaPB) + mr := r.copyForWrite() + + // test add available node. + suite.False(mr.Contains(5)) + suite.False(mr.Contains(6)) + mr.AddRWNode(5, 6) + suite.Equal(3, r.RWNodesCount()) + suite.Equal(1, r.RONodesCount()) + suite.Equal(4, r.NodesCount()) + suite.Equal(5, mr.RWNodesCount()) + suite.Equal(1, mr.RONodesCount()) + suite.Equal(6, mr.NodesCount()) + suite.True(mr.Contains(5)) + suite.True(mr.Contains(5)) + suite.True(mr.Contains(6)) + + // test add ro node. + suite.False(mr.Contains(4)) + suite.False(mr.Contains(7)) + mr.AddRWNode(4, 7) + suite.Equal(3, r.RWNodesCount()) + suite.Equal(1, r.RONodesCount()) + suite.Equal(4, r.NodesCount()) + suite.Equal(7, mr.RWNodesCount()) + suite.Equal(0, mr.RONodesCount()) + suite.Equal(7, mr.NodesCount()) + suite.True(mr.Contains(4)) + suite.True(mr.Contains(7)) + + // test remove node to ro. + mr.AddRONode(4, 7) + suite.Equal(3, r.RWNodesCount()) + suite.Equal(1, r.RONodesCount()) + suite.Equal(4, r.NodesCount()) + suite.Equal(5, mr.RWNodesCount()) + suite.Equal(2, mr.RONodesCount()) + suite.Equal(7, mr.NodesCount()) + suite.False(mr.Contains(4)) + suite.False(mr.Contains(7)) + + // test remove node. + mr.RemoveNode(4, 5, 7, 8) + suite.Equal(3, r.RWNodesCount()) + suite.Equal(1, r.RONodesCount()) + suite.Equal(4, r.NodesCount()) + suite.Equal(4, mr.RWNodesCount()) + suite.Equal(0, mr.RONodesCount()) + suite.Equal(4, mr.NodesCount()) + suite.False(mr.Contains(4)) + suite.False(mr.Contains(5)) + suite.False(mr.Contains(7)) + + // test set resource group. + mr.SetResourceGroup("rg1") + suite.Equal(r.GetResourceGroup(), DefaultResourceGroupName) + suite.Equal("rg1", mr.GetResourceGroup()) + + // should panic after IntoReplica. 
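+ // IntoReplica seals the mutable wrapper, so any later write through it is treated as a programming error.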
+ mr.IntoReplica() + suite.Panics(func() { + mr.SetResourceGroup("newResourceGroup") + }) +} + +func (suite *ReplicaSuite) testRead(r *Replica) { + // Test GetID() + suite.Equal(suite.replicaPB.GetID(), r.GetID()) + + // Test GetCollectionID() + suite.Equal(suite.replicaPB.GetCollectionID(), r.GetCollectionID()) + + // Test GetResourceGroup() + suite.Equal(suite.replicaPB.GetResourceGroup(), r.GetResourceGroup()) + + // Test GetNodes() + suite.ElementsMatch(suite.replicaPB.GetNodes(), r.GetNodes()) + + // Test GetRONodes() + suite.ElementsMatch(suite.replicaPB.GetRoNodes(), r.GetRONodes()) + + // Test AvailableNodesCount() + suite.Equal(len(suite.replicaPB.GetNodes()), r.RWNodesCount()) + + // Test Contains() + suite.True(r.Contains(1)) + suite.False(r.Contains(4)) + + // Test ContainRONode() + suite.True(r.ContainRONode(4)) +} + +func TestReplica(t *testing.T) { + suite.Run(t, new(ReplicaSuite)) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index c0f6257c53..3212e53534 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -311,6 +311,21 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error { return nil } +// GetNodesOfMultiRG return nodes of multi rg, it can be used to get a consistent view of nodes of multi rg. +func (rm *ResourceManager) GetNodesOfMultiRG(rgName []string) (map[string]typeutil.UniqueSet, error) { + rm.rwmutex.RLock() + defer rm.rwmutex.RUnlock() + ret := make(map[string]typeutil.UniqueSet) + for _, name := range rgName { + if rm.groups[name] == nil { + return nil, merr.WrapErrResourceGroupNotFound(name) + } + rm.checkRGNodeStatus(name) + ret[name] = typeutil.NewUniqueSet(rm.groups[name].GetNodes()...) 
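+ // the set built above is a fresh copy, so callers get a snapshot taken under the read lock
+ // rather than a live view of the resource group's node list.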
+ } + return ret, nil +} + func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) { rm.rwmutex.RLock() defer rm.rwmutex.RUnlock() @@ -323,26 +338,6 @@ func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) { return rm.groups[rgName].GetNodes(), nil } -// return all outbound node -func (rm *ResourceManager) CheckOutboundNodes(replica *Replica) typeutil.UniqueSet { - rm.rwmutex.RLock() - defer rm.rwmutex.RUnlock() - - if rm.groups[replica.GetResourceGroup()] == nil { - return typeutil.NewUniqueSet() - } - rg := rm.groups[replica.GetResourceGroup()] - - ret := typeutil.NewUniqueSet() - for _, node := range replica.GetNodes() { - if !rg.containsNode(node) { - ret.Insert(node) - } - } - - return ret -} - // return outgoing node num on each rg from this replica func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32 { rm.rwmutex.RLock() @@ -354,15 +349,15 @@ func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[str rg := rm.groups[replica.GetResourceGroup()] ret := make(map[string]int32) - for _, node := range replica.GetNodes() { + replica.RangeOverRONodes(func(node int64) bool { if !rg.containsNode(node) { rgName, err := rm.findResourceGroupByNode(node) if err == nil { ret[rgName]++ } } - } - + return true + }) return ret } diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 0c8828237d..0b8db28b37 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -312,20 +312,6 @@ func (suite *ResourceManagerSuite) TestCheckOutboundNodes() { suite.manager.AssignNode("rg", 1) suite.manager.AssignNode("rg", 2) suite.manager.AssignNode("rg", 3) - - replica := NewReplica( - &querypb.Replica{ - ID: 1, - CollectionID: 1, - Nodes: []int64{1, 2, 3, 4}, - ResourceGroup: "rg", - }, - typeutil.NewUniqueSet(1, 2, 3, 4), - ) - - outboundNodes := suite.manager.CheckOutboundNodes(replica) - suite.Len(outboundNodes, 1) - suite.True(outboundNodes.Contain(4)) } func (suite *ResourceManagerSuite) TestCheckResourceGroup() { @@ -391,9 +377,10 @@ func (suite *ResourceManagerSuite) TestGetOutboundNode() { ID: 1, CollectionID: 100, ResourceGroup: "rg", - Nodes: []int64{1, 2, 3}, + Nodes: []int64{1, 2}, + RoNodes: []int64{3}, }, - typeutil.NewUniqueSet(1, 2, 3), + typeutil.NewUniqueSet(1, 2), ) outgoingNodes := suite.manager.GetOutgoingNodeNumByReplica(replica) diff --git a/internal/querycoordv2/meta/segment_dist_manager_test.go b/internal/querycoordv2/meta/segment_dist_manager_test.go index 699972700d..79d5340ba0 100644 --- a/internal/querycoordv2/meta/segment_dist_manager_test.go +++ b/internal/querycoordv2/meta/segment_dist_manager_test.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type SegmentDistManagerSuite struct { @@ -125,26 +124,20 @@ func (suite *SegmentDistManagerSuite) TestGetBy() { suite.Len(segments, 0) // Test GetBy With Wrong Replica - replica := &Replica{ - Replica: &querypb.Replica{ - ID: 1, - CollectionID: suite.collection + 1, - Nodes: []int64{suite.nodes[0]}, - }, - nodes: typeutil.NewUniqueSet(suite.nodes[0]), - } + replica := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: suite.collection + 1, + Nodes: []int64{suite.nodes[0]}, + }) segments = dist.GetByFilter(WithReplica(replica)) suite.Len(segments, 0) // Test GetBy With Correct Replica - 
replica = &Replica{ - Replica: &querypb.Replica{ - ID: 1, - CollectionID: suite.collection, - Nodes: []int64{suite.nodes[0]}, - }, - nodes: typeutil.NewUniqueSet(suite.nodes[0]), - } + replica = newReplica(&querypb.Replica{ + ID: 1, + CollectionID: suite.collection, + Nodes: []int64{suite.nodes[0]}, + }) segments = dist.GetByFilter(WithReplica(replica)) suite.Len(segments, 2) } diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 0d2e8e401a..478094f975 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -390,10 +390,10 @@ func (suite *CollectionObserverSuite) loadAll() { func (suite *CollectionObserverSuite) load(collection int64) { // Mock meta data - replicas, err := suite.meta.ReplicaManager.Spawn(collection, suite.replicaNumber[collection], meta.DefaultResourceGroupName) + replicas, err := suite.meta.ReplicaManager.Spawn(collection, map[string]int{meta.DefaultResourceGroupName: int(suite.replicaNumber[collection])}) suite.NoError(err) for _, replica := range replicas { - replica.AddNode(suite.nodes...) + replica.AddRWNode(suite.nodes...) } err = suite.meta.ReplicaManager.Put(replicas...) suite.NoError(err) diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index 6816f45c8c..1f1d53fd01 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -29,7 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" ) -// check replica, find outbound nodes and remove it from replica if all segment/channel has been moved +// check replica, find read only nodes and remove it from replica if all segment/channel has been moved type ReplicaObserver struct { cancel context.CancelFunc wg sync.WaitGroup @@ -85,46 +85,45 @@ func (ob *ReplicaObserver) checkNodesInReplica() { log := log.Ctx(context.Background()).WithRateGroup("qcv2.replicaObserver", 1, 60) collections := ob.meta.GetAll() for _, collectionID := range collections { - removedNodes := make([]int64, 0) - // remove nodes from replica which has been transferred to other rg + utils.RecoverReplicaOfCollection(ob.meta, collectionID) + } + + // check all ro nodes, remove it from replica if all segment/channel has been moved + for _, collectionID := range collections { replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) for _, replica := range replicas { - outboundNodes := ob.meta.ResourceManager.CheckOutboundNodes(replica) - if len(outboundNodes) > 0 { - log.RatedInfo(10, "found outbound nodes in replica", - zap.Int64("collectionID", replica.GetCollectionID()), - zap.Int64("replicaID", replica.GetID()), - zap.Int64s("allOutboundNodes", outboundNodes.Collect()), - ) - - for node := range outboundNodes { - segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node)) - channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node)) - - if len(channels) == 0 && len(segments) == 0 { - replica.RemoveNode(node) - removedNodes = append(removedNodes, node) - log.Info("all segment/channel has been removed from outbound node, remove it from replica", - zap.Int64("collectionID", replica.GetCollectionID()), - zap.Int64("replicaID", replica.GetID()), - zap.Int64("removedNodes", node), - zap.Int64s("availableNodes", 
replica.GetNodes()), - ) - } - } - } - } - - // assign removed nodes to other replicas in current rg - for _, node := range removedNodes { - rg, err := ob.meta.ResourceManager.FindResourceGroupByNode(node) - if err != nil { - // unreachable logic path - log.Warn("found node which does not belong to any resource group", zap.Int64("nodeID", node)) + roNodes := replica.GetRONodes() + if len(roNodes) == 0 { continue } - replicas := ob.meta.ReplicaManager.GetByCollectionAndRG(collectionID, rg) - utils.AddNodesToReplicas(ob.meta, replicas, node) + log.RatedInfo(10, "found ro nodes in replica", + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("RONodes", roNodes), + ) + removeNodes := make([]int64, 0, len(roNodes)) + for _, node := range roNodes { + channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node)) + segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node)) + if len(channels) == 0 && len(segments) == 0 { + removeNodes = append(removeNodes, node) + } + } + if len(removeNodes) == 0 { + continue + } + logger := log.With( + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("removedNodes", removeNodes), + zap.Int64s("roNodes", roNodes), + zap.Int64s("availableNodes", replica.GetNodes()), + ) + if err := ob.meta.ReplicaManager.RemoveNode(replica.GetID(), removeNodes...); err != nil { + logger.Warn("fail to remove node from replica", zap.Error(err)) + continue + } + logger.Info("all segment/channel has been removed from ro node, try to remove it from replica") } } } diff --git a/internal/querycoordv2/observers/replica_observer_test.go b/internal/querycoordv2/observers/replica_observer_test.go index 102fe01cae..d4ca0ad937 100644 --- a/internal/querycoordv2/observers/replica_observer_test.go +++ b/internal/querycoordv2/observers/replica_observer_test.go @@ -147,7 +147,7 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() { suite.Eventually(func() bool { replica0 := suite.meta.ReplicaManager.Get(10000) replica1 := suite.meta.ReplicaManager.Get(10001) - return suite.Contains(replica0.GetNodes(), int64(3)) && suite.NotContains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 1) + return (replica0.Contains(3) || replica0.ContainRONode(3)) && suite.NotContains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 1) }, 6*time.Second, 2*time.Second) suite.distMgr.ChannelDistManager.Update(3) @@ -156,7 +156,7 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() { suite.Eventually(func() bool { replica0 := suite.meta.ReplicaManager.Get(10000) replica1 := suite.meta.ReplicaManager.Get(10001) - return suite.NotContains(replica0.GetNodes(), int64(3)) && suite.Contains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 2) + return (!replica0.Contains(3) && !replica0.ContainRONode(3)) && suite.Contains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 2) }, 6*time.Second, 2*time.Second) } diff --git a/internal/querycoordv2/observers/resource_observer.go b/internal/querycoordv2/observers/resource_observer.go index dfb23b2763..61be668663 100644 --- a/internal/querycoordv2/observers/resource_observer.go +++ b/internal/querycoordv2/observers/resource_observer.go @@ -105,9 +105,10 @@ func (ob *ResourceObserver) checkResourceGroup() { zap.Error(err), ) } - - 
utils.AddNodesToCollectionsInRG(ob.meta, rgName, nodes...) } } } + if enableRGAutoRecover { + utils.RecoverAllCollection(ob.meta) + } } diff --git a/internal/querycoordv2/observers/resource_observer_test.go b/internal/querycoordv2/observers/resource_observer_test.go index 101574b3c8..0817e01d8f 100644 --- a/internal/querycoordv2/observers/resource_observer_test.go +++ b/internal/querycoordv2/observers/resource_observer_test.go @@ -91,7 +91,7 @@ func (suite *ResourceObserverSuite) SetupTest() { func (suite *ResourceObserverSuite) TestCheckNodesInReplica() { suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil) - suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil) + suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil) suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) suite.meta.ReplicaManager.Put(meta.NewReplica( &querypb.Replica{ @@ -176,7 +176,7 @@ func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() { func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() { suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil) - suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(2) + suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Times(2) suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) suite.meta.ReplicaManager.Put(meta.NewReplica( &querypb.Replica{ @@ -199,7 +199,7 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() { typeutil.NewUniqueSet(), )) - suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error")) + suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(errors.New("store error")) suite.meta.ResourceManager.AddResourceGroup("rg") suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ NodeID: int64(100), diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index be29a353f2..fa260c98b0 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -92,9 +92,9 @@ func (suite *TargetObserverSuite) SetupTest() { suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) - replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1, meta.DefaultResourceGroupName) + replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, map[string]int{meta.DefaultResourceGroupName: 1}) suite.NoError(err) - replicas[0].AddNode(2) + replicas[0].AddRWNode(2) err = suite.meta.ReplicaManager.Put(replicas...) suite.NoError(err) @@ -276,9 +276,9 @@ func (suite *TargetObserverCheckSuite) SetupTest() { suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) - replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1, meta.DefaultResourceGroupName) + replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, map[string]int{meta.DefaultResourceGroupName: 1}) suite.NoError(err) - replicas[0].AddNode(2) + replicas[0].AddRWNode(2) err = suite.meta.ReplicaManager.Put(replicas...) 
suite.NoError(err) } diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go index 7647cf57e1..cb6853692c 100644 --- a/internal/querycoordv2/ops_services.go +++ b/internal/querycoordv2/ops_services.go @@ -276,9 +276,7 @@ func (s *Server) TransferSegment(ctx context.Context, req *querypb.TransferSegme // when no dst node specified, default to use all other nodes in same dstNodeSet := typeutil.NewUniqueSet() if req.GetToAllNodes() { - outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) }) - dstNodeSet.Insert(availableNodes...) + dstNodeSet.Insert(replica.GetNodes()...) } else { // check whether dstNode is healthy if err := s.isStoppingNode(req.GetTargetNodeID()); err != nil { @@ -350,9 +348,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann // when no dst node specified, default to use all other nodes in same dstNodeSet := typeutil.NewUniqueSet() if req.GetToAllNodes() { - outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) }) - dstNodeSet.Insert(availableNodes...) + dstNodeSet.Insert(replica.GetNodes()...) } else { // check whether dstNode is healthy if err := s.isStoppingNode(req.GetTargetNodeID()); err != nil { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 487a284db6..1ddb4b6800 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -722,7 +722,7 @@ func (s *Server) handleNodeUp(node int64) { zap.String("resourceGroup", rgName), ) - utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node) + utils.RecoverAllCollection(s.meta) } func (s *Server) handleNodeDown(node int64) { @@ -776,7 +776,6 @@ func (s *Server) checkReplicas() { log := log.With(zap.Int64("collectionID", collection)) replicas := s.meta.ReplicaManager.GetByCollection(collection) for _, replica := range replicas { - replica := replica.Clone() toRemove := make([]int64, 0) for _, node := range replica.GetNodes() { if s.nodeMgr.Get(node) == nil { @@ -790,9 +789,7 @@ func (s *Server) checkReplicas() { zap.Int64s("offlineNodes", toRemove), ) log.Info("some nodes are offline, remove them from replica", zap.Any("toRemove", toRemove)) - replica.RemoveNode(toRemove...) - err := s.meta.ReplicaManager.Put(replica) - if err != nil { + if err := s.meta.ReplicaManager.RemoveNode(replica.GetID(), toRemove...); err != nil { log.Warn("failed to remove offline nodes from replica") } } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 4b4d6108b3..b744616b0c 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -686,9 +686,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques // when no dst node specified, default to use all other nodes in same dstNodeSet := typeutil.NewUniqueSet() if len(req.GetDstNodeIDs()) == 0 { - outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) }) - dstNodeSet.Insert(availableNodes...) + dstNodeSet.Insert(replica.GetNodes()...) 
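+ // replica.GetNodes() now returns only rw nodes (ro nodes are tracked separately),
+ // so the old CheckOutboundNodes filtering is no longer needed here.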
} else { for _, dstNode := range req.GetDstNodeIDs() { if !replica.Contains(dstNode) { @@ -1086,30 +1084,14 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq return merr.Status(err), nil } - replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup()) - replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup()) - loadSameCollection := false - sameCollectionID := int64(0) - for _, r1 := range replicasInSource { - for _, r2 := range replicasInTarget { - if r1.GetCollectionID() == r2.GetCollectionID() { - loadSameCollection = true - sameCollectionID = r1.GetCollectionID() - } - } - } - if loadSameCollection { - err := merr.WrapErrParameterInvalid("resource groups load not the same collection", fmt.Sprintf("collection %d loaded for both", sameCollectionID)) - return merr.Status(err), nil - } - - nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())) + // Move node from source resource group to target resource group. + _, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())) if err != nil { log.Warn("failed to transfer node", zap.Error(err)) return merr.Status(err), nil } - - utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...) + // Recover all replica on the source and target resource group. + utils.RecoverAllCollection(s.meta) return merr.Success(), nil } @@ -1127,6 +1109,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli return merr.Status(err), nil } + // TODO: !!!WARNING, replica manager and resource manager doesn't protected with each other by lock. 
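+ // The resource group existence checks below can therefore race with concurrent node/replica changes
+ // and act only as best-effort guards; the decisive parameter validation lives in the replica manager.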
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok { err := merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup()) return merr.Status(errors.Wrap(err, @@ -1144,50 +1127,9 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli return merr.Status(err), nil } - replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup()) - if len(replicas) < int(req.GetNumReplica()) { - err := merr.WrapErrParameterInvalid("NumReplica not greater than the number of replica in source resource group", fmt.Sprintf("only found [%d] replicas in source resource group[%s]", - len(replicas), req.GetSourceResourceGroup())) - return merr.Status(err), nil - } - - replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup()) - if len(replicas) > 0 { - err := merr.WrapErrParameterInvalid("no same collection in target resource group", fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported", - len(replicas), req.GetTargetResourceGroup())) - return merr.Status(err), nil - } - - replicas = s.meta.ReplicaManager.GetByCollection(req.GetCollectionID()) - if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) && - len(replicas) != int(req.GetNumReplica()) { - err := merr.WrapErrParameterInvalid("tranfer all replicas from/to default resource group", - fmt.Sprintf("try to transfer %d replicas from/to but %d replicas exist", req.GetNumReplica(), len(replicas))) - return merr.Status(err), nil - } - - err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()]) - if err != nil { - return merr.Status(err), nil - } - - return merr.Success(), nil -} - -func (s *Server) transferReplica(targetRG string, replicas []*meta.Replica) error { - ret := make([]*meta.Replica, 0) - for _, replica := range replicas { - newReplica := replica.Clone() - newReplica.ResourceGroup = targetRG - - ret = append(ret, newReplica) - } - err := utils.AssignNodesToReplicas(s.meta, targetRG, ret...) - if err != nil { - return err - } - - return s.meta.ReplicaManager.Put(ret...) + // Apply change into replica manager. 
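+ // meta.TransferReplica re-validates the replica numbers and rebinds the selected replicas to the
+ // target resource group, persisting the updated node distribution.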
+ err := s.meta.TransferReplica(req.GetCollectionID(), req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumReplica())) + return merr.Status(err), nil } func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 39f2de6383..b193c01f15 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -541,13 +541,6 @@ func (suite *ServiceSuite) TestTransferNode() { }, typeutil.NewUniqueSet(), )) - resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ - SourceResourceGroup: "rg1", - TargetResourceGroup: "rg2", - NumNode: 1, - }) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) // test transfer node meet non-exist source rg resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ @@ -745,7 +738,8 @@ func (suite *ServiceSuite) TestTransferReplica() { NumReplica: 1, }) suite.NoError(err) - suite.Contains(resp.Reason, "dynamically increase replica num is unsupported") + // we support dynamically increase replica num in resource group now. + suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, @@ -754,14 +748,24 @@ func (suite *ServiceSuite) TestTransferReplica() { NumReplica: 1, }) suite.NoError(err) - suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid) + // we support transfer replica to resource group load same collection. + suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(1)) + suite.Equal(3, replicaNum) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg3", CollectionID: 1, - NumReplica: int64(replicaNum), + NumReplica: 2, + }) + suite.NoError(err) + suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) + resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ + SourceResourceGroup: "rg1", + TargetResourceGroup: "rg3", + CollectionID: 1, + NumReplica: 1, }) suite.NoError(err) suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) @@ -1232,8 +1236,8 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { // update two collection's dist for _, collection := range suite.collections { replicas := suite.meta.ReplicaManager.GetByCollection(collection) - replicas[0].AddNode(srcNode) - replicas[0].AddNode(dstNode) + replicas[0].AddRWNode(srcNode) + replicas[0].AddRWNode(dstNode) suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded) for partition, segments := range suite.segments[collection] { @@ -1258,8 +1262,8 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { defer func() { for _, collection := range suite.collections { replicas := suite.meta.ReplicaManager.GetByCollection(collection) - replicas[0].RemoveNode(srcNode) - replicas[0].RemoveNode(dstNode) + suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), srcNode) + suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), dstNode) } suite.nodeMgr.Remove(1001) suite.nodeMgr.Remove(1002) @@ -1377,7 +1381,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() { suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) suite.Contains(resp.Reason, "mock 
error") - suite.meta.ReplicaManager.AddNode(replicas[0].ID, 10) + suite.meta.ReplicaManager.RecoverNodesInCollection(collection, map[string]typeutil.UniqueSet{meta.DefaultResourceGroupName: typeutil.NewUniqueSet(10)}) req.SourceNodeIDs = []int64{10} resp, err = server.LoadBalance(ctx, req) suite.NoError(err) @@ -1399,7 +1403,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) suite.nodeMgr.Remove(10) - suite.meta.ReplicaManager.RemoveNode(replicas[0].ID, 10) + suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), 10) } } @@ -1697,8 +1701,8 @@ func (suite *ServiceSuite) TestHandleNodeUp() { })) server.handleNodeUp(111) nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes() - suite.Len(nodes, 1) - suite.Equal(int64(111), nodes[0]) + nodesInRG, _ := suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName) + suite.ElementsMatch(nodes, nodesInRG) log.Info("handleNodeUp") // when more rg exist, new node shouldn't be assign to replica in default rg in handleNodeUp @@ -1710,9 +1714,8 @@ func (suite *ServiceSuite) TestHandleNodeUp() { })) server.handleNodeUp(222) nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes() - suite.Len(nodes, 2) - suite.Contains(nodes, int64(111)) - suite.Contains(nodes, int64(222)) + nodesInRG, _ = suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName) + suite.ElementsMatch(nodes, nodesInRG) } func (suite *ServiceSuite) loadAll() { diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index b6c568f393..8be51b038b 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -17,9 +17,6 @@ package utils import ( - "math/rand" - "sort" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -28,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var ( @@ -72,7 +70,7 @@ func GroupNodesByReplica(replicaMgr *meta.ReplicaManager, collectionID int64, no for _, replica := range replicas { for _, node := range nodes { if replica.Contains(node) { - ret[replica.ID] = append(ret[replica.ID], node) + ret[replica.GetID()] = append(ret[replica.GetID()], node) } } } @@ -98,142 +96,91 @@ func GroupSegmentsByReplica(replicaMgr *meta.ReplicaManager, collectionID int64, for _, replica := range replicas { for _, segment := range segments { if replica.Contains(segment.Node) { - ret[replica.ID] = append(ret[replica.ID], segment) + ret[replica.GetID()] = append(ret[replica.GetID()], segment) } } } return ret } -// AssignNodesToReplicas assigns nodes to the given replicas, -// all given replicas must be the same collection, -// the given replicas have to be not in ReplicaManager -func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replica) error { - replicaIDs := lo.Map(replicas, func(r *meta.Replica, _ int) int64 { return r.GetID() }) - log := log.With(zap.Int64("collectionID", replicas[0].GetCollectionID()), - zap.Int64s("replicas", replicaIDs), - zap.String("rgName", rgName), - ) - if len(replicaIDs) == 0 { - return nil - } - - nodeGroup, err := m.ResourceManager.GetNodes(rgName) - if err != nil { - log.Warn("failed to get nodes", zap.Error(err)) - return err - } - - if len(nodeGroup) < len(replicaIDs) { - log.Warn(meta.ErrNodeNotEnough.Error(), zap.Error(meta.ErrNodeNotEnough)) - return 
meta.ErrNodeNotEnough
-	}
-
-	rand.Shuffle(len(nodeGroup), func(i, j int) {
-		nodeGroup[i], nodeGroup[j] = nodeGroup[j], nodeGroup[i]
-	})
-
-	log.Info("assign nodes to replicas",
-		zap.Int64s("nodes", nodeGroup),
-	)
-	for i, node := range nodeGroup {
-		replicas[i%len(replicas)].AddNode(node)
-	}
-
-	return nil
-}
-
-// add nodes to all collections in rgName
-// for each collection, add node to replica with least number of nodes
-func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) {
-	for _, node := range nodes {
-		for _, collection := range m.CollectionManager.GetAll() {
-			replica := m.ReplicaManager.GetByCollectionAndNode(collection, node)
-			if replica == nil {
-				replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName)
-				AddNodesToReplicas(m, replicas, node)
-			}
-		}
-	}
-}
-
-func AddNodesToReplicas(m *meta.Meta, replicas []*meta.Replica, node int64) {
-	if len(replicas) == 0 {
+// RecoverReplicaOfCollection recovers all replicas of a collection against the latest resource groups.
+func RecoverReplicaOfCollection(m *meta.Meta, collectionID typeutil.UniqueID) {
+	logger := log.With(zap.Int64("collectionID", collectionID))
+	rgNames := m.ReplicaManager.GetResourceGroupByCollection(collectionID)
+	if rgNames.Len() == 0 {
+		logger.Error("no resource group found for collection", zap.Int64("collectionID", collectionID))
 		return
 	}
-	sort.Slice(replicas, func(i, j int) bool {
-		return replicas[i].Len() < replicas[j].Len()
-	})
-	replica := replicas[0]
-	// TODO(yah01): this may fail, need a component to check whether a node is assigned
-	err := m.ReplicaManager.AddNode(replica.GetID(), node)
+	rgs, err := m.ResourceManager.GetNodesOfMultiRG(rgNames.Collect())
 	if err != nil {
-		log.Warn("failed to assign node to replicas",
-			zap.Int64("collectionID", replica.GetCollectionID()),
-			zap.Int64("replicaID", replica.GetID()),
-			zap.Int64("nodeId", node),
-			zap.Error(err),
-		)
+		logger.Error("unreachable code as expected, fail to get resource group for replica", zap.Error(err))
 		return
 	}
-	log.Info("assign node to replica",
-		zap.Int64("collectionID", replica.GetCollectionID()),
-		zap.Int64("replicaID", replica.GetID()),
-		zap.Int64("nodeID", node),
-	)
+
+	if err := m.ReplicaManager.RecoverNodesInCollection(collectionID, rgs); err != nil {
+		logger.Warn("fail to set available nodes in replica", zap.Error(err))
+	}
 }
 
-// SpawnReplicas spawns replicas for given collection, assign nodes to them, and save them
-func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) {
-	replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName)
-	if err != nil {
-		return nil, err
+// RecoverAllCollection recovers all replicas of all collections against their resource groups.
+func RecoverAllCollection(m *meta.Meta) {
+	for _, collection := range m.CollectionManager.GetAll() {
+		RecoverReplicaOfCollection(m, collection)
 	}
-	err = AssignNodesToReplicas(m, rgName, replicas...)
-	if err != nil {
-		return nil, err
-	}
-	return replicas, m.ReplicaManager.Put(replicas...)
}
 
-func checkResourceGroup(collectionID int64, replicaNumber int32, resourceGroups []string) error {
+func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) {
 	if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) {
-		return ErrUseWrongNumRG
+		return nil, ErrUseWrongNumRG
 	}
 
-	return nil
+	replicaNumInRG := make(map[string]int)
+	if len(resourceGroups) == 0 {
+		// All replicas should be spawned in the default resource group.
+		replicaNumInRG[meta.DefaultResourceGroupName] = int(replicaNumber)
+	} else if len(resourceGroups) == 1 {
+		// All replicas should be spawned in the given resource group.
+		replicaNumInRG[resourceGroups[0]] = int(replicaNumber)
+	} else {
+		// Otherwise, spawn the replicas one by one across the listed resource groups.
+		for _, rgName := range resourceGroups {
+			replicaNumInRG[rgName] += 1
+		}
+	}
+
+	// TODO: !!!Warning, ResourceManager and ReplicaManager are not protected against each other under concurrent operations:
+	// 1. replica1 takes a snapshot of rg1's nodes but has not finished spawning.
+	// 2. rg1 is removed.
+	// 3. replica1 finishes spawning, but can no longer find the related resource group.
+	for rgName, num := range replicaNumInRG {
+		if !m.ContainResourceGroup(rgName) {
+			return nil, ErrGetNodesFromRG
+		}
+		nodes, err := m.ResourceManager.GetNodes(rgName)
+		if err != nil {
+			return nil, err
+		}
+		if num > len(nodes) {
+			log.Warn("node not enough", zap.Error(meta.ErrNodeNotEnough), zap.Int("replicaNum", num), zap.Int("nodeNum", len(nodes)), zap.String("rgName", rgName))
+			return nil, meta.ErrNodeNotEnough
+		}
+	}
+	return replicaNumInRG, nil
 }
 
+// SpawnReplicasWithRG spawns replicas for the given collection across the given resource groups.
 func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string, replicaNumber int32) ([]*meta.Replica, error) {
-	if err := checkResourceGroup(collection, replicaNumber, resourceGroups); err != nil {
+	replicaNumInRG, err := checkResourceGroup(m, resourceGroups, replicaNumber)
+	if err != nil {
 		return nil, err
 	}
 
-	if len(resourceGroups) == 0 {
-		return SpawnAllReplicasInRG(m, collection, replicaNumber, meta.DefaultResourceGroupName)
+	// Spawn the replicas in the replica manager.
+	replicas, err := m.ReplicaManager.Spawn(collection, replicaNumInRG)
+	if err != nil {
+		return nil, err
 	}
-
-	if len(resourceGroups) == 1 {
-		return SpawnAllReplicasInRG(m, collection, replicaNumber, resourceGroups[0])
-	}
-
-	replicaSet := make([]*meta.Replica, 0)
-	for _, rgName := range resourceGroups {
-		if !m.ResourceManager.ContainResourceGroup(rgName) {
-			return nil, merr.WrapErrResourceGroupNotFound(rgName)
-		}
-
-		replicas, err := m.ReplicaManager.Spawn(collection, 1, rgName)
-		if err != nil {
-			return nil, err
-		}
-
-		err = AssignNodesToReplicas(m, rgName, replicas...)
-		if err != nil {
-			return nil, err
-		}
-		replicaSet = append(replicaSet, replicas...)
-	}
-
-	return replicaSet, m.ReplicaManager.Put(replicaSet...)
+	// Actively recover the new replicas.
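+	// RecoverReplicaOfCollection re-reads the node list of every resource group used by
+	// this collection and lets ReplicaManager re-assign those nodes to the freshly
+	// spawned replicas.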
+ RecoverReplicaOfCollection(m, collection) + return replicas, nil } diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index ffc095acc8..7879793910 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -93,14 +93,14 @@ func TestSpawnReplicasWithRG(t *testing.T) { { name: "test 3 replica on 2 rg", - args: args{m, 1000, []string{"rg1", "rg2"}, 3}, + args: args{m, 1001, []string{"rg1", "rg2"}, 3}, wantReplicaNum: 0, wantErr: true, }, { name: "test 3 replica on 3 rg", - args: args{m, 1000, []string{"rg1", "rg2", "rg3"}, 3}, + args: args{m, 1002, []string{"rg1", "rg2", "rg3"}, 3}, wantReplicaNum: 3, wantErr: false, }, @@ -127,6 +127,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) { store.EXPECT().SaveCollection(mock.Anything).Return(nil) store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4) store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) + store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil) nodeMgr := session.NewNodeManager() m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr) m.ResourceManager.AddResourceGroup("rg") @@ -174,7 +175,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) { storeErr := errors.New("store error") store.EXPECT().SaveReplica(mock.Anything).Return(storeErr) - AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...) + RecoverAllCollection(m) assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 0) assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 0) @@ -188,7 +189,9 @@ func TestAddNodesToCollectionsInRG(t *testing.T) { store := mocks.NewQueryCoordCatalog(t) store.EXPECT().SaveCollection(mock.Anything).Return(nil) store.EXPECT().SaveReplica(mock.Anything).Return(nil) + store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil) store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) + store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil) nodeMgr := session.NewNodeManager() m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr) m.ResourceManager.AddResourceGroup("rg") @@ -233,8 +236,33 @@ func TestAddNodesToCollectionsInRG(t *testing.T) { }, typeutil.NewUniqueSet(), )) - - AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...) 
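+	// Register the nodes with the node manager and bring them up through
+	// ResourceManager.HandleNodeUp (and TransferNode for "rg") first, so that the
+	// RecoverAllCollection call below assigns only nodes the resource groups actually own.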
+ nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + })) + _, err := m.ResourceManager.HandleNodeUp(1) + assert.NoError(t, err) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + })) + _, err = m.ResourceManager.HandleNodeUp(2) + assert.NoError(t, err) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "localhost", + })) + _, err = m.ResourceManager.HandleNodeUp(3) + assert.NoError(t, err) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 4, + Address: "localhost", + })) + _, err = m.ResourceManager.HandleNodeUp(4) + assert.NoError(t, err) + _, err = m.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg", 4) + assert.NoError(t, err) + RecoverAllCollection(m) assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2) assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 2) diff --git a/pkg/util/typeutil/set.go b/pkg/util/typeutil/set.go index a760472001..ea7e145aa5 100644 --- a/pkg/util/typeutil/set.go +++ b/pkg/util/typeutil/set.go @@ -109,6 +109,24 @@ func (set Set[T]) Len() int { return len(set) } +// Range iterates over elements in the set +func (set Set[T]) Range(f func(element T) bool) { + for elem := range set { + if !f(elem) { + break + } + } +} + +// Clone returns a new set with the same elements +func (set Set[T]) Clone() Set[T] { + ret := make(Set[T], set.Len()) + for elem := range set { + ret.Insert(elem) + } + return ret +} + type ConcurrentSet[T comparable] struct { inner sync.Map } diff --git a/pkg/util/typeutil/set_test.go b/pkg/util/typeutil/set_test.go index 97438204e3..82bb69b81b 100644 --- a/pkg/util/typeutil/set_test.go +++ b/pkg/util/typeutil/set_test.go @@ -35,6 +35,19 @@ func TestUniqueSet(t *testing.T) { assert.False(t, set.Contain(7)) assert.True(t, set.Contain(9)) assert.False(t, set.Contain(5, 7, 9)) + + count := 0 + set.Range(func(element UniqueID) bool { + count++ + return true + }) + assert.Equal(t, set.Len(), count) + count = 0 + set.Range(func(element UniqueID) bool { + count++ + return false + }) + assert.Equal(t, 1, count) } func TestUniqueSetClear(t *testing.T) {