diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index d694a8d0b7..97ebaf3cb6 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -512,3 +512,14 @@ func (c *Client) CheckQueryNodeDistribution(ctx context.Context, req *querypb.Ch return client.CheckQueryNodeDistribution(ctx, req) }) } + +func (c *Client) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) { + return client.UpdateLoadConfig(ctx, req) + }) +} diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 2e77561dde..c0cc568b0d 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -484,3 +484,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann func (s *Server) CheckQueryNodeDistribution(ctx context.Context, req *querypb.CheckQueryNodeDistributionRequest) (*commonpb.Status, error) { return s.queryCoord.CheckQueryNodeDistribution(ctx, req) } + +func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error) { + return s.queryCoord.UpdateLoadConfig(ctx, req) +} diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 0fad2abb13..a3121eb9fe 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -186,7 +186,7 @@ type QueryCoordCatalog interface { ReleaseCollection(collection int64) error ReleasePartition(collection int64, partitions ...int64) error ReleaseReplicas(collectionID int64) error - ReleaseReplica(collection, replica int64) error + ReleaseReplica(collection int64, replicas ...int64) error SaveResourceGroup(rgs ...*querypb.ResourceGroup) error RemoveResourceGroup(rgName string) error GetResourceGroups() ([]*querypb.ResourceGroup, error) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index aee78b5392..ce546d5234 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -240,9 +240,26 @@ func (s Catalog) ReleaseReplicas(collectionID int64) error { return s.cli.RemoveWithPrefix(key) } -func (s Catalog) ReleaseReplica(collection, replica int64) error { - key := encodeReplicaKey(collection, replica) - return s.cli.Remove(key) +func (s Catalog) ReleaseReplica(collection int64, replicas ...int64) error { + keys := lo.Map(replicas, func(replica int64, _ int) string { + return encodeReplicaKey(collection, replica) + }) + if len(replicas) >= MetaOpsBatchSize { + index := 0 + for index < len(replicas) { + endIndex := index + MetaOpsBatchSize + if endIndex > len(replicas) { + endIndex = len(replicas) + } + err := s.cli.MultiRemove(keys[index:endIndex]) + if err != nil { + return err + } + index = endIndex + } + return nil + } + return s.cli.MultiRemove(keys) } func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) error { diff --git a/internal/metastore/mocks/mock_querycoord_catalog.go b/internal/metastore/mocks/mock_querycoord_catalog.go index 92c1d3efb7..3ce66e8441 100644 
--- a/internal/metastore/mocks/mock_querycoord_catalog.go +++ b/internal/metastore/mocks/mock_querycoord_catalog.go @@ -384,13 +384,20 @@ func (_c *QueryCoordCatalog_ReleasePartition_Call) RunAndReturn(run func(int64, return _c } -// ReleaseReplica provides a mock function with given fields: collection, replica -func (_m *QueryCoordCatalog) ReleaseReplica(collection int64, replica int64) error { - ret := _m.Called(collection, replica) +// ReleaseReplica provides a mock function with given fields: collection, replicas +func (_m *QueryCoordCatalog) ReleaseReplica(collection int64, replicas ...int64) error { + _va := make([]interface{}, len(replicas)) + for _i := range replicas { + _va[_i] = replicas[_i] + } + var _ca []interface{} + _ca = append(_ca, collection) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(int64, int64) error); ok { - r0 = rf(collection, replica) + if rf, ok := ret.Get(0).(func(int64, ...int64) error); ok { + r0 = rf(collection, replicas...) } else { r0 = ret.Error(0) } @@ -405,14 +412,21 @@ type QueryCoordCatalog_ReleaseReplica_Call struct { // ReleaseReplica is a helper method to define mock.On call // - collection int64 -// - replica int64 -func (_e *QueryCoordCatalog_Expecter) ReleaseReplica(collection interface{}, replica interface{}) *QueryCoordCatalog_ReleaseReplica_Call { - return &QueryCoordCatalog_ReleaseReplica_Call{Call: _e.mock.On("ReleaseReplica", collection, replica)} +// - replicas ...int64 +func (_e *QueryCoordCatalog_Expecter) ReleaseReplica(collection interface{}, replicas ...interface{}) *QueryCoordCatalog_ReleaseReplica_Call { + return &QueryCoordCatalog_ReleaseReplica_Call{Call: _e.mock.On("ReleaseReplica", + append([]interface{}{collection}, replicas...)...)} } -func (_c *QueryCoordCatalog_ReleaseReplica_Call) Run(run func(collection int64, replica int64)) *QueryCoordCatalog_ReleaseReplica_Call { +func (_c *QueryCoordCatalog_ReleaseReplica_Call) Run(run func(collection int64, replicas ...int64)) *QueryCoordCatalog_ReleaseReplica_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(int64)) + variadicArgs := make([]int64, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(int64), variadicArgs...) 
}) return _c } @@ -422,7 +436,7 @@ func (_c *QueryCoordCatalog_ReleaseReplica_Call) Return(_a0 error) *QueryCoordCa return _c } -func (_c *QueryCoordCatalog_ReleaseReplica_Call) RunAndReturn(run func(int64, int64) error) *QueryCoordCatalog_ReleaseReplica_Call { +func (_c *QueryCoordCatalog_ReleaseReplica_Call) RunAndReturn(run func(int64, ...int64) error) *QueryCoordCatalog_ReleaseReplica_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/mock_querycoord.go b/internal/mocks/mock_querycoord.go index 0b3c3080cb..5ed0533e63 100644 --- a/internal/mocks/mock_querycoord.go +++ b/internal/mocks/mock_querycoord.go @@ -2394,6 +2394,61 @@ func (_c *MockQueryCoord_TransferSegment_Call) RunAndReturn(run func(context.Con return _c } +// UpdateLoadConfig provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryCoord) UpdateLoadConfig(_a0 context.Context, _a1 *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateLoadConfigRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.UpdateLoadConfigRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoord_UpdateLoadConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateLoadConfig' +type MockQueryCoord_UpdateLoadConfig_Call struct { + *mock.Call +} + +// UpdateLoadConfig is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.UpdateLoadConfigRequest +func (_e *MockQueryCoord_Expecter) UpdateLoadConfig(_a0 interface{}, _a1 interface{}) *MockQueryCoord_UpdateLoadConfig_Call { + return &MockQueryCoord_UpdateLoadConfig_Call{Call: _e.mock.On("UpdateLoadConfig", _a0, _a1)} +} + +func (_c *MockQueryCoord_UpdateLoadConfig_Call) Run(run func(_a0 context.Context, _a1 *querypb.UpdateLoadConfigRequest)) *MockQueryCoord_UpdateLoadConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.UpdateLoadConfigRequest)) + }) + return _c +} + +func (_c *MockQueryCoord_UpdateLoadConfig_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoord_UpdateLoadConfig_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoord_UpdateLoadConfig_Call) RunAndReturn(run func(context.Context, *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error)) *MockQueryCoord_UpdateLoadConfig_Call { + _c.Call.Return(run) + return _c +} + // UpdateResourceGroups provides a mock function with given fields: _a0, _a1 func (_m *MockQueryCoord) UpdateResourceGroups(_a0 context.Context, _a1 *querypb.UpdateResourceGroupsRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querycoord_client.go b/internal/mocks/mock_querycoord_client.go index 240f00ab34..7d99a324bb 100644 --- a/internal/mocks/mock_querycoord_client.go +++ b/internal/mocks/mock_querycoord_client.go @@ -2592,6 +2592,76 @@ func (_c *MockQueryCoordClient_TransferSegment_Call) RunAndReturn(run func(conte return _c } +// UpdateLoadConfig provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryCoordClient) UpdateLoadConfig(ctx 
context.Context, in *querypb.UpdateLoadConfigRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateLoadConfigRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.UpdateLoadConfigRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.UpdateLoadConfigRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoordClient_UpdateLoadConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateLoadConfig' +type MockQueryCoordClient_UpdateLoadConfig_Call struct { + *mock.Call +} + +// UpdateLoadConfig is a helper method to define mock.On call +// - ctx context.Context +// - in *querypb.UpdateLoadConfigRequest +// - opts ...grpc.CallOption +func (_e *MockQueryCoordClient_Expecter) UpdateLoadConfig(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_UpdateLoadConfig_Call { + return &MockQueryCoordClient_UpdateLoadConfig_Call{Call: _e.mock.On("UpdateLoadConfig", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryCoordClient_UpdateLoadConfig_Call) Run(run func(ctx context.Context, in *querypb.UpdateLoadConfigRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_UpdateLoadConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*querypb.UpdateLoadConfigRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockQueryCoordClient_UpdateLoadConfig_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoordClient_UpdateLoadConfig_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoordClient_UpdateLoadConfig_Call) RunAndReturn(run func(context.Context, *querypb.UpdateLoadConfigRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockQueryCoordClient_UpdateLoadConfig_Call { + _c.Call.Return(run) + return _c +} + // UpdateResourceGroups provides a mock function with given fields: ctx, in, opts func (_m *MockQueryCoordClient) UpdateResourceGroups(ctx context.Context, in *querypb.UpdateResourceGroupsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 89201dc918..51220841b2 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -105,6 +105,8 @@ service QueryCoord { rpc TransferSegment(TransferSegmentRequest) returns (common.Status) {} rpc TransferChannel(TransferChannelRequest) returns (common.Status) {} rpc CheckQueryNodeDistribution(CheckQueryNodeDistributionRequest) returns (common.Status) {} + + rpc UpdateLoadConfig(UpdateLoadConfigRequest) returns (common.Status) {} } service QueryNode { @@ -892,4 +894,10 @@ message CheckQueryNodeDistributionRequest { int64 target_nodeID = 4; } - +message UpdateLoadConfigRequest { + common.MsgBase base = 1; + int64 dbID = 2; + repeated int64 collectionIDs = 3; + int32 replica_number = 4; + repeated string resource_groups = 5; +} diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index cef0e4cd61..78860f56f3 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -20,6 +20,7 @@ import ( "context" "time" + "github.com/samber/lo" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -101,12 +102,15 @@ func (c *ChannelChecker) Check(ctx context.Context) []task.Task { // clean node which has been move out from replica for _, nodeInfo := range c.nodeMgr.GetAll() { nodeID := nodeInfo.ID() - replicas := c.meta.ReplicaManager.GetByNode(nodeID) - channels := c.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID)) - if len(replicas) == 0 && len(channels) != 0 { - reduceTasks := c.createChannelReduceTasks(ctx, channels, meta.NilReplica) - task.SetReason("dirty channel exists", reduceTasks...) - tasks = append(tasks, reduceTasks...) + channelOnQN := c.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID)) + collectionChannels := lo.GroupBy(channelOnQN, func(ch *meta.DmChannel) int64 { return ch.CollectionID }) + for collectionID, channels := range collectionChannels { + replica := c.meta.ReplicaManager.GetByCollectionAndNode(collectionID, nodeID) + if replica == nil { + reduceTasks := c.createChannelReduceTasks(ctx, channels, meta.NilReplica) + task.SetReason("dirty channel exists", reduceTasks...) + tasks = append(tasks, reduceTasks...) 
+			}
 		}
 	}
 	return tasks
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 8c0389950a..184e562c58 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -107,13 +107,16 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 	// clean node which has been move out from replica
 	for _, nodeInfo := range c.nodeMgr.GetAll() {
 		nodeID := nodeInfo.ID()
-		replicas := c.meta.ReplicaManager.GetByNode(nodeID)
-		segments := c.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
-		if len(replicas) == 0 && len(segments) != 0 {
-			reduceTasks := c.createSegmentReduceTasks(ctx, segments, meta.NilReplica, querypb.DataScope_Historical)
-			task.SetReason("dirty segment exists", reduceTasks...)
-			task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
-			results = append(results, reduceTasks...)
+		segmentsOnQN := c.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
+		collectionSegments := lo.GroupBy(segmentsOnQN, func(segment *meta.Segment) int64 { return segment.GetCollectionID() })
+		for collectionID, segments := range collectionSegments {
+			replica := c.meta.ReplicaManager.GetByCollectionAndNode(collectionID, nodeID)
+			if replica == nil {
+				reduceTasks := c.createSegmentReduceTasks(ctx, segments, meta.NilReplica, querypb.DataScope_Historical)
+				task.SetReason("dirty segment exists", reduceTasks...)
+				task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
+				results = append(results, reduceTasks...)
+			}
 		}
 	}
diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go
index 7e4d6170fc..4f438a9da1 100644
--- a/internal/querycoordv2/job/job_load.go
+++ b/internal/querycoordv2/job/job_load.go
@@ -92,6 +92,10 @@ func (job *LoadCollectionJob) PreExecute() error {
 		req.ReplicaNumber = 1
 	}
 
+	if len(req.GetResourceGroups()) == 0 {
+		req.ResourceGroups = []string{meta.DefaultResourceGroupName}
+	}
+
 	collection := job.meta.GetCollection(req.GetCollectionID())
 	if collection == nil {
 		return nil
@@ -112,6 +116,14 @@ func (job *LoadCollectionJob) PreExecute() error {
 		)
 		return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection")
 	}
+	collectionUsedRG := job.meta.ReplicaManager.GetResourceGroupByCollection(collection.GetCollectionID()).Collect()
+	left, right := lo.Difference(collectionUsedRG, req.GetResourceGroups())
+	if len(left) > 0 || len(right) > 0 {
+		msg := fmt.Sprintf("collection with different resource groups %v existed, release this collection first before changing its resource groups",
+			collectionUsedRG)
+		log.Warn(msg)
+		return merr.WrapErrParameterInvalid(collectionUsedRG, req.GetResourceGroups(), "can't change the resource groups for loaded collection")
+	}
 	return nil
 }
 
@@ -287,6 +299,10 @@ func (job *LoadPartitionJob) PreExecute() error {
 		req.ReplicaNumber = 1
 	}
 
+	if len(req.GetResourceGroups()) == 0 {
+		req.ResourceGroups = []string{meta.DefaultResourceGroupName}
+	}
+
 	collection := job.meta.GetCollection(req.GetCollectionID())
 	if collection == nil {
 		return nil
@@ -305,6 +321,14 @@ func (job *LoadPartitionJob) PreExecute() error {
 		)
 		return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection")
 	}
+	collectionUsedRG := job.meta.ReplicaManager.GetResourceGroupByCollection(collection.GetCollectionID()).Collect()
+	left, right := lo.Difference(collectionUsedRG,
req.GetResourceGroups()) + if len(left) > 0 || len(right) > 0 { + msg := fmt.Sprintf("collection with different resource groups %v existed, release this collection first before changing its resource groups", + collectionUsedRG) + log.Warn(msg) + return merr.WrapErrParameterInvalid(collectionUsedRG, req.GetResourceGroups(), "can't change the resource groups for loaded partitions") + } return nil } diff --git a/internal/querycoordv2/job/job_update.go b/internal/querycoordv2/job/job_update.go new file mode 100644 index 0000000000..cb60af36fa --- /dev/null +++ b/internal/querycoordv2/job/job_update.go @@ -0,0 +1,176 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package job + +import ( + "context" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type UpdateLoadConfigJob struct { + *BaseJob + collectionID int64 + newReplicaNumber int32 + newResourceGroups []string + meta *meta.Meta + targetMgr meta.TargetManagerInterface + targetObserver *observers.TargetObserver + collectionObserver *observers.CollectionObserver +} + +func NewUpdateLoadConfigJob(ctx context.Context, + req *querypb.UpdateLoadConfigRequest, + meta *meta.Meta, + targetMgr meta.TargetManagerInterface, + targetObserver *observers.TargetObserver, + collectionObserver *observers.CollectionObserver, +) *UpdateLoadConfigJob { + collectionID := req.GetCollectionIDs()[0] + return &UpdateLoadConfigJob{ + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), collectionID), + meta: meta, + targetMgr: targetMgr, + targetObserver: targetObserver, + collectionObserver: collectionObserver, + collectionID: collectionID, + newReplicaNumber: req.GetReplicaNumber(), + newResourceGroups: req.GetResourceGroups(), + } +} + +func (job *UpdateLoadConfigJob) Execute() error { + if !job.meta.CollectionManager.Exist(job.collectionID) { + msg := "modify replica for unloaded collection is not supported" + err := merr.WrapErrCollectionNotLoaded(msg) + log.Warn(msg, zap.Error(err)) + return err + } + + // 1. check replica parameters + if job.newReplicaNumber == 0 { + msg := "set replica number to 0 for loaded collection is not supported" + err := merr.WrapErrParameterInvalidMsg(msg) + log.Warn(msg, zap.Error(err)) + return err + } + + if len(job.newResourceGroups) == 0 { + job.newResourceGroups = []string{meta.DefaultResourceGroupName} + } + + var err error + // 2. 
reassign + toSpawn, toTransfer, toRelease, err := utils.ReassignReplicaToRG(job.meta, job.collectionID, job.newReplicaNumber, job.newResourceGroups) + if err != nil { + log.Warn("failed to reassign replica", zap.Error(err)) + return err + } + + log.Info("reassign replica", + zap.Int64("collectionID", job.collectionID), + zap.Int32("replicaNumber", job.newReplicaNumber), + zap.Strings("resourceGroups", job.newResourceGroups), + zap.Any("toSpawn", toSpawn), + zap.Any("toTransfer", toTransfer), + zap.Any("toRelease", toRelease)) + + // 3. try to spawn new replica + channels := job.targetMgr.GetDmChannelsByCollection(job.collectionID, meta.CurrentTargetFirst) + newReplicas, spawnErr := job.meta.ReplicaManager.Spawn(job.collectionID, toSpawn, lo.Keys(channels)) + if spawnErr != nil { + log.Warn("failed to spawn replica", zap.Error(spawnErr)) + err := spawnErr + return err + } + defer func() { + if err != nil { + // roll back replica from meta + replicaIDs := lo.Map(newReplicas, func(r *meta.Replica, _ int) int64 { return r.GetID() }) + err := job.meta.ReplicaManager.RemoveReplicas(job.collectionID, replicaIDs...) + if err != nil { + log.Warn("failed to remove replicas", zap.Int64s("replicaIDs", replicaIDs), zap.Error(err)) + } + } + }() + + // 4. try to transfer replicas + replicaOldRG := make(map[int64]string) + for rg, replicas := range toTransfer { + collectionReplicas := lo.GroupBy(replicas, func(r *meta.Replica) int64 { return r.GetCollectionID() }) + for collectionID, replicas := range collectionReplicas { + for _, replica := range replicas { + replicaOldRG[replica.GetID()] = replica.GetResourceGroup() + } + + if transferErr := job.meta.ReplicaManager.MoveReplica(rg, replicas); transferErr != nil { + log.Warn("failed to transfer replica for collection", zap.Int64("collectionID", collectionID), zap.Error(transferErr)) + err = transferErr + return err + } + } + } + defer func() { + if err != nil { + for _, replicas := range toTransfer { + for _, replica := range replicas { + oldRG := replicaOldRG[replica.GetID()] + if replica.GetResourceGroup() != oldRG { + if err := job.meta.ReplicaManager.TransferReplica(replica.GetID(), replica.GetResourceGroup(), oldRG, 1); err != nil { + log.Warn("failed to roll back replicas", zap.Int64("replica", replica.GetID()), zap.Error(err)) + } + } + } + } + } + }() + + // 5. remove replica from meta + err = job.meta.ReplicaManager.RemoveReplicas(job.collectionID, toRelease...) + if err != nil { + log.Warn("failed to remove replicas", zap.Int64s("replicaIDs", toRelease), zap.Error(err)) + return err + } + + // 6. recover node distribution among replicas + utils.RecoverReplicaOfCollection(job.meta, job.collectionID) + + // 7. update replica number in meta + err = job.meta.UpdateReplicaNumber(job.collectionID, job.newReplicaNumber) + if err != nil { + msg := "failed to update replica number" + log.Warn(msg, zap.Error(err)) + return err + } + + // 8. 
update next target, no need to rollback if pull target failed, target observer will pull target in periodically + _, err = job.targetObserver.UpdateNextTarget(job.collectionID) + if err != nil { + msg := "failed to update next target" + log.Warn(msg, zap.Error(err)) + } + + return nil +} diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index c038a2ba7b..3ad7f8984a 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -637,3 +637,25 @@ func (m *CollectionManager) removePartition(collectionID typeutil.UniqueID, part return nil } + +func (m *CollectionManager) UpdateReplicaNumber(collectionID typeutil.UniqueID, replicaNumber int32) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + collection, ok := m.collections[collectionID] + if !ok { + return merr.WrapErrCollectionNotFound(collectionID) + } + newCollection := collection.Clone() + newCollection.ReplicaNumber = replicaNumber + + partitions := m.getPartitionsByCollection(collectionID) + newPartitions := make([]*Partition, 0, len(partitions)) + for _, partition := range partitions { + newPartition := partition.Clone() + newPartition.ReplicaNumber = replicaNumber + newPartitions = append(newPartitions, newPartition) + } + + return m.putCollection(true, newCollection, newPartitions...) +} diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index 2a947a6532..f59bc39cf3 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -98,9 +98,6 @@ func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica { func (m *ReplicaManager) Spawn(collection int64, replicaNumInRG map[string]int, channels []string) ([]*Replica, error) { m.rwmutex.Lock() defer m.rwmutex.Unlock() - if m.collIDToReplicaIDs[collection] != nil { - return nil, fmt.Errorf("replicas of collection %d is already spawned", collection) - } balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue() enableChannelExclusiveMode := balancePolicy == ChannelLevelScoreBalancerName @@ -202,6 +199,21 @@ func (m *ReplicaManager) TransferReplica(collectionID typeutil.UniqueID, srcRGNa return m.put(replicas...) } +func (m *ReplicaManager) MoveReplica(dstRGName string, toMove []*Replica) error { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + replicas := make([]*Replica, 0, len(toMove)) + replicaIDs := make([]int64, 0) + for _, replica := range toMove { + mutableReplica := replica.CopyForWrite() + mutableReplica.SetResourceGroup(dstRGName) + replicas = append(replicas, mutableReplica.IntoReplica()) + replicaIDs = append(replicaIDs, replica.GetID()) + } + log.Info("move replicas to resource group", zap.String("dstRGName", dstRGName), zap.Int64s("replicas", replicaIDs)) + return m.put(replicas...) +} + // getSrcReplicasAndCheckIfTransferable checks if the collection can be transfer from srcRGName to dstRGName. func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) { // Check if collection is loaded. 
@@ -244,10 +256,40 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error
 	return nil
 }
 
+func (m *ReplicaManager) RemoveReplicas(collectionID typeutil.UniqueID, replicas ...typeutil.UniqueID) error {
+	m.rwmutex.Lock()
+	defer m.rwmutex.Unlock()
+
+	log.Info("release replicas", zap.Int64("collectionID", collectionID), zap.Int64s("replicas", replicas))
+
+	return m.removeReplicas(collectionID, replicas...)
+}
+
+func (m *ReplicaManager) removeReplicas(collectionID typeutil.UniqueID, replicas ...typeutil.UniqueID) error {
+	err := m.catalog.ReleaseReplica(collectionID, replicas...)
+	if err != nil {
+		return err
+	}
+
+	for _, replica := range replicas {
+		delete(m.replicas, replica)
+	}
+
+	m.collIDToReplicaIDs[collectionID].Remove(replicas...)
+	if m.collIDToReplicaIDs[collectionID].Len() == 0 {
+		delete(m.collIDToReplicaIDs, collectionID)
+	}
+
+	return nil
+}
+
 func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Replica {
 	m.rwmutex.RLock()
 	defer m.rwmutex.RUnlock()
+	return m.getByCollection(collectionID)
+}
 
+func (m *ReplicaManager) getByCollection(collectionID typeutil.UniqueID) []*Replica {
 	replicas := make([]*Replica, 0)
 	if m.collIDToReplicaIDs[collectionID] != nil {
 		for replicaID := range m.collIDToReplicaIDs[collectionID] {
diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go
index 45b44201d9..060f287bc1 100644
--- a/internal/querycoordv2/meta/resource_manager.go
+++ b/internal/querycoordv2/meta/resource_manager.go
@@ -349,6 +349,22 @@ func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
 	return rm.groups[rgName].GetNodes(), nil
 }
 
+// VerifyNodeCount returns whether each resource group's node count matches the required node count
+func (rm *ResourceManager) VerifyNodeCount(requiredNodeCount map[string]int) error {
+	rm.rwmutex.RLock()
+	defer rm.rwmutex.RUnlock()
+	for rgName, nodeCount := range requiredNodeCount {
+		if rm.groups[rgName] == nil {
+			return merr.WrapErrResourceGroupNotFound(rgName)
+		}
+		if rm.groups[rgName].NodeNum() != nodeCount {
+			return ErrNodeNotEnough
+		}
+	}
+
+	return nil
+}
+
 // GetOutgoingNodeNumByReplica return outgoing node num on each rg from this replica.
func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32 { rm.rwmutex.RLock() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 35398beb9c..d41c0a551c 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "os" + "strconv" + "strings" "sync" "syscall" "time" @@ -40,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/dist" @@ -54,6 +57,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -466,6 +470,9 @@ func (s *Server) startQueryCoord() error { go s.handleNodeUpLoop() go s.watchNodes(revision) + // watch load config changes + s.watchLoadConfigChanges() + // check whether old node exist, if yes suspend auto balance until all old nodes down s.updateBalanceConfigLoop(s.ctx) @@ -840,3 +847,62 @@ func (s *Server) updateBalanceConfig() bool { log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions))) return false } + +func (s *Server) watchLoadConfigChanges() { + replicaNumHandler := config.NewHandler("watchReplicaNumberChanges", func(e *config.Event) { + log.Info("watch load config changes", zap.String("key", e.Key), zap.String("value", e.Value), zap.String("type", e.EventType)) + + collectionIDs := s.meta.GetAll() + if len(collectionIDs) == 0 { + log.Warn("no collection loaded, skip to trigger update load config") + return + } + + replicaNum, err := strconv.ParseInt(e.Value, 10, 64) + if err != nil { + log.Warn("invalid cluster level load config, skip it", zap.String("key", e.Key), zap.String("value", e.Value)) + return + } + if replicaNum <= 0 { + log.Info("invalid cluster level load config, skip it", zap.Int64("replica_num", replicaNum)) + return + } + rgs := paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.GetAsStrings() + + s.UpdateLoadConfig(s.ctx, &querypb.UpdateLoadConfigRequest{ + CollectionIDs: collectionIDs, + ReplicaNumber: int32(replicaNum), + ResourceGroups: rgs, + }) + }) + paramtable.Get().Watch(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key, replicaNumHandler) + + rgHandler := config.NewHandler("watchResourceGroupChanges", func(e *config.Event) { + log.Info("watch load config changes", zap.String("key", e.Key), zap.String("value", e.Value), zap.String("type", e.EventType)) + collectionIDs := s.meta.GetAll() + if len(collectionIDs) == 0 { + log.Warn("no collection loaded, skip to trigger update load config") + return + } + + if len(e.Value) == 0 { + log.Warn("invalid cluster level load config, skip it", zap.String("key", e.Key), zap.String("value", e.Value)) + return + } + + rgs := strings.Split(e.Value, ",") + rgs = lo.Map(rgs, func(rg string, _ int) string { return strings.TrimSpace(rg) }) + if len(rgs) == 0 { + log.Info("invalid cluster level load config, skip it", zap.Strings("resource_groups", rgs)) + return + } + + replicaNum := 
paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.GetAsInt64() + s.UpdateLoadConfig(s.ctx, &querypb.UpdateLoadConfigRequest{ + CollectionIDs: collectionIDs, + ReplicaNumber: int32(replicaNum), + ResourceGroups: rgs, + }) + }) + paramtable.Get().Watch(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, rgHandler) +} diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 95ca6cead9..42709e2d7a 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -239,24 +240,59 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection } } - if err := s.checkResourceGroup(req.GetCollectionID(), req.GetResourceGroups()); err != nil { - msg := "failed to load collection" - log.Warn(msg, zap.Error(err)) - metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(errors.Wrap(err, msg)), nil + if req.GetReplicaNumber() <= 0 { + log.Info("request doesn't indicate the number of replicas, set it to 1") + req.ReplicaNumber = 1 + } + + if len(req.GetResourceGroups()) == 0 { + log.Info(fmt.Sprintf("request doesn't indicate the resource groups, set it to %s", meta.DefaultResourceGroupName)) + req.ResourceGroups = []string{meta.DefaultResourceGroupName} + } + + var loadJob job.Job + collection := s.meta.GetCollection(req.GetCollectionID()) + if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded { + // if collection is loaded, check if collection is loaded with the same replica number and resource groups + // if replica number or resource group changes, switch to update load config + collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collection.GetCollectionID()).Collect() + left, right := lo.Difference(collectionUsedRG, req.GetResourceGroups()) + rgChanged := len(left) > 0 || len(right) > 0 + replicaChanged := collection.GetReplicaNumber() != req.GetReplicaNumber() + if replicaChanged || rgChanged { + log.Warn("collection is loaded with different replica number or resource group, switch to update load config", + zap.Int32("oldReplicaNumber", collection.GetReplicaNumber()), + zap.Strings("oldResourceGroups", collectionUsedRG)) + updateReq := &querypb.UpdateLoadConfigRequest{ + CollectionIDs: []int64{req.GetCollectionID()}, + ReplicaNumber: req.GetReplicaNumber(), + ResourceGroups: req.GetResourceGroups(), + } + loadJob = job.NewUpdateLoadConfigJob( + ctx, + updateReq, + s.meta, + s.targetMgr, + s.targetObserver, + s.collectionObserver, + ) + } + } + + if loadJob == nil { + loadJob = job.NewLoadCollectionJob(ctx, + req, + s.dist, + s.meta, + s.broker, + s.cluster, + s.targetMgr, + s.targetObserver, + s.collectionObserver, + s.nodeMgr, + ) } - loadJob := job.NewLoadCollectionJob(ctx, - req, - s.dist, - s.meta, - s.broker, - s.cluster, - s.targetMgr, - s.targetObserver, - s.collectionObserver, - s.nodeMgr, - ) s.jobScheduler.Add(loadJob) err := loadJob.Wait() if err != nil { @@ -360,13 +396,6 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions } } - if err := s.checkResourceGroup(req.GetCollectionID(), req.GetResourceGroups()); err != nil { - msg := "failed to load partitions" - log.Warn(msg, zap.Error(err)) - 
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(errors.Wrap(err, msg)), nil - } - loadJob := job.NewLoadPartitionJob(ctx, req, s.dist, @@ -391,23 +420,6 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions return merr.Success(), nil } -func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string) error { - if len(resourceGroups) != 0 { - collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collectionID) - for _, rgName := range resourceGroups { - if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) { - return merr.WrapErrParameterInvalid("created resource group(s)", rgName, "given resource group not found") - } - - if len(resourceGroups) > 1 && rgName == meta.DefaultResourceGroupName { - return merr.WrapErrParameterInvalid("no default resource group mixed with the other resource group(s)", rgName) - } - } - } - - return nil -} - func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), @@ -1168,3 +1180,79 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ } return resp, nil } + +func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx).With( + zap.Int64s("collectionIDs", req.GetCollectionIDs()), + zap.Int32("replicaNumber", req.GetReplicaNumber()), + zap.Strings("resourceGroups", req.GetResourceGroups()), + ) + + log.Info("update load config request received") + if err := merr.CheckHealthy(s.State()); err != nil { + msg := "failed to update load config" + log.Warn(msg, zap.Error(err)) + return merr.Status(errors.Wrap(err, msg)), nil + } + + jobs := make([]job.Job, 0, len(req.GetCollectionIDs())) + for _, collectionID := range req.GetCollectionIDs() { + collection := s.meta.GetCollection(collectionID) + if collection == nil || collection.GetStatus() != querypb.LoadStatus_Loaded { + err := merr.WrapErrCollectionNotLoaded(collectionID) + log.Warn("failed to update load config", zap.Error(err)) + continue + } + + collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collection.GetCollectionID()).Collect() + left, right := lo.Difference(collectionUsedRG, req.GetResourceGroups()) + rgChanged := len(left) > 0 || len(right) > 0 + replicaChanged := collection.GetReplicaNumber() != req.GetReplicaNumber() + + subReq := proto.Clone(req).(*querypb.UpdateLoadConfigRequest) + subReq.CollectionIDs = []int64{collectionID} + if len(req.ResourceGroups) == 0 { + subReq.ResourceGroups = collectionUsedRG + rgChanged = false + } + + if subReq.GetReplicaNumber() == 0 { + subReq.ReplicaNumber = collection.GetReplicaNumber() + replicaChanged = false + } + + if !replicaChanged && !rgChanged { + log.Info("no need to update load config", zap.Int64("collectionID", collectionID)) + continue + } + + updateJob := job.NewUpdateLoadConfigJob( + ctx, + subReq, + s.meta, + s.targetMgr, + s.targetObserver, + s.collectionObserver, + ) + + jobs = append(jobs, updateJob) + s.jobScheduler.Add(updateJob) + } + + var err error + for _, job := range jobs { + subErr := job.Wait() + if subErr != nil { + err = merr.Combine(err, subErr) + } + } + + if err != nil { + msg := "failed to update load config" + log.Warn(msg, zap.Error(err)) + return merr.Status(errors.Wrap(err, msg)), nil + } + log.Info("update load config request finished") + + return 
merr.Success(), nil +} diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index a8b7097367..7e80f82fc9 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -869,7 +869,7 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() { } req := &querypb.LoadCollectionRequest{ - CollectionID: 0, + CollectionID: 1001, ReplicaNumber: 2, ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"}, } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 3c7941e497..6e959e6831 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -276,22 +276,23 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { // get segment's replica first, then get shard leader by replica replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node()) if replica == nil { - msg := "node doesn't belong to any replica" + msg := "node doesn't belong to any replica, try to send release to worker" err := merr.WrapErrNodeNotAvailable(action.Node()) log.Warn(msg, zap.Error(err)) - return + dstNode = action.Node() + req.NeedTransfer = false + } else { + view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard())) + if view == nil { + msg := "no shard leader for the segment to execute releasing" + err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found") + log.Warn(msg, zap.Error(err)) + return + } + dstNode = view.ID + log = log.With(zap.Int64("shardLeader", view.ID)) + req.NeedTransfer = true } - view := ex.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(action.Shard())) - if view == nil { - msg := "no shard leader for the segment to execute releasing" - err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found") - log.Warn(msg, zap.Error(err)) - return - } - - dstNode = view.ID - log = log.With(zap.Int64("shardLeader", view.ID)) - req.NeedTransfer = true } } diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index 4fbb5a663a..8a20eb3bc7 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -110,7 +110,7 @@ func RecoverAllCollection(m *meta.Meta) { } } -func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) { +func AssignReplica(m *meta.Meta, resourceGroups []string, replicaNumber int32, checkNodeNum bool) (map[string]int, error) { if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) { return nil, errors.Errorf( "replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ",")) @@ -142,10 +142,13 @@ func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int if err != nil { return nil, err } + if num > len(nodes) { - err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num) log.Warn("failed to check resource group", zap.Error(err)) - return nil, err + if checkNodeNum { + err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num) + return nil, err + } } } return replicaNumInRG, nil @@ -153,7 +156,7 @@ func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int // SpawnReplicasWithRG spawns replicas 
in rgs one by one for given collection.
 func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string, replicaNumber int32, channels []string) ([]*meta.Replica, error) {
-	replicaNumInRG, err := checkResourceGroup(m, resourceGroups, replicaNumber)
+	replicaNumInRG, err := AssignReplica(m, resourceGroups, replicaNumber, true)
 	if err != nil {
 		return nil, err
 	}
@@ -167,3 +170,76 @@ func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string
 	RecoverReplicaOfCollection(m, collection)
 	return replicas, nil
 }
+
+func ReassignReplicaToRG(
+	m *meta.Meta,
+	collectionID int64,
+	newReplicaNumber int32,
+	newResourceGroups []string,
+) (map[string]int, map[string][]*meta.Replica, []int64, error) {
+	// assign all replicas to newResourceGroups to get each rg's replica number
+	newAssignment, err := AssignReplica(m, newResourceGroups, newReplicaNumber, false)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	replicas := m.ReplicaManager.GetByCollection(collectionID)
+	replicasInRG := lo.GroupBy(replicas, func(replica *meta.Replica) string {
+		return replica.GetResourceGroup()
+	})
+
+	// if rg doesn't exist in newResourceGroups, add all replicas to candidateToRelease
+	candidateToRelease := make([]*meta.Replica, 0)
+	outRg, _ := lo.Difference(lo.Keys(replicasInRG), newResourceGroups)
+	if len(outRg) > 0 {
+		for _, rgName := range outRg {
+			candidateToRelease = append(candidateToRelease, replicasInRG[rgName]...)
+		}
+	}
+
+	// if rg has more replicas than newAssignment's replica number, add the rest replicas to candidateToRelease
+	// also set the number of lacking replicas as rg's replicaToSpawn value
+	replicaToSpawn := make(map[string]int, len(newAssignment))
+	for rgName, count := range newAssignment {
+		if len(replicasInRG[rgName]) > count {
+			candidateToRelease = append(candidateToRelease, replicasInRG[rgName][count:]...)
+		} else {
+			lack := count - len(replicasInRG[rgName])
+			if lack > 0 {
+				replicaToSpawn[rgName] = lack
+			}
+		}
+	}
+
+	candidateIdx := 0
+	// if newReplicaNumber is smaller than current replica num, pick replica from candidate and add it to replicasToRelease
+	replicasToRelease := make([]int64, 0)
+	replicaReleaseCounter := len(replicas) - int(newReplicaNumber)
+	for replicaReleaseCounter > 0 {
+		replicasToRelease = append(replicasToRelease, candidateToRelease[candidateIdx].GetID())
+		replicaReleaseCounter -= 1
+		candidateIdx += 1
+	}
+
+	// if candidateToRelease is not empty, pick replica from candidate and add it to replicaToTransfer
+	// which means if rg has fewer replicas than expected, we transfer some existing replicas to it.
+ replicaToTransfer := make(map[string][]*meta.Replica) + if candidateIdx < len(candidateToRelease) { + for rg := range replicaToSpawn { + for replicaToSpawn[rg] > 0 && candidateIdx < len(candidateToRelease) { + if replicaToTransfer[rg] == nil { + replicaToTransfer[rg] = make([]*meta.Replica, 0) + } + replicaToTransfer[rg] = append(replicaToTransfer[rg], candidateToRelease[candidateIdx]) + candidateIdx += 1 + replicaToSpawn[rg] -= 1 + } + + if replicaToSpawn[rg] == 0 { + delete(replicaToSpawn, rg) + } + } + } + + return replicaToSpawn, replicaToTransfer, replicasToRelease, nil +} diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 779e631bcf..7d5f9fa693 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -21,13 +21,16 @@ import ( "fmt" "github.com/cockroachdb/errors" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/proxyutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" ) type alterCollectionTask struct { @@ -57,7 +60,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { } newColl := oldColl.Clone() - updateCollectionProperties(newColl, a.Req.GetProperties()) + newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties()) ts := a.GetTs() redoTask := newBaseRedoTask(a.core.stepExecutor) @@ -85,27 +88,34 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)}, }) + oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldColl.Properties) + oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldColl.Properties) + newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newColl.Properties) + newResourceGroups, _ := common.DatabaseLevelResourceGroups(newColl.Properties) + left, right := lo.Difference(oldResourceGroups, newResourceGroups) + rgChanged := len(left) > 0 || len(right) > 0 + replicaChanged := oldReplicaNumber != newReplicaNumber + if rgChanged || replicaChanged { + log.Ctx(ctx).Warn("alter collection trigger update load config", + zap.Int64("collectionID", oldColl.CollectionID), + zap.Int64("oldReplicaNumber", oldReplicaNumber), + zap.Int64("newReplicaNumber", newReplicaNumber), + zap.Strings("oldResourceGroups", oldResourceGroups), + zap.Strings("newResourceGroups", newResourceGroups), + ) + redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) { + resp, err := a.core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{ + CollectionIDs: []int64{oldColl.CollectionID}, + ReplicaNumber: int32(newReplicaNumber), + ResourceGroups: newResourceGroups, + }) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to trigger update load config for collection", zap.Int64("collectionID", newColl.CollectionID), zap.Error(err)) + return nil, err + } + return nil, nil + })) + } + return redoTask.Execute(ctx) } - -func updateCollectionProperties(coll *model.Collection, updatedProps []*commonpb.KeyValuePair) { - props := make(map[string]string) - for _, prop := range coll.Properties { - props[prop.Key] = prop.Value - } - - for _, prop := 
range updatedProps { - props[prop.Key] = prop.Value - } - - propKV := make([]*commonpb.KeyValuePair, 0) - - for key, value := range props { - propKV = append(propKV, &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - - coll.Properties = propKV -} diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 3252534986..7993fd5cce 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -229,7 +229,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "true", }, } - updateCollectionProperties(coll, updateProps1) + coll.Properties = MergeProperties(coll.Properties, updateProps1) assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ Key: common.CollectionTTLConfigKey, @@ -247,7 +247,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "2", }, } - updateCollectionProperties(coll, updateProps2) + coll.Properties = MergeProperties(coll.Properties, updateProps2) assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ Key: common.CollectionTTLConfigKey, @@ -265,7 +265,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "true", }, } - updateCollectionProperties(coll, updatePropsIso) + coll.Properties = MergeProperties(coll.Properties, updatePropsIso) assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{ Key: common.PartitionKeyIsolationKey, Value: "true", diff --git a/internal/rootcoord/alter_database_task.go b/internal/rootcoord/alter_database_task.go index 7f7340dfc5..3fef13e504 100644 --- a/internal/rootcoord/alter_database_task.go +++ b/internal/rootcoord/alter_database_task.go @@ -21,11 +21,16 @@ import ( "fmt" "github.com/cockroachdb/errors" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" ) type alterDatabaseTask struct { @@ -55,7 +60,7 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error { } newDB := oldDB.Clone() - ret := updateProperties(oldDB.Properties, a.Req.GetProperties()) + ret := MergeProperties(oldDB.Properties, a.Req.GetProperties()) newDB.Properties = ret ts := a.GetTs() @@ -67,10 +72,48 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error { ts: ts, }) + oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldDB.Properties) + oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldDB.Properties) + newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newDB.Properties) + newResourceGroups, _ := common.DatabaseLevelResourceGroups(newDB.Properties) + left, right := lo.Difference(oldResourceGroups, newResourceGroups) + rgChanged := len(left) > 0 || len(right) > 0 + replicaChanged := oldReplicaNumber != newReplicaNumber + if rgChanged || replicaChanged { + log.Ctx(ctx).Warn("alter database trigger update load config", + zap.Int64("dbID", oldDB.ID), + zap.Int64("oldReplicaNumber", oldReplicaNumber), + zap.Int64("newReplicaNumber", newReplicaNumber), + zap.Strings("oldResourceGroups", oldResourceGroups), + zap.Strings("newResourceGroups", newResourceGroups), + ) + redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) { + colls, err := a.core.meta.ListCollections(ctx, oldDB.Name, a.ts, true) + 
if err != nil { + log.Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err)) + return nil, err + } + if len(colls) == 0 { + return nil, nil + } + + resp, err := a.core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{ + CollectionIDs: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }), + ReplicaNumber: int32(newReplicaNumber), + ResourceGroups: newResourceGroups, + }) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err)) + return nil, err + } + return nil, nil + })) + } + return redoTask.Execute(ctx) } -func updateProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { +func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { props := make(map[string]string) for _, prop := range oldProps { props[prop.Key] = prop.Value diff --git a/internal/rootcoord/alter_database_task_test.go b/internal/rootcoord/alter_database_task_test.go index 31b0ce3e6e..aa90efc9a2 100644 --- a/internal/rootcoord/alter_database_task_test.go +++ b/internal/rootcoord/alter_database_task_test.go @@ -147,7 +147,7 @@ func Test_alterDatabaseTask_Execute(t *testing.T) { }, } - ret := updateProperties(oldProps, updateProps1) + ret := MergeProperties(oldProps, updateProps1) assert.Contains(t, ret, &commonpb.KeyValuePair{ Key: common.CollectionTTLConfigKey, @@ -165,7 +165,7 @@ func Test_alterDatabaseTask_Execute(t *testing.T) { Value: "2", }, } - ret2 := updateProperties(ret, updateProps2) + ret2 := MergeProperties(ret, updateProps2) assert.Contains(t, ret2, &commonpb.KeyValuePair{ Key: common.CollectionTTLConfigKey, diff --git a/internal/util/mock/grpc_querycoord_client.go b/internal/util/mock/grpc_querycoord_client.go index 310b930bb2..eb1932c8df 100644 --- a/internal/util/mock/grpc_querycoord_client.go +++ b/internal/util/mock/grpc_querycoord_client.go @@ -181,3 +181,7 @@ func (m *GrpcQueryCoordClient) TransferChannel(ctx context.Context, req *querypb func (m *GrpcQueryCoordClient) CheckQueryNodeDistribution(ctx context.Context, req *querypb.CheckQueryNodeDistributionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } + +func (m *GrpcQueryCoordClient) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadConfigRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} diff --git a/pkg/config/etcd_source.go b/pkg/config/etcd_source.go index 8127f72f38..9885ae746a 100644 --- a/pkg/config/etcd_source.go +++ b/pkg/config/etcd_source.go @@ -71,6 +71,7 @@ func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) { keyPrefix: etcdInfo.KeyPrefix, } es.configRefresher = newRefresher(etcdInfo.RefreshInterval, es.refreshConfigurations) + es.configRefresher.start(es.GetSourceName()) return es, nil } @@ -92,7 +93,6 @@ func (es *EtcdSource) GetConfigurations() (map[string]string, error) { if err != nil { return nil, err } - es.configRefresher.start(es.GetSourceName()) es.RLock() for key, value := range es.currentConfigs { configMap[key] = value diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go index 8c6eaf266e..37cf1459b4 100644 --- a/tests/integration/minicluster_v2.go +++ b/tests/integration/minicluster_v2.go @@ -67,18 +67,27 @@ import ( var params *paramtable.ComponentParam = 
paramtable.Get() +var ( + initOnce sync.Once + configMap map[string]string +) + func DefaultParams() map[string]string { - testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix()) - return map[string]string{ - params.EtcdCfg.RootPath.Key: testPath, - params.MinioCfg.RootPath.Key: testPath, - //"runtime.role": typeutil.StandaloneRole, - //params.IntegrationTestCfg.IntegrationMode.Key: "true", - params.LocalStorageCfg.Path.Key: path.Join("/tmp", testPath), - params.CommonCfg.StorageType.Key: "local", - params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs - params.CommonCfg.GracefulStopTimeout.Key: "30", - } + initOnce.Do(func() { + testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix()) + + // Notice: don't use ParamItem.Key here, the config key will be empty before param table init + configMap = map[string]string{ + "etcd.rootPath": testPath, + "minio.rootPath": testPath, + "localStorage.path": path.Join("/tmp", testPath), + "common.storageType": "local", + "dataNode.memory.forceSyncEnable": "false", // local execution will print too many logs + "common.gracefulStopTimeout": "30", + } + }) + + return configMap } type MiniClusterV2 struct { diff --git a/tests/integration/rbac/privilege_group_test.go b/tests/integration/rbac/privilege_group_test.go index 11a574911a..da89b603a4 100644 --- a/tests/integration/rbac/privilege_group_test.go +++ b/tests/integration/rbac/privilege_group_test.go @@ -34,12 +34,11 @@ type PrivilegeGroupTestSuite struct { } func (s *PrivilegeGroupTestSuite) SetupSuite() { + s.MiniClusterSuite.SetupSuite() paramtable.Init() paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") paramtable.Get().Save(paramtable.Get().CommonCfg.AuthorizationEnabled.Key, "true") - - s.Require().NoError(s.SetupEmbedEtcd()) } func (s *PrivilegeGroupTestSuite) TestPrivilegeGroup() { diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go index cac163103d..d91fa3df19 100644 --- a/tests/integration/replicas/load/load_test.go +++ b/tests/integration/replicas/load/load_test.go @@ -19,11 +19,13 @@ package balance import ( "context" "fmt" + "path" "strings" "testing" "time" "github.com/stretchr/testify/suite" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -31,6 +33,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/tests/integration" @@ -47,11 +51,22 @@ type LoadTestSuite struct { } func (s *LoadTestSuite) SetupSuite() { + s.Require().NoError(s.SetupEmbedEtcd()) + + // setup mini cluster to use embed etcd + endpoints := etcd.GetEmbedEtcdEndpoints(s.EtcdServer) + val := strings.Join(endpoints, ",") + // setup env value to init etcd source + s.T().Setenv("etcd.endpoints", val) + + for key, value := range integration.DefaultParams() { + s.T().Setenv(key, value) + } + paramtable.Init() paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") - - 
s.Require().NoError(s.SetupEmbedEtcd()) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "5") } func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) { @@ -83,110 +98,11 @@ func (s *LoadTestSuite) releaseCollection(db, collectionName string) { s.True(merr.Ok(status)) } -func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() { - ctx := context.Background() - s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ - DBName: dbName, - Dim: dim, - CollectionName: collectionName, - ChannelNum: 1, - SegmentNum: 3, - RowNumPerSegment: 2000, - }) - - // prepare resource groups - rgNum := 3 - rgs := make([]string, 0) - for i := 0; i < rgNum; i++ { - rgs = append(rgs, fmt.Sprintf("rg_%d", i)) - s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ - ResourceGroup: rgs[i], - Config: &rgpb.ResourceGroupConfig{ - Requests: &rgpb.ResourceGroupLimit{ - NodeNum: 1, - }, - Limits: &rgpb.ResourceGroupLimit{ - NodeNum: 1, - }, - - TransferFrom: []*rgpb.ResourceGroupTransfer{ - { - ResourceGroup: meta.DefaultResourceGroupName, - }, - }, - TransferTo: []*rgpb.ResourceGroupTransfer{ - { - ResourceGroup: meta.DefaultResourceGroupName, - }, - }, - }, - }) - } - - resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) - s.NoError(err) - s.True(merr.Ok(resp.GetStatus())) - s.Len(resp.GetResourceGroups(), rgNum+1) - - for i := 1; i < rgNum; i++ { - s.Cluster.AddQueryNode() - } - - s.Eventually(func() bool { - matchCounter := 0 - for _, rg := range rgs { - resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ - ResourceGroup: rg, - }) - s.NoError(err) - s.True(merr.Ok(resp.GetStatus())) - if len(resp1.ResourceGroup.Nodes) == 1 { - matchCounter += 1 - } - } - return matchCounter == rgNum - }, 30*time.Second, time.Second) - - status, err := s.Cluster.Proxy.AlterDatabase(ctx, &milvuspb.AlterDatabaseRequest{ - DbName: "default", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.DatabaseReplicaNumber, - Value: "3", - }, - { - Key: common.DatabaseResourceGroups, - Value: strings.Join(rgs, ","), - }, - }, - }) - s.NoError(err) - s.True(merr.Ok(status)) - - resp1, err := s.Cluster.Proxy.DescribeDatabase(ctx, &milvuspb.DescribeDatabaseRequest{ - DbName: "default", - }) - s.NoError(err) - s.True(merr.Ok(resp1.Status)) - s.Len(resp1.GetProperties(), 2) - - // load collection without specified replica and rgs - s.loadCollection(collectionName, dbName, 0, nil) - resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ - DbName: dbName, - CollectionName: collectionName, - }) - s.NoError(err) - s.True(merr.Ok(resp2.Status)) - s.Len(resp2.GetReplicas(), 3) - s.releaseCollection(dbName, collectionName) -} - func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() { ctx := context.Background() // prepare resource groups - rgNum := 3 + rgNum := 5 rgs := make([]string, 0) for i := 0; i < rgNum; i++ { rgs = append(rgs, fmt.Sprintf("rg_%d", i)) @@ -246,7 +162,7 @@ func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() { SegmentNum: 3, RowNumPerSegment: 2000, ReplicaNumber: 3, - ResourceGroups: rgs, + ResourceGroups: rgs[:3], }) // load collection without specified replica and rgs @@ -258,6 +174,57 @@ func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() { s.NoError(err) s.True(merr.Ok(resp2.Status)) s.Len(resp2.GetReplicas(), 3) + + // modify 
config, increase replica number + resp3, err := s.Cluster.Proxy.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ + CollectionName: collectionName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseReplicaNumber, + Value: "5", + }, + { + Key: common.DatabaseResourceGroups, + Value: strings.Join(rgs, ","), + }, + }, + }) + s.NoError(err) + s.True(merr.Ok(resp3)) + s.Eventually(func() bool { + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + return len(resp2.GetReplicas()) == 5 + }, 30*time.Second, time.Second) + + // modify config, decrease replica number + resp4, err := s.Cluster.Proxy.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ + CollectionName: collectionName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseReplicaNumber, + Value: "2", + }, + { + Key: common.DatabaseResourceGroups, + Value: strings.Join(rgs[:2], ","), + }, + }, + }) + s.NoError(err) + s.True(merr.Ok(resp4)) + s.Eventually(func() bool { + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + return len(resp2.GetReplicas()) == 2 + }, 30*time.Second, time.Second) + s.releaseCollection(dbName, collectionName) } @@ -265,7 +232,7 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() { ctx := context.Background() // prepare resource groups - rgNum := 3 + rgNum := 5 rgs := make([]string, 0) for i := 0; i < rgNum; i++ { rgs = append(rgs, fmt.Sprintf("rg_%d", i)) @@ -327,7 +294,7 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() { }, { Key: common.DatabaseResourceGroups, - Value: strings.Join(rgs, ","), + Value: strings.Join(rgs[:3], ","), }, }, }) @@ -352,6 +319,59 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() { s.NoError(err) s.True(merr.Ok(resp2.Status)) s.Len(resp2.GetReplicas(), 3) + + // modify config, increase replica number + resp3, err := s.Cluster.Proxy.AlterDatabase(ctx, &milvuspb.AlterDatabaseRequest{ + DbName: newDbName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseReplicaNumber, + Value: "5", + }, + { + Key: common.DatabaseResourceGroups, + Value: strings.Join(rgs, ","), + }, + }, + }) + s.NoError(err) + s.True(merr.Ok(resp3)) + s.Eventually(func() bool { + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: newDbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + return len(resp2.GetReplicas()) == 5 + }, 30*time.Second, time.Second) + + // modify config, decrease replica number + resp4, err := s.Cluster.Proxy.AlterDatabase(ctx, &milvuspb.AlterDatabaseRequest{ + DbName: newDbName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseReplicaNumber, + Value: "2", + }, + { + Key: common.DatabaseResourceGroups, + Value: strings.Join(rgs[:2], ","), + }, + }, + }) + s.NoError(err) + s.True(merr.Ok(resp4)) + s.Eventually(func() bool { + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: newDbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + return len(resp2.GetReplicas()) == 2 + }, 30*time.Second, time.Second) + s.releaseCollection(newDbName, collectionName) } @@ -359,7 +379,7 @@ func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() { ctx := context.Background() // prepare resource 
groups - rgNum := 3 + rgNum := 5 rgs := make([]string, 0) for i := 0; i < rgNum; i++ { rgs = append(rgs, fmt.Sprintf("rg_%d", i)) @@ -420,9 +440,7 @@ func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() { RowNumPerSegment: 2000, }) paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key, "3") - defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key) - paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, strings.Join(rgs, ",")) - defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, strings.Join(rgs[:3], ",")) // load collection without specified replica and rgs s.loadCollection(collectionName, dbName, 0, nil) @@ -433,6 +451,385 @@ func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() { s.NoError(err) s.True(merr.Ok(resp2.Status)) s.Len(resp2.GetReplicas(), 3) + + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key) + + // modify load config, increase replicas + endpoints := etcd.GetEmbedEtcdEndpoints(s.EtcdServer) + configPrefix := path.Join(paramtable.Get().EtcdCfg.RootPath.GetValue(), "config") + log.Info("endpoints", zap.Strings("endpoints", endpoints), zap.String("configPrefix", configPrefix)) + s.Cluster.EtcdCli.Put(ctx, path.Join(configPrefix, paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key), "5") + s.Cluster.EtcdCli.Put(ctx, path.Join(configPrefix, paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key), strings.Join(rgs, ",")) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 5 + }, 30*time.Second, 1*time.Second) + + // modify load config, decrease replicas + endpoints = etcd.GetEmbedEtcdEndpoints(s.EtcdServer) + log.Info("endpoints", zap.Strings("endpoints", endpoints), zap.String("configPrefix", configPrefix)) + s.Cluster.EtcdCli.Put(ctx, path.Join(configPrefix, paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key), "2") + s.Cluster.EtcdCli.Put(ctx, path.Join(configPrefix, paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key), strings.Join(rgs[:2], ",")) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 2 + }, 30*time.Second, 1*time.Second) + + s.releaseCollection(dbName, collectionName) +} + +func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs() { + ctx := context.Background() + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + + // prepare resource groups + rgNum := 5 + rgs := make([]string, 0) + for i := 0; i < rgNum; i++ { + rgs = append(rgs, fmt.Sprintf("rg_%d", i)) + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: rgs[i], + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: 
&rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + + TransferFrom: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + TransferTo: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + }, + }) + } + + resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.Len(resp.GetResourceGroups(), rgNum+1) + + for i := 1; i < rgNum; i++ { + s.Cluster.AddQueryNode() + } + + nodesInRG := make(map[string][]int64) + s.Eventually(func() bool { + matchCounter := 0 + for _, rg := range rgs { + resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: rg, + }) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + if len(resp1.ResourceGroup.Nodes) == 1 { + matchCounter += 1 + nodesInRG[rg] = []int64{resp1.ResourceGroup.Nodes[0].NodeId} + } + } + return matchCounter == rgNum + }, 30*time.Second, time.Second) + + // load collection + s.loadCollection(collectionName, dbName, 5, rgs) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 5) + + // test load collection with dynamic update + s.loadCollection(collectionName, dbName, 3, rgs[:3]) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 3 + }, 30*time.Second, 1*time.Second) + + s.Eventually(func() bool { + segmentNum, channelNum := 0, 0 + for _, qn := range s.Cluster.GetAllQueryNodes() { + resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.Status)) + segmentNum += len(resp.Segments) + channelNum += len(resp.Channels) + } + return segmentNum == 9 && channelNum == 3 + }, 30*time.Second, 1*time.Second) + + s.loadCollection(collectionName, dbName, 2, rgs[3:]) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 2 + }, 30*time.Second, 1*time.Second) + + s.Eventually(func() bool { + segmentNum, channelNum := 0, 0 + for _, qn := range s.Cluster.GetAllQueryNodes() { + resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.Status)) + segmentNum += len(resp.Segments) + channelNum += len(resp.Channels) + } + return segmentNum == 6 && channelNum == 2 + }, 30*time.Second, 1*time.Second) + + // test load collection with dynamic update + s.loadCollection(collectionName, dbName, 5, rgs) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 5 + }, 30*time.Second, 1*time.Second) + + s.releaseCollection(dbName, collectionName) +} + +func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithoutRG() { + ctx := context.Background() + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + 
ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + + // prepare resource groups + for i := 1; i < 5; i++ { + s.Cluster.AddQueryNode() + } + + // load collection + s.loadCollection(collectionName, dbName, 5, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 5) + + // test load collection with dynamic update + s.loadCollection(collectionName, dbName, 3, nil) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 3 + }, 30*time.Second, 1*time.Second) + + // test load collection with dynamic update + s.loadCollection(collectionName, dbName, 5, nil) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp3.Status)) + return len(resp3.GetReplicas()) == 5 + }, 30*time.Second, 1*time.Second) + + s.releaseCollection(dbName, collectionName) +} + +func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() { + ctx := context.Background() + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + + // load collection + s.loadCollection(collectionName, dbName, 1, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 1) + + recycleRG := "__recycle_rg" + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: recycleRG, + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 0, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 100, + }, + }, + }) + // prepare resource groups + rgNum := 5 + rgs := make([]string, 0) + for i := 0; i < rgNum; i++ { + rgs = append(rgs, fmt.Sprintf("rg_%d", i)) + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: rgs[i], + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + TransferFrom: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: recycleRG, + }, + }, + TransferTo: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: recycleRG, + }, + }, + }, + }) + } + + s.Cluster.QueryCoord.UpdateResourceGroups(ctx, &querypb.UpdateResourceGroupsRequest{ + ResourceGroups: map[string]*rgpb.ResourceGroupConfig{ + meta.DefaultResourceGroupName: { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + }, + }, + }) + + resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.Len(resp.GetResourceGroups(), rgNum+2) + + // test load collection with dynamic update + s.loadCollection(collectionName, dbName, 3, rgs[:3]) + s.Eventually(func() bool { + resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + 
s.NoError(err)
+		s.True(merr.Ok(resp3.Status))
+		return len(resp3.GetReplicas()) == 3
+	}, 30*time.Second, 1*time.Second)
+
+	s.loadCollection(collectionName, dbName, 2, rgs[3:])
+	s.Eventually(func() bool {
+		resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp3.Status))
+		return len(resp3.GetReplicas()) == 2
+	}, 30*time.Second, 1*time.Second)
+
+	// test load collection with dynamic update
+	s.loadCollection(collectionName, dbName, 5, rgs)
+	s.Eventually(func() bool {
+		resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp3.Status))
+		return len(resp3.GetReplicas()) == 5
+	}, 30*time.Second, 1*time.Second)
+
+	// add query nodes back, expect each replica to get a shard leader
+	for i := 0; i < rgNum; i++ {
+		s.Cluster.AddQueryNode()
+	}
+
+	s.Eventually(func() bool {
+		matchCounter := 0
+		for _, rg := range rgs {
+			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
+				ResourceGroup: rg,
+			})
+			s.NoError(err)
+			s.True(merr.Ok(resp1.GetStatus()))
+			if len(resp1.ResourceGroup.Nodes) == 1 {
+				matchCounter += 1
+			}
+		}
+		return matchCounter == rgNum
+	}, 30*time.Second, time.Second)
+
+	s.Eventually(func() bool {
+		resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp3.Status))
+		s.Equal(5, len(resp3.GetReplicas()))
+		for _, replica := range resp3.GetReplicas() {
+			if len(replica.GetNodeIds()) != 1 {
+				return false
+			}
+		}
+		return true
+	}, 30*time.Second, 1*time.Second)
+	s.releaseCollection(dbName, collectionName)
}
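
Note on the updateProperties -> MergeProperties rename above: the merge is last-writer-wins per property key, and the returned slice order is unspecified because it is rebuilt from map iteration, which is why the tests assert with Contains rather than by position. Below is a minimal, self-contained sketch of those semantics; KeyValuePair here is a local stand-in for commonpb.KeyValuePair and the property keys are illustrative, not the real common.* constants.

package main

import "fmt"

// KeyValuePair is a local stand-in for commonpb.KeyValuePair so this sketch compiles on its own.
type KeyValuePair struct {
	Key   string
	Value string
}

// mergeProperties mirrors the map-based merge in MergeProperties: pairs from
// updatedProps overwrite pairs from oldProps with the same key, and the result
// order is unspecified because it comes from map iteration.
func mergeProperties(oldProps, updatedProps []KeyValuePair) []KeyValuePair {
	props := make(map[string]string)
	for _, p := range oldProps {
		props[p.Key] = p.Value
	}
	for _, p := range updatedProps {
		props[p.Key] = p.Value
	}
	out := make([]KeyValuePair, 0, len(props))
	for k, v := range props {
		out = append(out, KeyValuePair{Key: k, Value: v})
	}
	return out
}

func main() {
	old := []KeyValuePair{
		{Key: "replica.number", Value: "3"},
		{Key: "resource_groups", Value: "rg_0,rg_1,rg_2"},
	}
	update := []KeyValuePair{
		{Key: "replica.number", Value: "5"},
	}
	// Prints both keys, with replica.number overridden to "5" and
	// resource_groups carried over unchanged.
	for _, p := range mergeProperties(old, update) {
		fmt.Printf("%s=%s\n", p.Key, p.Value)
	}
}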
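
The cluster-level test above drives replica changes by writing config overrides directly into etcd under "<etcd.rootPath>/config/", which the EtcdSource refresher (now started in NewEtcdSource instead of on the first GetConfigurations call) picks up on its refresh interval. A rough standalone sketch of that override path follows, assuming a local etcd on 127.0.0.1:2379, a root path of "by-dev", and the flattened key names spelled out literally rather than read from paramtable; the test itself uses s.Cluster.EtcdCli.Put together with ParamItem.Key, so this is only an approximation for reproducing the behaviour outside the test harness.

package main

import (
	"context"
	"path"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Endpoint and root path are placeholders; real deployments read them from milvus.yaml.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Dynamic overrides live under "<etcd.rootPath>/config/<flattened config key>".
	// The key names below are an assumption of this sketch and mirror the paramtable
	// keys used by the test.
	configPrefix := path.Join("by-dev", "config")
	if _, err := cli.Put(ctx, path.Join(configPrefix, "queryCoord.clusterLevelLoadReplicaNumber"), "5"); err != nil {
		panic(err)
	}
	if _, err := cli.Put(ctx, path.Join(configPrefix, "queryCoord.clusterLevelLoadResourceGroups"), "rg_0,rg_1,rg_2,rg_3,rg_4"); err != nil {
		panic(err)
	}
	// The EtcdSource refresher polls this prefix periodically and applies the new
	// values, which in turn lets QueryCoord adjust the loaded replicas.
}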