diff --git a/go.mod b/go.mod index 4bc4191509..f2dfc5e92e 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/klauspost/compress v1.14.2 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651 + github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 github.com/minio/minio-go/v7 v7.0.17 github.com/opentracing/opentracing-go v1.2.0 github.com/panjf2000/ants/v2 v2.4.8 diff --git a/go.sum b/go.sum index aecfbe6851..64f4273fc7 100644 --- a/go.sum +++ b/go.sum @@ -488,8 +488,10 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651 h1:lXwp7St1mNKatOnl2mt6TU3QRpMTf75liXqTGmTkjis= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa h1:ok2ZT20iWlDqXWBzgVpbYev4tsOKvqUXPIJ1EUaQdEg= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 h1:GSiYfmb/CgWCdTKHzI0zl0L1xTr9/kaM6wr1O882lYc= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100= github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index a2125d7fde..19b3e37e1a 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( ".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$." "milvus.proto.common.ObjectPrivilege\022\031\n\021o" "bject_name_index\030\003 \001(\005\022\032\n\022object_name_in" - "dexs\030\004 \001(\005*\226\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" + "dexs\030\004 \001(\005*\254\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" "\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n" "\020PermissionDenied\020\003\022\027\n\023CollectionNotExis" "ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime" @@ -391,94 +391,95 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "cheFailure\020+\022\025\n\021ListPolicyFailure\020,\022\022\n\016N" "otShardLeader\020-\022\026\n\022NoReplicaAvailable\020.\022" "\023\n\017SegmentNotFound\020/\022\r\n\tForceDeny\0200\022\r\n\tR" - "ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\017\n\013DataCo" - "ordNA\020d\022\022\n\rDDRequestRace\020\350\007*c\n\nIndexStat" - "e\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nI" - "nProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004\022\t\n" - "\005Retry\020\005*\202\001\n\014SegmentState\022\024\n\020SegmentStat" - "eNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Se" - "aled\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dro" - "pped\020\006\022\r\n\tImporting\020\007*>\n\017PlaceholderType" - "\022\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013FloatVec" - "tor\020e*\215\r\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020Crea" - "teCollection\020d\022\022\n\016DropCollection\020e\022\021\n\rHa" - "sCollection\020f\022\026\n\022DescribeCollection\020g\022\023\n" - "\017ShowCollections\020h\022\024\n\020GetSystemConfigs\020i" - "\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseCollectio" - "n\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nA" - "lterAlias\020n\022\023\n\017AlterCollection\020o\022\024\n\017Crea" - "tePartition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014Ha" - "sPartition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n" - "\016ShowPartitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022" - "\026\n\021ReleasePartitions\020\316\001\022\021\n\014ShowSegments\020" - "\372\001\022\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSegments" - "\020\374\001\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegm" - "ents\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Des" - "cribeSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDe" - "scribeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert" - "\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSe" - "gmentStats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResu" - "lt\020\365\003\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBui" - "ldProgress\020\367\003\022\034\n\027GetCollectionStatistics" - "\020\370\003\022\033\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retr" - "ieve\020\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmC" - "hannels\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022Wat" - "chQueryChannels\020\376\003\022\030\n\023RemoveQueryChannel" - "s\020\377\003\022\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022W" - "atchDeltaChannels\020\201\004\022\024\n\017GetShardLeaders\020" - "\202\004\022\020\n\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204" - "\004\022\024\n\017GetDistribution\020\205\004\022\025\n\020SyncDistribut" - "ion\020\206\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331" - "\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentSta" - "te\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261" - "\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nReq" - "uestTSO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021Segm" - "entStatistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022" - "\017\n\nDataNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022" - "\022\n\rGetCredential\020\335\013\022\025\n\020DeleteCredential\020" - "\336\013\022\025\n\020UpdateCredential\020\337\013\022\026\n\021ListCredUse" - "rnames\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301" - "\014\022\024\n\017OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014" - "\022\017\n\nSelectUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025" - "\n\020OperatePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022" - "\033\n\026RefreshPolicyInfoCache\020\310\014\022\017\n\nListPoli" - "cy\020\311\014*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020" - "\001*B\n\017CompactionState\022\021\n\rUndefiedState\020\000\022" - "\r\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consist" - "encyLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bo" - "unded\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*" - "\236\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Im" - "portFailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017Impor" - "tPersisted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017Impor" - "tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007" - "*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global" - "\020\001\022\010\n\004User\020\002*\206\005\n\017ObjectPrivilege\022\020\n\014Priv" - "ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020" - "\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil" - "egeDescribeCollection\020\003\022\034\n\030PrivilegeShow" - "Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv" - "ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022" - "\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020" - "\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile" - "geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020" - "\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe" - "arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ" - "uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv" - "ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi" - "p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg" - "eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner" - "ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023" - "PrivilegeSelectUser\020\030*S\n\tStateCode\022\020\n\014In" - "itializing\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022" - "\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022" - "\025\n\021LoadStateNotExist\020\000\022\024\n\020LoadStateNotLo" - "ad\020\001\022\024\n\020LoadStateLoading\020\002\022\023\n\017LoadStateL" - "oaded\020\003:^\n\021privilege_ext_obj\022\037.google.pr" - "otobuf.MessageOptions\030\351\007 \001(\0132!.milvus.pr" - "oto.common.PrivilegeExtBf\n\016io.milvus.grp" - "cB\013CommonProtoP\001Z1github.com/milvus-io/m" - "ilvus-proto/go-api/commonpb\240\001\001\252\002\016IO.Milv" - "us.Grpcb\006proto3" + "ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\024\n\020Upsert" + "AutoIDTrue\0203\022\017\n\013DataCoordNA\020d\022\022\n\rDDReque" + "stRace\020\350\007*c\n\nIndexState\022\022\n\016IndexStateNon" + "e\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Fin" + "ished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014Segme" + "ntState\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExis" + "t\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020" + "\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImportin" + "g\020\007*>\n\017PlaceholderType\022\010\n\004None\020\000\022\020\n\014Bina" + "ryVector\020d\022\017\n\013FloatVector\020e*\232\r\n\007MsgType\022" + "\r\n\tUndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n\016" + "DropCollection\020e\022\021\n\rHasCollection\020f\022\026\n\022D" + "escribeCollection\020g\022\023\n\017ShowCollections\020h" + "\022\024\n\020GetSystemConfigs\020i\022\022\n\016LoadCollection" + "\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013CreateAlias" + "\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017Alt" + "erCollection\020o\022\024\n\017CreatePartition\020\310\001\022\022\n\r" + "DropPartition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021D" + "escribePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001" + "\022\023\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePartitio" + "ns\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegm" + "ent\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegm" + "ents\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBal" + "anceSegments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020" + "\n\013CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\t" + "DropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n" + "\n\005Flush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006U" + "psert\020\224\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003" + "\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro" + "gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033" + "\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020" + "\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne" + "ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue" + "ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022" + "\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD" + "eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n" + "\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017" + "GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206" + "\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017" + "GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004" + "\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t" + "LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT" + "SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt" + "atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa" + "taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe" + "tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n" + "\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername" + "s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017" + "OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS" + "electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope" + "ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re" + "freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014" + "*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017" + "CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\tEx" + "ecuting\020\001\022\r\n\tCompleted\020\002*X\n\020ConsistencyL" + "evel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounded" + "\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\236\001\n\013I" + "mportState\022\021\n\rImportPending\020\000\022\020\n\014ImportF" + "ailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017ImportPers" + "isted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017ImportComp" + "leted\020\006\022\032\n\026ImportFailedAndCleaned\020\007*2\n\nO" + "bjectType\022\016\n\nCollection\020\000\022\n\n\006Global\020\001\022\010\n" + "\004User\020\002*\233\005\n\017ObjectPrivilege\022\020\n\014Privilege" + "All\020\000\022\035\n\031PrivilegeCreateCollection\020\001\022\033\n\027" + "PrivilegeDropCollection\020\002\022\037\n\033PrivilegeDe" + "scribeCollection\020\003\022\034\n\030PrivilegeShowColle" + "ctions\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Privilege" + "Release\020\006\022\027\n\023PrivilegeCompaction\020\007\022\023\n\017Pr" + "ivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026" + "PrivilegeGetStatistics\020\n\022\030\n\024PrivilegeCre" + "ateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020\014\022\026\n\022" + "PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSearch\020" + "\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQuery\020" + "\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Privilege" + "Import\020\022\022\034\n\030PrivilegeCreateOwnership\020\023\022\027" + "\n\023PrivilegeUpdateUser\020\024\022\032\n\026PrivilegeDrop" + "Ownership\020\025\022\034\n\030PrivilegeSelectOwnership\020" + "\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023Privi" + "legeSelectUser\020\030\022\023\n\017PrivilegeUpsert\020\031*S\n" + "\tStateCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020" + "\001\022\014\n\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping" + "\020\004*c\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024" + "\n\020LoadStateNotLoad\020\001\022\024\n\020LoadStateLoading" + "\020\002\022\023\n\017LoadStateLoaded\020\003:^\n\021privilege_ext" + "_obj\022\037.google.protobuf.MessageOptions\030\351\007" + " \001(\0132!.milvus.proto.common.PrivilegeExtB" + "f\n\016io.milvus.grpcB\013CommonProtoP\001Z1github" + ".com/milvus-io/milvus-proto/go-api/commo" + "npb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { &::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto, @@ -499,7 +500,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5535, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5591, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -566,6 +567,7 @@ bool ErrorCode_IsValid(int value) { case 48: case 49: case 50: + case 51: case 100: case 1000: return true; @@ -667,6 +669,7 @@ bool MsgType_IsValid(int value) { case 401: case 402: case 403: + case 404: case 500: case 501: case 502: @@ -831,6 +834,7 @@ bool ObjectPrivilege_IsValid(int value) { case 22: case 23: case 24: + case 25: return true; default: return false; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index ee5b9e6684..9efcb8776a 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -163,6 +163,7 @@ enum ErrorCode : int { ForceDeny = 48, RateLimit = 49, NodeIDNotMatch = 50, + UpsertAutoIDTrue = 51, DataCoordNA = 100, DDRequestRace = 1000, ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), @@ -308,6 +309,7 @@ enum MsgType : int { Delete = 401, Flush = 402, ResendSegmentStats = 403, + Upsert = 404, Search = 500, SearchResult = 501, GetIndexState = 502, @@ -538,12 +540,13 @@ enum ObjectPrivilege : int { PrivilegeSelectOwnership = 22, PrivilegeManageOwnership = 23, PrivilegeSelectUser = 24, + PrivilegeUpsert = 25, ObjectPrivilege_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), ObjectPrivilege_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max() }; bool ObjectPrivilege_IsValid(int value); constexpr ObjectPrivilege ObjectPrivilege_MIN = PrivilegeAll; -constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeSelectUser; +constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeUpsert; constexpr int ObjectPrivilege_ARRAYSIZE = ObjectPrivilege_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ObjectPrivilege_descriptor(); diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 2a0a4ca6a6..d7199ced86 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -684,6 +684,10 @@ func (s *Server) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (* return s.proxy.Delete(ctx, request) } +func (s *Server) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) { + panic("TODO: not implement") +} + func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { return s.proxy.Search(ctx, request) } diff --git a/internal/metrics/rootcoord_metrics.go b/internal/metrics/rootcoord_metrics.go index 61e89f7545..8754f67e69 100644 --- a/internal/metrics/rootcoord_metrics.go +++ b/internal/metrics/rootcoord_metrics.go @@ -145,6 +145,17 @@ var ( roleNameLabelName, nodeIDLabelName, }) + + // RootCoordQuotaStates records the quota states of cluster. + RootCoordQuotaStates = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.RootCoordRole, + Name: "quota_states", + Help: "The quota states of cluster", + }, []string{ + "quota_states", + }) ) //RegisterRootCoord registers RootCoord metrics @@ -176,4 +187,5 @@ func RegisterRootCoord(registry *prometheus.Registry) { registry.MustRegister(RootCoordNumOfRoles) registry.MustRegister(RootCoordTtDelay) + registry.MustRegister(RootCoordQuotaStates) } diff --git a/internal/proto/proxy.proto b/internal/proto/proxy.proto index 130435147c..efed2aa6ea 100644 --- a/internal/proto/proxy.proto +++ b/internal/proto/proxy.proto @@ -53,4 +53,6 @@ message RefreshPolicyInfoCacheRequest { message SetRatesRequest { common.MsgBase base = 1; repeated internal.Rate rates = 2; + repeated milvus.QuotaState states = 3; + repeated string state_reasons = 4; } diff --git a/internal/proto/proxypb/proxy.pb.go b/internal/proto/proxypb/proxy.pb.go index 5bbaeed181..d0dd6c7422 100644 --- a/internal/proto/proxypb/proxy.pb.go +++ b/internal/proto/proxypb/proxy.pb.go @@ -252,11 +252,13 @@ func (m *RefreshPolicyInfoCacheRequest) GetOpKey() string { } type SetRatesRequest struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"` + States []milvuspb.QuotaState `protobuf:"varint,3,rep,packed,name=states,proto3,enum=milvus.proto.milvus.QuotaState" json:"states,omitempty"` + StateReasons []string `protobuf:"bytes,4,rep,name=state_reasons,json=stateReasons,proto3" json:"state_reasons,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SetRatesRequest) Reset() { *m = SetRatesRequest{} } @@ -298,6 +300,20 @@ func (m *SetRatesRequest) GetRates() []*internalpb.Rate { return nil } +func (m *SetRatesRequest) GetStates() []milvuspb.QuotaState { + if m != nil { + return m.States + } + return nil +} + +func (m *SetRatesRequest) GetStateReasons() []string { + if m != nil { + return m.StateReasons + } + return nil +} + func init() { proto.RegisterType((*InvalidateCollMetaCacheRequest)(nil), "milvus.proto.proxy.InvalidateCollMetaCacheRequest") proto.RegisterType((*InvalidateCredCacheRequest)(nil), "milvus.proto.proxy.InvalidateCredCacheRequest") @@ -309,43 +325,47 @@ func init() { func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) } var fileDescriptor_700b50b08ed8dbaf = []byte{ - // 573 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xdd, 0x6e, 0xda, 0x30, - 0x18, 0x6d, 0xda, 0x42, 0xbb, 0x0f, 0x54, 0x24, 0xab, 0x63, 0x2c, 0x5d, 0x27, 0x94, 0x4e, 0x2b, - 0xaa, 0x34, 0x58, 0xd9, 0x9e, 0xa0, 0x54, 0x42, 0x68, 0xa2, 0xaa, 0xc2, 0x76, 0xb3, 0x9b, 0xc9, - 0x49, 0xbe, 0x82, 0x51, 0x62, 0xa7, 0xb1, 0x61, 0xe3, 0x6a, 0xd2, 0xde, 0x68, 0x77, 0x7b, 0xbc, - 0x29, 0x3f, 0x04, 0xc2, 0x42, 0xa3, 0xad, 0xda, 0x1d, 0xc7, 0x3e, 0xf6, 0x39, 0x27, 0xfe, 0x0e, - 0x50, 0xf1, 0x03, 0xf1, 0x6d, 0xd1, 0xf6, 0x03, 0xa1, 0x04, 0x21, 0x1e, 0x73, 0xe7, 0x33, 0x19, - 0xa3, 0x76, 0xb4, 0xa3, 0x57, 0x6d, 0xe1, 0x79, 0x82, 0xc7, 0x6b, 0xfa, 0x11, 0xe3, 0x0a, 0x03, - 0x4e, 0xdd, 0x04, 0x57, 0xd7, 0x4f, 0x18, 0xbf, 0x34, 0x78, 0x39, 0xe0, 0x73, 0xea, 0x32, 0x87, - 0x2a, 0xec, 0x09, 0xd7, 0x1d, 0xa2, 0xa2, 0x3d, 0x6a, 0x4f, 0xd0, 0xc4, 0xfb, 0x19, 0x4a, 0x45, - 0xde, 0xc2, 0xbe, 0x45, 0x25, 0x36, 0xb4, 0xa6, 0xd6, 0xaa, 0x74, 0x5f, 0xb4, 0x33, 0x8a, 0x89, - 0xd4, 0x50, 0x8e, 0xaf, 0xa8, 0x44, 0x33, 0x62, 0x92, 0x67, 0x70, 0xe0, 0x58, 0x5f, 0x38, 0xf5, - 0xb0, 0xb1, 0xdb, 0xd4, 0x5a, 0x4f, 0xcc, 0xb2, 0x63, 0xdd, 0x50, 0x0f, 0xc9, 0x39, 0xd4, 0x6c, - 0xe1, 0xba, 0x68, 0x2b, 0x26, 0x78, 0x4c, 0xd8, 0x8b, 0x08, 0x47, 0xab, 0xe5, 0x88, 0x68, 0x40, - 0x75, 0xb5, 0x32, 0xb8, 0x6e, 0xec, 0x37, 0xb5, 0xd6, 0x9e, 0x99, 0x59, 0x33, 0xa6, 0xa0, 0xaf, - 0x39, 0x0f, 0xd0, 0x79, 0xa4, 0x6b, 0x1d, 0x0e, 0x67, 0x32, 0xfc, 0x52, 0xa9, 0xed, 0x14, 0x1b, - 0x3f, 0x34, 0xa8, 0x7f, 0xf2, 0xff, 0xbf, 0x50, 0xb8, 0xe7, 0x53, 0x29, 0xbf, 0x8a, 0xc0, 0x49, - 0x3e, 0x4d, 0x8a, 0x8d, 0xef, 0x70, 0x6a, 0xe2, 0x5d, 0x80, 0x72, 0x72, 0x2b, 0x5c, 0x66, 0x2f, - 0x06, 0xfc, 0x4e, 0x3c, 0xd2, 0x4a, 0x1d, 0xca, 0xc2, 0xff, 0xb8, 0xf0, 0x63, 0x23, 0x25, 0x33, - 0x41, 0xe4, 0x18, 0x4a, 0xc2, 0xff, 0x80, 0x8b, 0xc4, 0x43, 0x0c, 0x8c, 0x39, 0xd4, 0x46, 0xa8, - 0x4c, 0xaa, 0x50, 0xfe, 0xbb, 0xe4, 0x25, 0x94, 0x82, 0xf0, 0x86, 0xc6, 0x6e, 0x73, 0xaf, 0x55, - 0xe9, 0x9e, 0x64, 0x8f, 0xa4, 0xc3, 0x1a, 0xaa, 0x98, 0x31, 0xb3, 0xfb, 0xf3, 0x00, 0x4a, 0xb7, - 0xe1, 0x68, 0x13, 0x17, 0x48, 0x1f, 0x55, 0x4f, 0x78, 0xbe, 0xe0, 0xc8, 0xd5, 0x48, 0x85, 0xfb, - 0xa4, 0x9d, 0xbd, 0x23, 0x01, 0x7f, 0x12, 0x13, 0xd3, 0xfa, 0xab, 0x5c, 0xfe, 0x06, 0xd9, 0xd8, - 0x21, 0xf7, 0x70, 0xdc, 0xc7, 0x08, 0x32, 0xa9, 0x98, 0x2d, 0x7b, 0x13, 0xca, 0x39, 0xba, 0xa4, - 0xbb, 0xc5, 0x73, 0x1e, 0x79, 0xa9, 0x79, 0x96, 0xab, 0x39, 0x52, 0x01, 0xe3, 0x63, 0x13, 0xa5, - 0x2f, 0xb8, 0x44, 0x63, 0x87, 0x04, 0x70, 0x9a, 0xad, 0x63, 0x3c, 0xee, 0x69, 0x29, 0x37, 0xb5, - 0xe3, 0xff, 0x82, 0x87, 0x1b, 0xac, 0x9f, 0xe4, 0x3e, 0x4b, 0x68, 0x75, 0x16, 0xc6, 0xa4, 0x50, - 0xed, 0xa3, 0xba, 0x76, 0x96, 0xf1, 0x2e, 0xb6, 0xc7, 0x4b, 0x49, 0x7f, 0x19, 0x6b, 0x0a, 0xcf, - 0xb3, 0x5d, 0x45, 0xae, 0x18, 0x75, 0xe3, 0x48, 0xed, 0x82, 0x48, 0x1b, 0x8d, 0x2b, 0x8a, 0x63, - 0xc1, 0xd3, 0x55, 0x55, 0xd7, 0x75, 0x2e, 0xf2, 0x74, 0xf2, 0x5b, 0x5d, 0xa4, 0x31, 0x85, 0x7a, - 0x7e, 0x15, 0xc9, 0x65, 0x9e, 0xc8, 0x83, 0xb5, 0x2d, 0xd2, 0x72, 0xa0, 0xd6, 0x47, 0x15, 0xcd, - 0xff, 0x10, 0x55, 0xc0, 0x6c, 0x49, 0x5e, 0x6f, 0x1b, 0xf8, 0x84, 0xb0, 0xbc, 0xf9, 0xbc, 0x90, - 0x97, 0xbe, 0xd0, 0x0d, 0x1c, 0x2e, 0xbb, 0x4d, 0xce, 0xf2, 0x32, 0x6c, 0x34, 0xbf, 0xc0, 0xf5, - 0xd5, 0xfb, 0xcf, 0xdd, 0x31, 0x53, 0x93, 0x99, 0x15, 0xee, 0x74, 0x62, 0xea, 0x1b, 0x26, 0x92, - 0x5f, 0x9d, 0xe5, 0x50, 0x75, 0xa2, 0xd3, 0x9d, 0x48, 0xc2, 0xb7, 0xac, 0x72, 0x04, 0xdf, 0xfd, - 0x0e, 0x00, 0x00, 0xff, 0xff, 0x9d, 0xbe, 0x61, 0x31, 0xe4, 0x06, 0x00, 0x00, + // 625 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x51, 0x4f, 0x13, 0x4d, + 0x14, 0x65, 0x29, 0x2d, 0x70, 0xe9, 0x07, 0xc9, 0x84, 0x0f, 0x6b, 0x11, 0x6d, 0x16, 0x23, 0x0d, + 0x89, 0xad, 0x54, 0x13, 0xdf, 0x29, 0x49, 0x43, 0x0c, 0x04, 0xa7, 0xfa, 0xe2, 0x0b, 0x99, 0xdd, + 0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x67, 0x8a, 0xf6, 0xc9, 0xc4, 0x7f, 0xe4, 0x9b, 0x3f, 0xc4, + 0x1f, 0x64, 0x76, 0x66, 0xbb, 0xd0, 0xba, 0x65, 0xa3, 0xc4, 0xb7, 0x3d, 0x77, 0xce, 0x9d, 0x73, + 0xef, 0xec, 0xbd, 0x07, 0xd6, 0xa2, 0x58, 0x7e, 0x19, 0xb7, 0xa2, 0x58, 0x6a, 0x49, 0xc8, 0x90, + 0x87, 0x37, 0x23, 0x65, 0x51, 0xcb, 0x9c, 0xd4, 0xab, 0xbe, 0x1c, 0x0e, 0xa5, 0xb0, 0xb1, 0xfa, + 0x3a, 0x17, 0x1a, 0x63, 0xc1, 0xc2, 0x14, 0x57, 0xef, 0x66, 0xb8, 0x3f, 0x1c, 0x78, 0x7a, 0x2c, + 0x6e, 0x58, 0xc8, 0x03, 0xa6, 0xb1, 0x2b, 0xc3, 0xf0, 0x04, 0x35, 0xeb, 0x32, 0x7f, 0x80, 0x14, + 0xaf, 0x47, 0xa8, 0x34, 0x79, 0x05, 0x4b, 0x1e, 0x53, 0x58, 0x73, 0x1a, 0x4e, 0x73, 0xad, 0xf3, + 0xa4, 0x35, 0xa5, 0x98, 0x4a, 0x9d, 0xa8, 0xcb, 0x43, 0xa6, 0x90, 0x1a, 0x26, 0x79, 0x04, 0xcb, + 0x81, 0x77, 0x2e, 0xd8, 0x10, 0x6b, 0x8b, 0x0d, 0xa7, 0xb9, 0x4a, 0x2b, 0x81, 0x77, 0xca, 0x86, + 0x48, 0xf6, 0x60, 0xc3, 0x97, 0x61, 0x88, 0xbe, 0xe6, 0x52, 0x58, 0x42, 0xc9, 0x10, 0xd6, 0x6f, + 0xc3, 0x86, 0xe8, 0x42, 0xf5, 0x36, 0x72, 0x7c, 0x54, 0x5b, 0x6a, 0x38, 0xcd, 0x12, 0x9d, 0x8a, + 0xb9, 0x57, 0x50, 0xbf, 0x53, 0x79, 0x8c, 0xc1, 0x03, 0xab, 0xae, 0xc3, 0xca, 0x48, 0x25, 0x2f, + 0x95, 0x95, 0x9d, 0x61, 0xf7, 0x9b, 0x03, 0x5b, 0x1f, 0xa3, 0x7f, 0x2f, 0x94, 0x9c, 0x45, 0x4c, + 0xa9, 0xcf, 0x32, 0x0e, 0xd2, 0xa7, 0xc9, 0xb0, 0xfb, 0x15, 0x76, 0x28, 0x5e, 0xc4, 0xa8, 0x06, + 0x67, 0x32, 0xe4, 0xfe, 0xf8, 0x58, 0x5c, 0xc8, 0x07, 0x96, 0xb2, 0x05, 0x15, 0x19, 0x7d, 0x18, + 0x47, 0xb6, 0x90, 0x32, 0x4d, 0x11, 0xd9, 0x84, 0xb2, 0x8c, 0xde, 0xe1, 0x38, 0xad, 0xc1, 0x02, + 0xf7, 0xa7, 0x03, 0x1b, 0x7d, 0xd4, 0x94, 0x69, 0x54, 0x7f, 0xaf, 0x79, 0x00, 0xe5, 0x38, 0xb9, + 0xa1, 0xb6, 0xd8, 0x28, 0x35, 0xd7, 0x3a, 0xdb, 0xd3, 0x29, 0xd9, 0xb4, 0x26, 0x2a, 0xd4, 0x32, + 0xc9, 0x5b, 0xa8, 0x28, 0x6d, 0x72, 0x4a, 0x8d, 0x52, 0x73, 0xbd, 0xf3, 0x6c, 0x3a, 0x27, 0x05, + 0xef, 0x47, 0x52, 0xb3, 0x7e, 0xc2, 0xa3, 0x29, 0x9d, 0xec, 0xc2, 0x7f, 0xe6, 0xeb, 0x3c, 0x46, + 0xa6, 0xa4, 0x50, 0xb5, 0xa5, 0x46, 0xa9, 0xb9, 0x4a, 0xab, 0x26, 0x48, 0x6d, 0xac, 0xf3, 0x7d, + 0x19, 0xca, 0x67, 0xc9, 0xe6, 0x90, 0x10, 0x48, 0x0f, 0x75, 0x57, 0x0e, 0x23, 0x29, 0x50, 0xe8, + 0xbe, 0xbd, 0xa4, 0x95, 0xab, 0xf6, 0x3b, 0x31, 0x7d, 0x92, 0xfa, 0xf3, 0x5c, 0xfe, 0x0c, 0xd9, + 0x5d, 0x20, 0xd7, 0xb0, 0xd9, 0x43, 0x03, 0xb9, 0xd2, 0xdc, 0x57, 0xdd, 0x01, 0x13, 0x02, 0x43, + 0xd2, 0x99, 0xf3, 0x22, 0x79, 0xe4, 0x89, 0xe6, 0x6e, 0xae, 0x66, 0x5f, 0xc7, 0x5c, 0x5c, 0x52, + 0x54, 0x91, 0x14, 0x0a, 0xdd, 0x05, 0x12, 0xc3, 0xce, 0xf4, 0xb6, 0xdb, 0x6d, 0xca, 0x76, 0x7e, + 0x56, 0xdb, 0x5a, 0xcd, 0xfd, 0x06, 0x51, 0xdf, 0xce, 0xfd, 0xe9, 0x49, 0xa9, 0xa3, 0xa4, 0x4d, + 0x06, 0xd5, 0x1e, 0xea, 0xa3, 0x60, 0xd2, 0xde, 0xfe, 0xfc, 0xf6, 0x32, 0xd2, 0x1f, 0xb6, 0x75, + 0x05, 0x8f, 0xa7, 0xad, 0x00, 0x85, 0xe6, 0x2c, 0xb4, 0x2d, 0xb5, 0x0a, 0x5a, 0x9a, 0x59, 0xe8, + 0xa2, 0x76, 0x3c, 0xf8, 0xff, 0xd6, 0x09, 0xee, 0xea, 0xec, 0xe7, 0xe9, 0xe4, 0x9b, 0x46, 0x91, + 0xc6, 0x15, 0x6c, 0xe5, 0x6f, 0x3a, 0x39, 0xc8, 0x13, 0xb9, 0xd7, 0x15, 0x8a, 0xb4, 0x02, 0xd8, + 0xe8, 0xa1, 0x36, 0xf3, 0x7f, 0x82, 0x3a, 0xe6, 0xbe, 0x22, 0x2f, 0xe6, 0x0d, 0x7c, 0x4a, 0x98, + 0xdc, 0xbc, 0x57, 0xc8, 0xcb, 0xfe, 0xd0, 0x29, 0xac, 0x4c, 0x9c, 0x83, 0xec, 0xe6, 0xf5, 0x30, + 0xe3, 0x2b, 0x05, 0x55, 0x1f, 0xbe, 0xf9, 0xd4, 0xb9, 0xe4, 0x7a, 0x30, 0xf2, 0x92, 0x93, 0xb6, + 0xa5, 0xbe, 0xe4, 0x32, 0xfd, 0x6a, 0x4f, 0x86, 0xaa, 0x6d, 0xb2, 0xdb, 0x46, 0x22, 0xf2, 0xbc, + 0x8a, 0x81, 0xaf, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x36, 0x94, 0x65, 0x38, 0x43, 0x07, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/proxy/error.go b/internal/proxy/error.go index 1f4af9cded..5c793ebd5b 100644 --- a/internal/proxy/error.go +++ b/internal/proxy/error.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc/status" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/types" ) // TODO(dragondriver): add more common error type @@ -90,3 +92,22 @@ func ErrProxyNotReady() error { func ErrPartitionNotExist(partitionName string) error { return fmt.Errorf("partition is not exist: %s", partitionName) } + +var ( + ErrRateLimit = errors.New("RequestLimited") + ErrForceDeny = errors.New("RequestDenied") +) + +func wrapRateLimitError() error { + return fmt.Errorf("[%w] request is rejected by grpc RateLimiter middleware, please retry later", ErrRateLimit) +} + +func wrapForceDenyError(rt internalpb.RateType, limiter types.Limiter) error { + switch rt { + case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: + return fmt.Errorf("[%w] deny to write, reason: %s", ErrForceDeny, limiter.GetWriteStateReason()) + case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: + return fmt.Errorf("[%w] deny to read, reason: %s", ErrForceDeny, limiter.GetReadStateReason()) + } + return nil +} diff --git a/internal/proxy/error_test.go b/internal/proxy/error_test.go index 11feebe16c..a757803a80 100644 --- a/internal/proxy/error_test.go +++ b/internal/proxy/error_test.go @@ -17,10 +17,13 @@ package proxy import ( + "errors" "testing" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -149,3 +152,16 @@ func Test_errProxyIsUnhealthy(t *testing.T) { zap.Error(errProxyIsUnhealthy(id))) } } + +func Test_ErrRateLimitAndErrForceDeny(t *testing.T) { + err := wrapRateLimitError() + assert.True(t, errors.Is(err, ErrRateLimit)) + + limiter := NewMultiRateLimiter() + err = wrapForceDenyError(internalpb.RateType_DMLInsert, limiter) + assert.True(t, errors.Is(err, ErrForceDeny)) + err = wrapForceDenyError(internalpb.RateType_DMLDelete, limiter) + assert.True(t, errors.Is(err, ErrForceDeny)) + err = wrapForceDenyError(internalpb.RateType_DQLSearch, limiter) + assert.True(t, errors.Is(err, ErrForceDeny)) +} diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 354a9563b0..d3854fbd58 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2128,6 +2128,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) return it.result, nil } +func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) { + panic("TODO: not implement") +} + // Delete delete records from collection, then these records cannot be searched. func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) { sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete") @@ -4185,6 +4189,13 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques resp.Reason = err.Error() return resp, nil } + node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons()) + log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates())) + if len(request.GetStates()) != 0 { + for i := range request.GetStates() { + log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i])) + } + } resp.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -4248,16 +4259,22 @@ func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealt err := group.Wait() if err != nil || len(errReasons) != 0 { return &milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, IsHealthy: false, Reasons: errReasons, }, nil } + states, reasons := node.multiRateLimiter.GetQuotaStates() return &milvuspb.CheckHealthResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - IsHealthy: true, + QuotaStates: states, + Reasons: reasons, + IsHealthy: true, }, nil } diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index d22765f725..3ab1598cc8 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package proxy import ( @@ -43,6 +59,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) { func TestProxy_CheckHealth(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node.multiRateLimiter = NewMultiRateLimiter() node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -59,6 +76,7 @@ func TestProxy_CheckHealth(t *testing.T) { indexCoord: NewIndexCoordMock(), session: &sessionutil.Session{ServerID: 1}, } + node.multiRateLimiter = NewMultiRateLimiter() node.stateCode.Store(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -96,6 +114,7 @@ func TestProxy_CheckHealth(t *testing.T) { }), dataCoord: dataCoordMock, indexCoord: indexCoordMock} + node.multiRateLimiter = NewMultiRateLimiter() node.stateCode.Store(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -103,4 +122,29 @@ func TestProxy_CheckHealth(t *testing.T) { assert.Equal(t, false, resp.IsHealthy) assert.Equal(t, 4, len(resp.Reasons)) }) + + t.Run("check quota state", func(t *testing.T) { + node := &Proxy{ + rootCoord: NewRootCoordMock(), + dataCoord: NewDataCoordMock(), + queryCoord: NewQueryCoordMock(), + indexCoord: NewIndexCoordMock(), + } + node.multiRateLimiter = NewMultiRateLimiter() + node.stateCode.Store(commonpb.StateCode_Healthy) + resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, true, resp.IsHealthy) + assert.Equal(t, 0, len(resp.GetQuotaStates())) + assert.Equal(t, 0, len(resp.GetReasons())) + + states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead} + reasons := []string{"memory quota exhausted", "manually deny to read"} + node.multiRateLimiter.SetQuotaStates(states, reasons) + resp, err = node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, true, resp.IsHealthy) + assert.Equal(t, 2, len(resp.GetQuotaStates())) + assert.Equal(t, 2, len(resp.GetReasons())) + }) } diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 06974006fe..348799a145 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -18,10 +18,12 @@ package proxy import ( "fmt" + "sync" "time" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -34,6 +36,8 @@ import ( type MultiRateLimiter struct { globalRateLimiter *rateLimiter // TODO: add collection level rateLimiter + quotaStatesMu sync.RWMutex + quotaStates map[milvuspb.QuotaState]string } // NewMultiRateLimiter returns a new MultiRateLimiter. @@ -43,14 +47,54 @@ func NewMultiRateLimiter() *MultiRateLimiter { return m } -// Limit returns true, the request will be rejected. -// Otherwise, the request will pass. Limit also returns limit of limiter. -func (m *MultiRateLimiter) Limit(rt internalpb.RateType, n int) (bool, float64) { +// Check checks if request would be limited or denied. +func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) error { if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { - return false, 1 // no limit + return nil + } + limit, rate := m.globalRateLimiter.limit(rt, n) + if rate == 0 { + return wrapForceDenyError(rt, m) + } + if limit { + return wrapRateLimitError() + } + return nil +} + +// GetQuotaStates returns quota states. +func (m *MultiRateLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) { + m.quotaStatesMu.RLock() + defer m.quotaStatesMu.RUnlock() + states := make([]milvuspb.QuotaState, 0, len(m.quotaStates)) + reasons := make([]string, 0, len(m.quotaStates)) + for k, v := range m.quotaStates { + states = append(states, k) + reasons = append(reasons, v) + } + return states, reasons +} + +func (m *MultiRateLimiter) GetReadStateReason() string { + m.quotaStatesMu.RLock() + defer m.quotaStatesMu.RUnlock() + return m.quotaStates[milvuspb.QuotaState_DenyToRead] +} + +func (m *MultiRateLimiter) GetWriteStateReason() string { + m.quotaStatesMu.RLock() + defer m.quotaStatesMu.RUnlock() + return m.quotaStates[milvuspb.QuotaState_DenyToWrite] +} + +// SetQuotaStates sets quota states for MultiRateLimiter. +func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, reasons []string) { + m.quotaStatesMu.Lock() + defer m.quotaStatesMu.Unlock() + m.quotaStates = make(map[milvuspb.QuotaState]string, len(states)) + for i := 0; i < len(states); i++ { + m.quotaStates[states[i]] = reasons[i] } - // TODO: call other rate limiters - return m.globalRateLimiter.limit(rt, n) } // rateLimiter implements Limiter. @@ -83,7 +127,7 @@ func (rl *rateLimiter) setRates(rates []*internalpb.Rate) error { return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String()) } } - rl.printRates(rates) + // rl.printRates(rates) return nil } diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index 8518e4d866..78aea55ca9 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -17,10 +17,12 @@ package proxy import ( + "errors" "fmt" "math" "testing" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/ratelimitutil" @@ -36,12 +38,12 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) } for _, rt := range internalpb.RateType_value { - ok, _ := multiLimiter.Limit(internalpb.RateType(rt), 1) - assert.False(t, ok) - ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt) - assert.False(t, ok) - ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt) - assert.True(t, ok) + err := multiLimiter.Check(internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) + assert.NoError(t, err) + err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) + assert.True(t, errors.Is(err, ErrRateLimit)) } Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) @@ -51,9 +53,8 @@ func TestMultiRateLimiter(t *testing.T) { bak := Params.QuotaConfig.QuotaAndLimitsEnabled paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false") for _, rt := range internalpb.RateType_value { - ok, r := multiLimiter.Limit(internalpb.RateType(rt), 1) - assert.False(t, ok) - assert.NotEqual(t, float64(0), r) + err := multiLimiter.Check(internalpb.RateType(rt), 1) + assert.NoError(t, err) } Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) @@ -65,9 +66,8 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter := NewMultiRateLimiter() bak := Params.QuotaConfig.QuotaAndLimitsEnabled paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") - ok, r := multiLimiter.Limit(internalpb.RateType_DMLInsert, 1*1024*1024) - assert.False(t, ok) - assert.NotEqual(t, float64(0), r) + err := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024) + assert.NoError(t, err) Params.QuotaConfig.QuotaAndLimitsEnabled = bak Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate } @@ -77,6 +77,17 @@ func TestMultiRateLimiter(t *testing.T) { run(math.MaxFloat64 / 3) run(math.MaxFloat64 / 10000) }) + + t.Run("test GetReadStateReason and GetWriteStateReason", func(t *testing.T) { + multiLimiter := NewMultiRateLimiter() + states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead} + writeReason := "memory quota exhausted" + readReason := "manually deny to read" + reasons := []string{writeReason, readReason} + multiLimiter.SetQuotaStates(states, reasons) + assert.Equal(t, writeReason, multiLimiter.GetWriteStateReason()) + assert.Equal(t, readReason, multiLimiter.GetReadStateReason()) + }) } func TestRateLimiter(t *testing.T) { diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index 2346c102ab..a84ae9eec8 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "errors" "fmt" "reflect" @@ -34,19 +35,20 @@ import ( func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { rt, n, err := getRequestInfo(req) - if err == nil { - limit, rate := limiter.Limit(rt, n) - if rate == 0 { - res, err1 := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, fmt.Sprintf("force to deny %s.", info.FullMethod)) - if err1 == nil { - return res, nil - } + if err != nil { + return handler(ctx, req) + } + err = limiter.Check(rt, n) + if errors.Is(err, ErrForceDeny) { + rsp := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, info.FullMethod, err) + if rsp != nil { + return rsp, nil } - if limit { - res, err2 := getFailedResponse(req, commonpb.ErrorCode_RateLimit, fmt.Sprintf("%s is rejected by grpc RateLimiter middleware, please retry later.", info.FullMethod)) - if err2 == nil { - return res, nil - } + } + if errors.Is(err, ErrRateLimit) { + rsp := getFailedResponse(req, commonpb.ErrorCode_RateLimit, info.FullMethod, err) + if rsp != nil { + return rsp, nil } } return handler(ctx, req) @@ -112,40 +114,37 @@ func failedBoolResponse(code commonpb.ErrorCode, reason string) *milvuspb.BoolRe } // getFailedResponse returns failed response. -func getFailedResponse(req interface{}, code commonpb.ErrorCode, reason string) (interface{}, error) { +func getFailedResponse(req interface{}, code commonpb.ErrorCode, fullMethod string, err error) interface{} { + reason := fmt.Sprintf("%s, req: %s", err, fullMethod) switch req.(type) { case *milvuspb.InsertRequest, *milvuspb.DeleteRequest: - return failedMutationResult(code, reason), nil + return failedMutationResult(code, reason) case *milvuspb.ImportRequest: return &milvuspb.ImportResponse{ Status: failedStatus(code, reason), - }, nil + } case *milvuspb.SearchRequest: return &milvuspb.SearchResults{ Status: failedStatus(code, reason), - }, nil + } case *milvuspb.QueryRequest: return &milvuspb.QueryResults{ Status: failedStatus(code, reason), - }, nil + } case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest, *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest, *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest, *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest, *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest: - return failedStatus(code, reason), nil + return failedStatus(code, reason) case *milvuspb.FlushRequest: return &milvuspb.FlushResponse{ Status: failedStatus(code, reason), - }, nil + } case *milvuspb.ManualCompactionRequest: return &milvuspb.ManualCompactionResponse{ Status: failedStatus(code, reason), - }, nil - // TODO: support more request + } } - if req == nil { - return nil, fmt.Errorf("null request") - } - return nil, fmt.Errorf("unsupported request type %s", reflect.TypeOf(req).Name()) + return nil } diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 4d91a9ed2b..98d1dc11b0 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "fmt" "testing" "github.com/golang/protobuf/proto" @@ -30,12 +31,38 @@ import ( ) type limiterMock struct { - limit bool - rate float64 + limit bool + rate float64 + quotaStates []milvuspb.QuotaState + quotaStateReasons []string } -func (l *limiterMock) Limit(_ internalpb.RateType, _ int) (bool, float64) { - return l.limit, l.rate +func (l *limiterMock) Check(rt internalpb.RateType, n int) error { + if l.rate == 0 { + return ErrForceDeny + } + if l.limit { + return ErrRateLimit + } + return nil +} + +func (l *limiterMock) GetReadStateReason() string { + for i := range l.quotaStates { + if l.quotaStates[i] == milvuspb.QuotaState_DenyToRead { + return l.quotaStateReasons[i] + } + } + return "" +} + +func (l *limiterMock) GetWriteStateReason() string { + for i := range l.quotaStates { + if l.quotaStates[i] == milvuspb.QuotaState_DenyToWrite { + return l.quotaStateReasons[i] + } + } + return "" } func TestRateLimitInterceptor(t *testing.T) { @@ -93,8 +120,8 @@ func TestRateLimitInterceptor(t *testing.T) { t.Run("test getFailedResponse", func(t *testing.T) { testGetFailedResponse := func(req interface{}) { - _, err := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "mock") - assert.NoError(t, err) + rsp := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + assert.NotNil(t, rsp) } testGetFailedResponse(&milvuspb.DeleteRequest{}) @@ -106,10 +133,10 @@ func TestRateLimitInterceptor(t *testing.T) { testGetFailedResponse(&milvuspb.ManualCompactionRequest{}) // test illegal - _, err := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "mock") - assert.Error(t, err) - _, err = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "mock") - assert.Error(t, err) + rsp := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + assert.Nil(t, rsp) + rsp = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + assert.Nil(t, rsp) }) t.Run("test RateLimitInterceptor", func(t *testing.T) { diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index dc199baf08..57697e298d 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -27,6 +27,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -45,15 +46,31 @@ const ( SetRatesTimeout = 10 * time.Second ) -type ForceDenyTriggerReason string +type TriggerReason int32 const ( - ManualForceDeny ForceDenyTriggerReason = "ManualForceDeny" - MemoryExhausted ForceDenyTriggerReason = "MemoryExhausted" - DiskQuotaExceeded ForceDenyTriggerReason = "DiskQuotaExceeded" - TimeTickLongDelay ForceDenyTriggerReason = "TimeTickLongDelay" + ManuallyDenyToRead TriggerReason = 0 + ManuallyDenyToWrite TriggerReason = 1 + MemoryQuotaExhausted TriggerReason = 2 + DiskQuotaExhausted TriggerReason = 3 + TimeTickLongDelay TriggerReason = 4 ) +var TriggerReasonString = map[TriggerReason]string{ + ManuallyDenyToRead: "manually deny to read", + ManuallyDenyToWrite: "manually deny to write", + MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources", + DiskQuotaExhausted: "disk quota exhausted, please allocate more resources", + TimeTickLongDelay: "time tick long delay", +} + +func (t TriggerReason) String() string { + if s, ok := TriggerReasonString[t]; ok { + return s + } + return "" +} + type RateAllocateStrategy int32 const ( @@ -97,6 +114,7 @@ type QuotaCenter struct { dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics currentRates map[internalpb.RateType]Limit + quotaStates map[milvuspb.QuotaState]string tsoAllocator tso.Allocator rateAllocateStrategy RateAllocateStrategy @@ -112,6 +130,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da queryCoord: queryCoord, dataCoord: dataCoord, currentRates: make(map[internalpb.RateType]Limit), + quotaStates: make(map[milvuspb.QuotaState]string), tsoAllocator: tsoAllocator, rateAllocateStrategy: DefaultRateAllocateStrategy, @@ -144,6 +163,7 @@ func (q *QuotaCenter) run() { if err != nil { log.Warn("quotaCenter setRates failed", zap.Error(err)) } + q.recordMetrics() } } } @@ -251,18 +271,20 @@ func (q *QuotaCenter) syncMetrics() error { } // forceDenyWriting sets dml rates to 0 to reject all dml requests. -func (q *QuotaCenter) forceDenyWriting(reason ForceDenyTriggerReason) { +func (q *QuotaCenter) forceDenyWriting(reason TriggerReason) { q.currentRates[internalpb.RateType_DMLInsert] = 0 q.currentRates[internalpb.RateType_DMLDelete] = 0 q.currentRates[internalpb.RateType_DMLBulkLoad] = 0 - log.Warn("QuotaCenter force to deny writing", zap.String("reason", string(reason))) + log.Warn("QuotaCenter force to deny writing", zap.String("reason", reason.String())) + q.quotaStates[milvuspb.QuotaState_DenyToWrite] = reason.String() } // forceDenyWriting sets dql rates to 0 to reject all dql requests. -func (q *QuotaCenter) forceDenyReading(reason ForceDenyTriggerReason) { +func (q *QuotaCenter) forceDenyReading(reason TriggerReason) { q.currentRates[internalpb.RateType_DQLSearch] = 0 q.currentRates[internalpb.RateType_DQLQuery] = 0 - log.Warn("QuotaCenter force to deny reading", zap.String("reason", string(reason))) + log.Warn("QuotaCenter force to deny reading", zap.String("reason", reason.String())) + q.quotaStates[milvuspb.QuotaState_DenyToRead] = reason.String() } // getRealTimeRate return real time rate in Proxy. @@ -288,7 +310,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate // calculateReadRates calculates and sets dql rates. func (q *QuotaCenter) calculateReadRates() { if Params.QuotaConfig.ForceDenyReading.GetAsBool() { - q.forceDenyReading(ManualForceDeny) + q.forceDenyReading(ManuallyDenyToRead) return } @@ -333,13 +355,13 @@ func (q *QuotaCenter) calculateReadRates() { // calculateWriteRates calculates and sets dml rates. func (q *QuotaCenter) calculateWriteRates() error { if Params.QuotaConfig.ForceDenyWriting.GetAsBool() { - q.forceDenyWriting(ManualForceDeny) + q.forceDenyWriting(ManuallyDenyToWrite) return nil } exceeded := q.ifDiskQuotaExceeded() if exceeded { - q.forceDenyWriting(DiskQuotaExceeded) // disk quota protection + q.forceDenyWriting(DiskQuotaExhausted) // disk quota protection return nil } @@ -355,7 +377,7 @@ func (q *QuotaCenter) calculateWriteRates() error { memFactor := q.getMemoryFactor() if memFactor <= 0 { - q.forceDenyWriting(MemoryExhausted) // memory protection + q.forceDenyWriting(MemoryQuotaExhausted) // memory protection return nil } @@ -408,6 +430,7 @@ func (q *QuotaCenter) resetCurrentRates() { q.currentRates[rt] = Inf // no limit } } + q.quotaStates = make(map[milvuspb.QuotaState]string) } // getTimeTickDelayFactor gets time tick delay of DataNodes and QueryNodes, @@ -665,13 +688,38 @@ func (q *QuotaCenter) setRates() error { case ByRateWeight: // TODO: support ByRateWeight } + states := make([]milvuspb.QuotaState, 0, len(q.quotaStates)) + stateReasons := make([]string, 0, len(q.quotaStates)) + for k, v := range q.quotaStates { + states = append(states, k) + stateReasons = append(stateReasons, v) + } timestamp := tsoutil.ComposeTSByTime(time.Now(), 0) req := &proxypb.SetRatesRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgID(int64(timestamp)), commonpbutil.WithTimeStamp(timestamp), ), - Rates: map2List(), + Rates: map2List(), + States: states, + StateReasons: stateReasons, } return q.proxies.SetRates(ctx, req) } + +// recordMetrics records metrics of quota states. +func (q *QuotaCenter) recordMetrics() { + for _, reason := range TriggerReasonString { + hit := false + for _, v := range q.quotaStates { + if v == reason { + hit = true + } + } + if hit { + metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(1) + } else { + metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(0) + } + } +} diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index a48ff6d4bd..446e7b7591 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -115,10 +115,10 @@ func TestQuotaCenter(t *testing.T) { t.Run("test forceDeny", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) - quotaCenter.forceDenyReading(ManualForceDeny) + quotaCenter.forceDenyReading(ManuallyDenyToRead) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) - quotaCenter.forceDenyWriting(ManualForceDeny) + quotaCenter.forceDenyWriting(ManuallyDenyToWrite) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLInsert]) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLDelete]) }) @@ -441,10 +441,19 @@ func TestQuotaCenter(t *testing.T) { t.Run("test setRates", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) quotaCenter.currentRates[internalpb.RateType_DMLInsert] = 100 + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted] + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead] err = quotaCenter.setRates() assert.NoError(t, err) }) + t.Run("test recordMetrics", func(t *testing.T) { + quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted] + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead] + quotaCenter.recordMetrics() + }) + t.Run("test guaranteeMinRate", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) minRate := Limit(100) diff --git a/internal/types/types.go b/internal/types/types.go index dcdef37d52..9dc65c876a 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -40,7 +40,9 @@ type TimeTickProvider interface { // If Limit function return true, the request will be rejected. // Otherwise, the request will pass. Limit also returns limit of limiter. type Limiter interface { - Limit(rt internalpb.RateType, n int) (bool, float64) + Check(rt internalpb.RateType, n int) error + GetReadStateReason() string + GetWriteStateReason() string } // Component is the interface all services implement