diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 4e4ba74a5b..beb694bd75 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -39,9 +39,9 @@ import ( ) var QuotaErrorString = map[commonpb.ErrorCode]string{ - commonpb.ErrorCode_ForceDeny: "manually force deny", - commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources", - commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exhausted, please allocate more resources", + commonpb.ErrorCode_ForceDeny: "the writing has been deactivated by the administrator", + commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exceeded, please allocate more resources", + commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exceeded, please allocate more resources", commonpb.ErrorCode_TimeTickLongDelay: "time tick long delay", } @@ -84,10 +84,10 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i limit, rate := limiter.limit(rt, n) if rate == 0 { - return limiter.getError(rt) + return limiter.getQuotaExceededError(rt) } if limit { - return merr.WrapErrServiceRateLimit(rate) + return limiter.getRateLimitError(rate) } return nil } @@ -238,20 +238,24 @@ func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error { return nil } -func (rl *rateLimiter) getError(rt internalpb.RateType) error { +func (rl *rateLimiter) getQuotaExceededError(rt internalpb.RateType) error { switch rt { case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: if errCode, ok := rl.quotaStates.Get(milvuspb.QuotaState_DenyToWrite); ok { - return merr.OldCodeToMerr(errCode) + return merr.WrapErrServiceQuotaExceeded(GetQuotaErrorString(errCode)) } case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: if errCode, ok := rl.quotaStates.Get(milvuspb.QuotaState_DenyToRead); ok { - return merr.OldCodeToMerr(errCode) + return merr.WrapErrServiceQuotaExceeded(GetQuotaErrorString(errCode)) } } return nil } +func (rl *rateLimiter) getRateLimitError(rate float64) error { + return merr.WrapErrServiceRateLimit(rate, "request is rejected by grpc RateLimiter middleware, please retry later") +} + // setRateGaugeByRateType sets ProxyLimiterRate metrics. func setRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, collectionID int64, rate float64) { if ratelimitutil.Limit(rate) == ratelimitutil.Inf { diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index db80be17ca..fee4dc2092 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -283,8 +283,8 @@ func TestRateLimiter(t *testing.T) { }, }) assert.NoError(t, err) - assert.ErrorIs(t, limiter.getError(internalpb.RateType_DQLQuery), merr.ErrServiceForceDeny) - assert.Equal(t, limiter.getError(internalpb.RateType_DMLInsert), merr.ErrServiceDiskLimitExceeded) + assert.Error(t, limiter.getQuotaExceededError(internalpb.RateType_DQLQuery)) + assert.Error(t, limiter.getQuotaExceededError(internalpb.RateType_DMLInsert)) }) t.Run("tests refresh rate by config", func(t *testing.T) { diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index 30289726ce..218ccea094 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -21,7 +21,6 @@ import ( "fmt" "reflect" - "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "google.golang.org/grpc" @@ -41,7 +40,7 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { err = limiter.Check(collectionID, rt, n) if err != nil { - rsp := getFailedResponse(req, rt, err, info.FullMethod) + rsp := getFailedResponse(req, err) if rsp != nil { return rsp, nil } @@ -121,26 +120,8 @@ func failedMutationResult(err error) *milvuspb.MutationResult { } } -func wrapQuotaError(rt internalpb.RateType, err error, fullMethod string) error { - if errors.Is(err, merr.ErrServiceRateLimit) { - return errors.Wrapf(err, "request %s is rejected by grpc RateLimiter middleware, please retry later", fullMethod) - } - - // deny to write/read - var op string - switch rt { - case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: - op = "write" - case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: - op = "read" - } - - return merr.WrapErrServiceForceDeny(op, err, fullMethod) -} - // getFailedResponse returns failed response. -func getFailedResponse(req any, rt internalpb.RateType, err error, fullMethod string) any { - err = wrapQuotaError(rt, err, fullMethod) +func getFailedResponse(req any, err error) any { switch req.(type) { case *milvuspb.InsertRequest, *milvuspb.DeleteRequest, *milvuspb.UpsertRequest: return failedMutationResult(err) diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 4300b9c0b6..45cfe64042 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -40,7 +40,7 @@ type limiterMock struct { func (l *limiterMock) Check(collection int64, rt internalpb.RateType, n int) error { if l.rate == 0 { - return merr.ErrServiceForceDeny + return merr.ErrServiceQuotaExceeded } if l.limit { return merr.ErrServiceRateLimit @@ -167,23 +167,23 @@ func TestRateLimitInterceptor(t *testing.T) { t.Run("test getFailedResponse", func(t *testing.T) { testGetFailedResponse := func(req interface{}, rt internalpb.RateType, err error, fullMethod string) { - rsp := getFailedResponse(req, rt, err, fullMethod) + rsp := getFailedResponse(req, err) assert.NotNil(t, rsp) } - testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceForceDeny, "delete") - testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceForceDeny, "upsert") + testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceQuotaExceeded, "delete") + testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceQuotaExceeded, "upsert") testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, merr.ErrServiceMemoryLimitExceeded, "import") testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, merr.ErrServiceDiskLimitExceeded, "search") - testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceForceDeny, "query") + testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceQuotaExceeded, "query") testGetFailedResponse(&milvuspb.CreateCollectionRequest{}, internalpb.RateType_DDLCollection, merr.ErrServiceRateLimit, "createCollection") testGetFailedResponse(&milvuspb.FlushRequest{}, internalpb.RateType_DDLFlush, merr.ErrServiceRateLimit, "flush") testGetFailedResponse(&milvuspb.ManualCompactionRequest{}, internalpb.RateType_DDLCompaction, merr.ErrServiceRateLimit, "compaction") // test illegal - rsp := getFailedResponse(&milvuspb.SearchResults{}, internalpb.RateType_DQLSearch, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), "method") + rsp := getFailedResponse(&milvuspb.SearchResults{}, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) assert.Nil(t, rsp) - rsp = getFailedResponse(nil, internalpb.RateType_DQLSearch, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), "method") + rsp = getFailedResponse(nil, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) assert.Nil(t, rsp) }) diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 73577988cf..d83358ea95 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -40,8 +40,9 @@ var ( ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false) ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false) ErrServiceRateLimit = newMilvusError("rate limit exceeded", 8, true) - ErrServiceForceDeny = newMilvusError("force deny", 9, false) + ErrServiceQuotaExceeded = newMilvusError("quota exceeded", 9, false) ErrServiceUnimplemented = newMilvusError("service unimplemented", 10, false) + ErrServiceTimeTickLongDelay = newMilvusError("time tick long delay", 11, false) // Collection related ErrCollectionNotFound = newMilvusError("collection not found", 100, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 5ab4a0c73b..546f1e9b45 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -149,7 +149,7 @@ func (s *ErrSuite) TestOldCode() { s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_MemoryQuotaExhausted), ErrServiceMemoryLimitExceeded) s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_DiskQuotaExhausted), ErrServiceDiskLimitExceeded) s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_RateLimit), ErrServiceRateLimit) - s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_ForceDeny), ErrServiceForceDeny) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_ForceDeny), ErrServiceQuotaExceeded) s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), errUnexpected) } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index fa0ac4f052..b093cf1291 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -153,10 +153,16 @@ func oldCode(code int32) commonpb.ErrorCode { case ErrServiceMemoryLimitExceeded.code(): return commonpb.ErrorCode_InsufficientMemoryToLoad + case ErrServiceDiskLimitExceeded.code(): + return commonpb.ErrorCode_DiskQuotaExhausted + + case ErrServiceTimeTickLongDelay.code(): + return commonpb.ErrorCode_TimeTickLongDelay + case ErrServiceRateLimit.code(): return commonpb.ErrorCode_RateLimit - case ErrServiceForceDeny.code(): + case ErrServiceQuotaExceeded.code(): return commonpb.ErrorCode_ForceDeny case ErrIndexNotFound.code(): @@ -193,11 +199,14 @@ func OldCodeToMerr(code commonpb.ErrorCode) error { case commonpb.ErrorCode_DiskQuotaExhausted: return ErrServiceDiskLimitExceeded + case commonpb.ErrorCode_TimeTickLongDelay: + return ErrServiceTimeTickLongDelay + case commonpb.ErrorCode_RateLimit: return ErrServiceRateLimit case commonpb.ErrorCode_ForceDeny: - return ErrServiceForceDeny + return ErrServiceQuotaExceeded case commonpb.ErrorCode_IndexNotExist: return ErrIndexNotFound @@ -358,16 +367,20 @@ func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) erro return err } -func WrapErrServiceRateLimit(rate float64) error { - return wrapFields(ErrServiceRateLimit, value("rate", rate)) +func WrapErrServiceRateLimit(rate float64, msg ...string) error { + err := wrapFields(ErrServiceRateLimit, value("rate", rate)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err } -func WrapErrServiceForceDeny(op string, reason error, method string) error { - return wrapFieldsWithDesc(ErrServiceForceDeny, - reason.Error(), - value("op", op), - value("req", method), - ) +func WrapErrServiceQuotaExceeded(reason string, msg ...string) error { + err := wrapFields(ErrServiceQuotaExceeded, value("reason", reason)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err } func WrapErrServiceUnimplemented(grpcErr error) error {