diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go index d2bd5e5c7c..8c6eaf266e 100644 --- a/tests/integration/minicluster_v2.go +++ b/tests/integration/minicluster_v2.go @@ -19,15 +19,22 @@ package integration import ( "context" "fmt" + "math" "net" "path" "sync" "time" "github.com/cockroachdb/errors" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord" @@ -95,6 +102,7 @@ type MiniClusterV2 struct { RootCoordClient types.RootCoordClient QueryCoordClient types.QueryCoordClient + MilvusClient milvuspb.MilvusServiceClient ProxyClient types.ProxyClient DataNodeClient types.DataNodeClient QueryNodeClient types.QueryNodeClient @@ -113,7 +121,8 @@ type MiniClusterV2 struct { dnid atomic.Int64 streamingnodes []*streamingnode.Server - Extension *ReportChanExtension + clientConn *grpc.ClientConn + Extension *ReportChanExtension } type OptionV2 func(cluster *MiniClusterV2) @@ -380,12 +389,50 @@ func (cluster *MiniClusterV2) Start() error { } } + port := params.ProxyGrpcServerCfg.Port.GetAsInt() + cluster.clientConn, err = grpc.DialContext(cluster.ctx, fmt.Sprintf("localhost:%d", port), getGrpcDialOpt()...) + if err != nil { + return err + } + + cluster.MilvusClient = milvuspb.NewMilvusServiceClient(cluster.clientConn) log.Info("minicluster started") return nil } +func getGrpcDialOpt() []grpc.DialOption { + return []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 5 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: true, + }), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 100 * time.Millisecond, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithChainUnaryInterceptor(grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(6), + grpc_retry.WithBackoff(func(attempt uint) time.Duration { + return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt))) + }), + grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)), + ), + } +} + func (cluster *MiniClusterV2) Stop() error { log.Info("mini cluster stop") + if cluster.clientConn != nil { + cluster.clientConn.Close() + } cluster.RootCoord.Stop() log.Info("mini cluster rootCoord stopped") cluster.DataCoord.Stop() diff --git a/tests/integration/ratelimit/db_properties_test.go b/tests/integration/ratelimit/db_properties_test.go new file mode 100644 index 0000000000..e3e57cbe29 --- /dev/null +++ b/tests/integration/ratelimit/db_properties_test.go @@ -0,0 +1,221 @@ +/* + * Licensed to the LF AI & Data foundation under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ratelimit + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +const dim = 768 + +type DBPropertiesSuite struct { + integration.MiniClusterSuite +} + +func (s *DBPropertiesSuite) prepareDatabase(ctx context.Context, dbName string, configKey string, configValue string) { + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + + resp, err := s.Cluster.MilvusClient.CreateDatabase(timeoutCtx, &milvuspb.CreateDatabaseRequest{ + DbName: dbName, + Properties: []*commonpb.KeyValuePair{ + { + Key: configKey, + Value: configValue, + }, + }, + }) + s.NoError(merr.CheckRPCCall(resp, err)) + + resp2, err2 := s.Cluster.MilvusClient.DescribeDatabase(timeoutCtx, &milvuspb.DescribeDatabaseRequest{DbName: dbName}) + s.NoError(merr.CheckRPCCall(resp2, err2)) +} + +func (s *DBPropertiesSuite) prepareCollection(ctx context.Context, dbName string, collectionName string) { + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 15*time.Second) + defer cancelFunc() + + schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, schemapb.DataType_FloatVector) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollecctionResp, err := s.Cluster.MilvusClient.CreateCollection(timeoutCtx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + }) + s.NoError(merr.CheckRPCCall(createCollecctionResp, err)) + + describeCollectionResp, err := s.Cluster.MilvusClient.DescribeCollection(timeoutCtx, &milvuspb.DescribeCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(merr.CheckRPCCall(describeCollectionResp, err)) + + createIndexStatus, err := s.Cluster.MilvusClient.CreateIndex(timeoutCtx, &milvuspb.CreateIndexRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.IP), + }) + s.NoError(err) + err = merr.Error(createIndexStatus) + if err != nil { + log.Warn("createIndexStatus fail reason", zap.Error(err)) + } + + s.WaitForIndexBuiltWithDB(timeoutCtx, dbName, collectionName, integration.FloatVecField) + log.Info("Create index done") + + // load + loadStatus, err := s.Cluster.MilvusClient.LoadCollection(timeoutCtx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + err = merr.Error(loadStatus) + if err != nil { + log.Warn("LoadCollection fail reason", zap.Error(err)) + } + s.WaitForLoadWithDB(ctx, dbName, collectionName) + log.Info("Load collection done") +} + +func (s *DBPropertiesSuite) insert(ctx context.Context, dbName string, collectionName string, + rowNum int, +) (*milvuspb.MutationResult, error) { + hashKeys := integration.GenerateHashKeys(rowNum) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + return s.Cluster.MilvusClient.Insert(timeoutCtx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) +} + +func (s *DBPropertiesSuite) search(ctx context.Context, dbName string, collectionName string) (*milvuspb.SearchResults, error) { + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.IP) + searchReq := integration.ConstructSearchRequest(dbName, collectionName, "", integration.FloatVecField, + schemapb.DataType_FloatVector, nil, metric.IP, params, 10, dim, 10, -1) + + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + return s.Cluster.MilvusClient.Search(timeoutCtx, searchReq) +} + +func (s *DBPropertiesSuite) TestLimitWithDBSize() { + ctx := context.Background() + dbName := "db3" + s.prepareDatabase(ctx, dbName, common.DatabaseDiskQuotaKey, "1") + + collectionName := "Test" + funcutil.GenRandomStr() + s.prepareCollection(ctx, dbName, collectionName) + + resp, err := s.insert(ctx, dbName, collectionName, 1000) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + flushResp, err := s.Cluster.MilvusClient.Flush(timeoutCtx, &milvuspb.FlushRequest{DbName: dbName, CollectionNames: []string{collectionName}}) + s.NoError(err) + s.True(merr.Ok(flushResp.GetStatus())) + + waitForNextTick() + resp, err = s.insert(ctx, dbName, collectionName, 1000) + s.NoError(err) + fmt.Println("TestLimitWithDBSize insert response:", resp) + s.True(merr.ErrServiceQuotaExceeded.Is(merr.Error(resp.GetStatus()))) +} + +func (s *DBPropertiesSuite) TestDenyReadingDB() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dbName := "db2" + s.prepareDatabase(ctx, dbName, common.DatabaseForceDenyReadingKey, "true") + + collectionName := "Test" + funcutil.GenRandomStr() + s.prepareCollection(ctx, dbName, collectionName) + + waitForNextTick() + resp, err := s.search(ctx, dbName, collectionName) + s.NoError(err) + fmt.Println("TestDenyReadingDB search response:", resp) + s.True(merr.ErrServiceQuotaExceeded.Is(merr.Error(resp.GetStatus()))) +} + +func (s *DBPropertiesSuite) TestDenyWringDB() { + ctx := context.Background() + dbName := "db1" + s.prepareDatabase(ctx, dbName, common.DatabaseForceDenyWritingKey, "true") + + collectionName := "Test" + funcutil.GenRandomStr() + s.prepareCollection(ctx, dbName, collectionName) + + waitForNextTick() + resp, err := s.insert(ctx, dbName, collectionName, 100) + s.NoError(err) + fmt.Println("TestDenyWringDB insert response:", resp) + s.True(merr.ErrServiceQuotaExceeded.Is(merr.Error(resp.GetStatus()))) +} + +func (s *DBPropertiesSuite) SetupSuite() { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaCenterCollectInterval.Key, "1") + s.MiniClusterSuite.SetupSuite() +} + +func (s *DBPropertiesSuite) TearDownSuite() { + paramtable.Get().Reset(paramtable.Get().QuotaConfig.QuotaCenterCollectInterval.Key) + s.MiniClusterSuite.TearDownSuite() +} + +func TestLimitWithDBProperties(t *testing.T) { + suite.Run(t, new(DBPropertiesSuite)) +} + +// wait for next tick of quota center +func waitForNextTick() { + interval := paramtable.Get().QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second) + time.Sleep(interval * 2) +}