diff --git a/go.mod b/go.mod index 6481cbd0e4..cb02c55951 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,12 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 - github.com/golang/protobuf v1.5.4 + github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index e32caaf632..51d43fb6cd 100644 --- a/go.sum +++ b/go.sum @@ -734,8 +734,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e h1:3wuhvb3a1Oq1NRPJpCpatKxfPR8XCdpZmRAgkF2u4Sg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a h1:UR+ueSDgg+Atix9QH35e7EwYp8wKm/Ncv1DcCTcUuXk= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/proxy/simple_rate_limiter.go b/internal/proxy/simple_rate_limiter.go index 6e4d62c65d..de1c45129e 100644 --- a/internal/proxy/simple_rate_limiter.go +++ b/internal/proxy/simple_rate_limiter.go @@ -69,6 +69,9 @@ func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int6 if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { return nil } + if n <= 0 { + return nil + } m.quotaStatesMu.RLock() defer m.quotaStatesMu.RUnlock() diff --git a/internal/proxy/simple_rate_limiter_test.go b/internal/proxy/simple_rate_limiter_test.go index 3d6329ba8a..cb40b692b0 100644 --- a/internal/proxy/simple_rate_limiter_test.go +++ b/internal/proxy/simple_rate_limiter_test.go @@ -398,7 +398,7 @@ func TestRateLimiter(t *testing.T) { err := simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1) assert.NoError(t, err) - err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1) + err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 0) assert.NoError(t, err) }) } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index bb8e84cda7..8aabb63a5b 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -112,6 +112,7 @@ var dqlRateTypes = typeutil.NewSet( type LimiterRange struct { RateScope internalpb.RateScope OpType opType + IncludeRateTypes typeutil.Set[internalpb.RateType] ExcludeRateTypes typeutil.Set[internalpb.RateType] } @@ -235,8 +236,14 @@ func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limi return } limiters := node.GetLimiters() - getRateTypes(limiterRange.RateScope, limiterRange.OpType). - Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool { + rateTypes := getRateTypes(limiterRange.RateScope, limiterRange.OpType) + if limiterRange.IncludeRateTypes.Len() > 0 { + rateTypes = rateTypes.Intersection(limiterRange.IncludeRateTypes) + } + if limiterRange.ExcludeRateTypes.Len() > 0 { + rateTypes = rateTypes.Complement(limiterRange.ExcludeRateTypes) + } + rateTypes.Range(func(rt internalpb.RateType) bool { originLimiter, ok := limiters.Get(rt) if !ok { log.Warn("update limiter failed, limiter not found", @@ -557,6 +564,54 @@ func (q *QuotaCenter) collectMetrics() error { return nil } +func getDbPropertyWithAction(db *model.Database, property string, actionFunc func(bool)) { + if db == nil || property == "" || actionFunc == nil { + return + } + if v := db.GetProperty(property); v != "" { + if dbForceDenyDDLEnabled, err := strconv.ParseBool(v); err == nil { + actionFunc(dbForceDenyDDLEnabled) + } else { + log.Warn("invalid configuration for database force deny DDL", + zap.String("config item", property), + zap.String("config value", v)) + } + } +} + +func (q *QuotaCenter) calculateDBDDLRates() { + dbs, err := q.meta.ListDatabases(q.ctx, typeutil.MaxTimestamp) + if err != nil { + log.Warn("get databases failed", zap.Error(err)) + return + } + for _, db := range dbs { + dbDDLKeysWithRatesType := map[string]typeutil.Set[internalpb.RateType]{ + common.DatabaseForceDenyDDLKey: ddlRateTypes, + common.DatabaseForceDenyCollectionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCollection), + common.DatabaseForceDenyPartitionDDLKey: typeutil.NewSet(internalpb.RateType_DDLPartition), + common.DatabaseForceDenyIndexDDLKey: typeutil.NewSet(internalpb.RateType_DDLIndex), + common.DatabaseForceDenyFlushDDLKey: typeutil.NewSet(internalpb.RateType_DDLFlush), + common.DatabaseForceDenyCompactionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCompaction), + } + + for ddlKey, rateTypes := range dbDDLKeysWithRatesType { + getDbPropertyWithAction(db, ddlKey, func(enabled bool) { + if enabled { + dbLimiters := q.rateLimiter.GetOrCreateDatabaseLimiters(db.ID, + newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{ + RateScope: internalpb.RateScope_Database, + OpType: ddl, + IncludeRateTypes: rateTypes, + }) + dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny) + } + }) + } + } +} + // forceDenyWriting sets dml rates to 0 to reject all dml requests. func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error { log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0) @@ -1176,6 +1231,8 @@ func (q *QuotaCenter) calculateRates() error { return err } + q.calculateDBDDLRates() + // log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates)) return nil } diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index c9d21c2040..1030b25de5 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -462,6 +462,7 @@ func TestQuotaCenter(t *testing.T) { qc := mocks.NewMockQueryCoordClient(t) meta := mockrootcoord.NewIMetaTable(t) meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{}, nil).Maybe() quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) quotaCenter.clearMetrics() err = quotaCenter.calculateRates() @@ -1798,3 +1799,146 @@ func TestTORequestLimiter(t *testing.T) { assert.Equal(t, 1, len(proxyLimit.Codes)) assert.Equal(t, commonpb.ErrorCode_ForceDeny, proxyLimit.Codes[0]) } + +func TestDatabaseForceDenyDDL(t *testing.T) { + getQuotaCenter := func() (*QuotaCenter, *mockrootcoord.IMetaTable) { + ctx := context.Background() + qc := mocks.NewMockQueryCoordClient(t) + meta := mockrootcoord.NewIMetaTable(t) + pcm := proxyutil.NewMockProxyClientManager(t) + dc := mocks.NewMockDataCoordClient(t) + core, _ := NewCore(ctx, nil) + core.tsoAllocator = newMockTsoAllocator() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + return quotaCenter, meta + } + + t.Run("fail to list database", func(t *testing.T) { + quotaCenter, meta := getQuotaCenter() + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() + quotaCenter.calculateDBDDLRates() + }) + + t.Run("force deny ddl for database", func(t *testing.T) { + quotaCenter, meta := getQuotaCenter() + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{ + { + ID: 1, Name: "db1", Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseForceDenyDDLKey, + Value: "true", + }, + }, + }, + { + ID: 2, Name: "db2", Properties: []*commonpb.KeyValuePair{ + { + Key: "aaa", + Value: "true", + }, + }, + }, + { + ID: 3, Name: "db3", Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseForceDenyDDLKey, + Value: "100", + }, + }, + }, + }, nil).Once() + quotaCenter.calculateDBDDLRates() + + limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1) + assert.Equal(t, 1, limiters.GetQuotaStates().Len()) + assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL)) + + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + }) + + t.Run("force deny detail ddl for database", func(t *testing.T) { + quotaCenter, meta := getQuotaCenter() + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{ + { + ID: 1, Name: "foo123", Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseForceDenyCollectionDDLKey, + Value: "true", + }, + { + Key: common.DatabaseForceDenyPartitionDDLKey, + Value: "true", + }, + { + Key: common.DatabaseForceDenyFlushDDLKey, + Value: "true", + }, + { + Key: common.DatabaseForceDenyCompactionDDLKey, + Value: "true", + }, + { + Key: common.DatabaseForceDenyIndexDDLKey, + Value: "true", + }, + }, + }, + }, nil).Once() + quotaCenter.calculateDBDDLRates() + + limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1) + assert.Equal(t, 1, limiters.GetQuotaStates().Len()) + assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL)) + + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + { + limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush) + assert.Equal(t, true, ok) + assert.EqualValues(t, 0.0, limiter.Limit()) + } + }) +} diff --git a/internal/util/ratelimitutil/rate_limiter_tree.go b/internal/util/ratelimitutil/rate_limiter_tree.go index 686bf268ce..a34e18c0b6 100644 --- a/internal/util/ratelimitutil/rate_limiter_tree.go +++ b/internal/util/ratelimitutil/rate_limiter_tree.go @@ -97,6 +97,11 @@ func (rln *RateLimiterNode) GetQuotaExceededError(rt internalpb.RateType) error if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToRead); ok { return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode)) } + case internalpb.RateType_DDLCollection, internalpb.RateType_DDLPartition, + internalpb.RateType_DDLIndex, internalpb.RateType_DDLCompaction, internalpb.RateType_DDLFlush: + if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToDDL); ok { + return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode)) + } } return merr.WrapErrServiceQuotaExceeded(fmt.Sprintf("rate type: %s", rt.String())) } diff --git a/internal/util/ratelimitutil/rate_limiter_tree_test.go b/internal/util/ratelimitutil/rate_limiter_tree_test.go index 6a22d405ac..7adedf68dc 100644 --- a/internal/util/ratelimitutil/rate_limiter_tree_test.go +++ b/internal/util/ratelimitutil/rate_limiter_tree_test.go @@ -143,6 +143,15 @@ func TestRateLimiterNodeGetQuotaExceededError(t *testing.T) { assert.True(t, strings.Contains(err.Error(), "disabled")) }) + t.Run("ddl", func(t *testing.T) { + limitNode := NewRateLimiterNode(internalpb.RateScope_Database) + limitNode.quotaStates.Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny) + err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCollection) + assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded)) + // reference: ratelimitutil.GetQuotaErrorString(errCode) + assert.True(t, strings.Contains(err.Error(), "disabled")) + }) + t.Run("unknown", func(t *testing.T) { limitNode := NewRateLimiterNode(internalpb.RateScope_Cluster) err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCompaction) diff --git a/pkg/common/common.go b/pkg/common/common.go index e782345f6a..1edac76b63 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -187,6 +187,13 @@ const ( DatabaseForceDenyWritingKey = "database.force.deny.writing" DatabaseForceDenyReadingKey = "database.force.deny.reading" + DatabaseForceDenyDDLKey = "database.force.deny.ddl" // all ddl + DatabaseForceDenyCollectionDDLKey = "database.force.deny.collectionDDL" + DatabaseForceDenyPartitionDDLKey = "database.force.deny.partitionDDL" + DatabaseForceDenyIndexDDLKey = "database.force.deny.index" + DatabaseForceDenyFlushDDLKey = "database.force.deny.flush" + DatabaseForceDenyCompactionDDLKey = "database.force.deny.compaction" + // collection level load properties CollectionReplicaNumber = "collection.replica.number" CollectionResourceGroups = "collection.resource_groups" diff --git a/pkg/go.mod b/pkg/go.mod index 69fcba3528..6aa11651b6 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -20,7 +20,7 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.9 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a github.com/minio/minio-go/v7 v7.0.73 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 diff --git a/pkg/go.sum b/pkg/go.sum index f4c04c5a51..c06f07a40f 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -551,8 +551,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e h1:3wuhvb3a1Oq1NRPJpCpatKxfPR8XCdpZmRAgkF2u4Sg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a h1:UR+ueSDgg+Atix9QH35e7EwYp8wKm/Ncv1DcCTcUuXk= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=