Add NotFoundTSafer and NoReplicaAvailable to retryable error code (#22505)

Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2023-03-03 16:37:49 +08:00 committed by GitHub
parent add2596af6
commit 6605db2a24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 182 additions and 119 deletions

2
go.mod
View File

@ -28,7 +28,7 @@ require (
github.com/klauspost/compress v1.14.2
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd
github.com/minio/minio-go/v7 v7.0.17
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants/v2 v2.4.8

2
go.sum
View File

@ -493,6 +493,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378 h1:ttJp/ZUB/3GGbd2mIbASSfdOiBUrkP50gn5gDgCsD0g=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd h1:9ilgTEqZSdEPbJKSrRGB1TIHTaF7DqVDIwn8/azcaBk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -158,16 +158,26 @@ func (t *codeError) Is(err error) bool {
return ok
}
func IsRetryErrorCode(code commonpb.ErrorCode) bool {
return code == commonpb.ErrorCode_NotReadyServe ||
code == commonpb.ErrorCode_NotShardLeader ||
code == commonpb.ErrorCode_NodeIDNotMatch
func GetErrorCode(e error) (commonpb.ErrorCode, bool) {
codeErr, ok := e.(*codeError)
if !ok {
return -1, false
}
return codeErr.code, true
}
func IsTriableError(err error) bool {
if is := errors.Is(err, CodeErr); is {
func IsRetryableErrorCode(code commonpb.ErrorCode) bool {
return code == commonpb.ErrorCode_NotReadyServe ||
code == commonpb.ErrorCode_NotShardLeader ||
code == commonpb.ErrorCode_NodeIDNotMatch ||
code == commonpb.ErrorCode_NoReplicaAvailable ||
code == commonpb.ErrorCode_NotFoundTSafer
}
func IsRetryableError(err error) bool {
if ok := errors.Is(err, CodeErr); ok {
codeErr := err.(*codeError)
return IsRetryErrorCode(codeErr.code)
return IsRetryableErrorCode(codeErr.code)
}
return false
}

View File

@ -21,9 +21,9 @@ import (
"fmt"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
)
func TestIgnorableError(t *testing.T) {
@ -81,7 +81,21 @@ func TestStatusFromError(t *testing.T) {
}
func TestCodeError(t *testing.T) {
assert.False(t, IsTriableError(nil))
assert.True(t, IsTriableError(NewCodeError(commonpb.ErrorCode_NotReadyServe, nil)))
assert.False(t, IsTriableError(NewCodeError(commonpb.ErrorCode_UnexpectedError, nil)))
assert.False(t, IsRetryableError(nil))
assert.True(t, IsRetryableError(NewCodeError(commonpb.ErrorCode_NotReadyServe, nil)))
assert.False(t, IsRetryableError(NewCodeError(commonpb.ErrorCode_UnexpectedError, nil)))
{
err := NewCodeError(commonpb.ErrorCode_NotReadyServe, nil)
code, ret := GetErrorCode(err)
assert.True(t, ret)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, code)
}
{
err := errors.New("test")
code, ret := GetErrorCode(err)
assert.False(t, ret)
assert.Equal(t, commonpb.ErrorCode(-1), code)
}
}

View File

@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$."
"milvus.proto.common.ObjectPrivilege\022\031\n\021o"
"bject_name_index\030\003 \001(\005\022\032\n\022object_name_in"
"dexs\030\004 \001(\005*\303\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"dexs\030\004 \001(\005*\327\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n"
"\020PermissionDenied\020\003\022\027\n\023CollectionNotExis"
"ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime"
@ -396,103 +396,103 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"\0204\022\030\n\024MemoryQuotaExhausted\0205\022\026\n\022DiskQuot"
"aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\021\n\rN"
"otReadyServe\0208\022\033\n\027NotReadyCoordActivatin"
"g\0209\022\017\n\013DataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007"
"*c\n\nIndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Un"
"issued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n"
"\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024"
"\n\020SegmentStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Gr"
"owing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flus"
"hing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Pl"
"aceholderType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020"
"d\022\017\n\013FloatVector\020e*\263\016\n\007MsgType\022\r\n\tUndefi"
"ned\020\000\022\024\n\020CreateCollection\020d\022\022\n\016DropColle"
"ction\020e\022\021\n\rHasCollection\020f\022\026\n\022DescribeCo"
"llection\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSy"
"stemConfigs\020i\022\022\n\016LoadCollection\020j\022\025\n\021Rel"
"easeCollection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDro"
"pAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollect"
"ion\020o\022\024\n\020RenameCollection\020p\022\024\n\017CreatePar"
"tition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPart"
"ition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n\016Show"
"Partitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021Re"
"leasePartitions\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n"
"\017DescribeSegment\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024"
"\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegments\020"
"\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Describe"
"Segments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescrib"
"eIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013"
"\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSegment"
"Stats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003"
"\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro"
"gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033"
"\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020"
"\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne"
"ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue"
"ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022"
"\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD"
"eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n"
"\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017"
"GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206"
"\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017"
"GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004"
"\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t"
"LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT"
"SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt"
"atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa"
"taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe"
"tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n"
"\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername"
"s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017"
"OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS"
"electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope"
"ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re"
"freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014"
"\022\030\n\023CreateResourceGroup\020\244\r\022\026\n\021DropResour"
"ceGroup\020\245\r\022\027\n\022ListResourceGroups\020\246\r\022\032\n\025D"
"escribeResourceGroup\020\247\r\022\021\n\014TransferNode\020"
"\250\r\022\024\n\017TransferReplica\020\251\r*\"\n\007DslType\022\007\n\003D"
"sl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState\022"
"\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCom"
"pleted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000"
"\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually"
"\020\003\022\016\n\nCustomized\020\004*\213\001\n\013ImportState\022\021\n\rIm"
"portPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImpor"
"tStarted\020\002\022\023\n\017ImportPersisted\020\005\022\023\n\017Impor"
"tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007"
"*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global"
"\020\001\022\010\n\004User\020\002*\202\007\n\017ObjectPrivilege\022\020\n\014Priv"
"ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020"
"\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil"
"egeDescribeCollection\020\003\022\034\n\030PrivilegeShow"
"Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv"
"ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022"
"\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020"
"\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile"
"geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020"
"\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe"
"arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ"
"uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv"
"ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi"
"p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg"
"eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner"
"ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023"
"PrivilegeSelectUser\020\030\022 \n\034PrivilegeCreate"
"ResourceGroup\020\032\022\036\n\032PrivilegeDropResource"
"Group\020\033\022\"\n\036PrivilegeDescribeResourceGrou"
"p\020\034\022\037\n\033PrivilegeListResourceGroups\020\035\022\031\n\025"
"PrivilegeTransferNode\020\036\022\034\n\030PrivilegeTran"
"sferReplica\020\037\022\037\n\033PrivilegeGetLoadingProg"
"ress\020 \022\031\n\025PrivilegeGetLoadState\020!*S\n\tSta"
"teCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020\001\022\014\n"
"\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c"
"\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024\n\020Lo"
"adStateNotLoad\020\001\022\024\n\020LoadStateLoading\020\002\022\023"
"\n\017LoadStateLoaded\020\003:^\n\021privilege_ext_obj"
"\022\037.google.protobuf.MessageOptions\030\351\007 \001(\013"
"2!.milvus.proto.common.PrivilegeExtBU\n\016i"
"o.milvus.grpcB\013CommonProtoP\001Z1github.com"
"/milvus-io/milvus-proto/go-api/commonpb\240"
"\001\001b\006proto3"
"g\0209\022\022\n\016NotFoundTSafer\020:\022\017\n\013DataCoordNA\020d"
"\022\022\n\rDDRequestRace\020\350\007*c\n\nIndexState\022\022\n\016In"
"dexStateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgre"
"ss\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020"
"\005*\202\001\n\014SegmentState\022\024\n\020SegmentStateNone\020\000"
"\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022"
"\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022"
"\r\n\tImporting\020\007*>\n\017PlaceholderType\022\010\n\004Non"
"e\020\000\022\020\n\014BinaryVector\020d\022\017\n\013FloatVector\020e*\263"
"\016\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020CreateColle"
"ction\020d\022\022\n\016DropCollection\020e\022\021\n\rHasCollec"
"tion\020f\022\026\n\022DescribeCollection\020g\022\023\n\017ShowCo"
"llections\020h\022\024\n\020GetSystemConfigs\020i\022\022\n\016Loa"
"dCollection\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013"
"CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAli"
"as\020n\022\023\n\017AlterCollection\020o\022\024\n\020RenameColle"
"ction\020p\022\024\n\017CreatePartition\020\310\001\022\022\n\rDropPar"
"tition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021Describe"
"Partition\020\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016Loa"
"dPartitions\020\315\001\022\026\n\021ReleasePartitions\020\316\001\022\021"
"\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegment\020\373\001\022"
"\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegments\020\375\001"
"\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBalanceSeg"
"ments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020\n\013Creat"
"eIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\tDropInd"
"ex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush"
"\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006Search\020\364"
"\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIndexState\020\366\003"
"\022\032\n\025GetIndexBuildProgress\020\367\003\022\034\n\027GetColle"
"ctionStatistics\020\370\003\022\033\n\026GetPartitionStatis"
"tics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveResult"
"\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDmCha"
"nnels\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023Rem"
"oveQueryChannels\020\377\003\022\035\n\030SealedSegmentsCha"
"ngeInfo\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\024\n\017G"
"etShardLeaders\020\202\004\022\020\n\013GetReplicas\020\203\004\022\023\n\016U"
"nsubDmChannel\020\204\004\022\024\n\017GetDistribution\020\205\004\022\025"
"\n\020SyncDistribution\020\206\004\022\020\n\013SegmentInfo\020\330\004\022"
"\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024"
"\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016Q"
"ueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequ"
"estID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSeg"
"ment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Segme"
"ntFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020Creat"
"eCredential\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n\020De"
"leteCredential\020\336\013\022\025\n\020UpdateCredential\020\337\013"
"\022\026\n\021ListCredUsernames\020\340\013\022\017\n\nCreateRole\020\300"
"\014\022\r\n\010DropRole\020\301\014\022\024\n\017OperateUserRole\020\302\014\022\017"
"\n\nSelectRole\020\303\014\022\017\n\nSelectUser\020\304\014\022\023\n\016Sele"
"ctResource\020\305\014\022\025\n\020OperatePrivilege\020\306\014\022\020\n\013"
"SelectGrant\020\307\014\022\033\n\026RefreshPolicyInfoCache"
"\020\310\014\022\017\n\nListPolicy\020\311\014\022\030\n\023CreateResourceGr"
"oup\020\244\r\022\026\n\021DropResourceGroup\020\245\r\022\027\n\022ListRe"
"sourceGroups\020\246\r\022\032\n\025DescribeResourceGroup"
"\020\247\r\022\021\n\014TransferNode\020\250\r\022\024\n\017TransferReplic"
"a\020\251\r*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001"
"*B\n\017CompactionState\022\021\n\rUndefiedState\020\000\022\r"
"\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consiste"
"ncyLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bou"
"nded\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\213"
"\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Imp"
"ortFailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017Import"
"Persisted\020\005\022\023\n\017ImportCompleted\020\006\022\032\n\026Impo"
"rtFailedAndCleaned\020\007*2\n\nObjectType\022\016\n\nCo"
"llection\020\000\022\n\n\006Global\020\001\022\010\n\004User\020\002*\202\007\n\017Obj"
"ectPrivilege\022\020\n\014PrivilegeAll\020\000\022\035\n\031Privil"
"egeCreateCollection\020\001\022\033\n\027PrivilegeDropCo"
"llection\020\002\022\037\n\033PrivilegeDescribeCollectio"
"n\020\003\022\034\n\030PrivilegeShowCollections\020\004\022\021\n\rPri"
"vilegeLoad\020\005\022\024\n\020PrivilegeRelease\020\006\022\027\n\023Pr"
"ivilegeCompaction\020\007\022\023\n\017PrivilegeInsert\020\010"
"\022\023\n\017PrivilegeDelete\020\t\022\032\n\026PrivilegeGetSta"
"tistics\020\n\022\030\n\024PrivilegeCreateIndex\020\013\022\030\n\024P"
"rivilegeIndexDetail\020\014\022\026\n\022PrivilegeDropIn"
"dex\020\r\022\023\n\017PrivilegeSearch\020\016\022\022\n\016PrivilegeF"
"lush\020\017\022\022\n\016PrivilegeQuery\020\020\022\030\n\024PrivilegeL"
"oadBalance\020\021\022\023\n\017PrivilegeImport\020\022\022\034\n\030Pri"
"vilegeCreateOwnership\020\023\022\027\n\023PrivilegeUpda"
"teUser\020\024\022\032\n\026PrivilegeDropOwnership\020\025\022\034\n\030"
"PrivilegeSelectOwnership\020\026\022\034\n\030PrivilegeM"
"anageOwnership\020\027\022\027\n\023PrivilegeSelectUser\020"
"\030\022 \n\034PrivilegeCreateResourceGroup\020\032\022\036\n\032P"
"rivilegeDropResourceGroup\020\033\022\"\n\036Privilege"
"DescribeResourceGroup\020\034\022\037\n\033PrivilegeList"
"ResourceGroups\020\035\022\031\n\025PrivilegeTransferNod"
"e\020\036\022\034\n\030PrivilegeTransferReplica\020\037\022\037\n\033Pri"
"vilegeGetLoadingProgress\020 \022\031\n\025PrivilegeG"
"etLoadState\020!*S\n\tStateCode\022\020\n\014Initializi"
"ng\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022\013\n\007Stand"
"By\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022\025\n\021LoadS"
"tateNotExist\020\000\022\024\n\020LoadStateNotLoad\020\001\022\024\n\020"
"LoadStateLoading\020\002\022\023\n\017LoadStateLoaded\020\003:"
"^\n\021privilege_ext_obj\022\037.google.protobuf.M"
"essageOptions\030\351\007 \001(\0132!.milvus.proto.comm"
"on.PrivilegeExtBU\n\016io.milvus.grpcB\013Commo"
"nProtoP\001Z1github.com/milvus-io/milvus-pr"
"oto/go-api/commonpb\240\001\001b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
&::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto,
@ -513,7 +513,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 6090,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 6110,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@ -587,6 +587,7 @@ bool ErrorCode_IsValid(int value) {
case 55:
case 56:
case 57:
case 58:
case 100:
case 1000:
return true;

View File

@ -170,6 +170,7 @@ enum ErrorCode : int {
TimeTickLongDelay = 55,
NotReadyServe = 56,
NotReadyCoordActivating = 57,
NotFoundTSafer = 58,
DataCoordNA = 100,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),

View File

@ -338,7 +338,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
retryCtx, cancel := context.WithCancel(ctx)
err := retry.Do(retryCtx, func() error {
queryError := executeQuery()
if !common.IsTriableError(queryError) {
if !common.IsRetryableError(queryError) {
cancel()
}
if queryError != nil {

View File

@ -418,7 +418,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
retryCtx, cancel := context.WithCancel(ctx)
err := retry.Do(retryCtx, func() error {
searchErr := executeSearch()
if !common.IsTriableError(searchErr) {
if !common.IsRetryableError(searchErr) {
cancel()
}
if searchErr != nil {

View File

@ -952,6 +952,7 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
qs, err := node.queryShardService.getQueryShard(dmlChannel)
if err != nil {
log.Ctx(ctx).Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", dmlChannel), zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
failRet.Status.Reason = err.Error()
return failRet, nil
}

View File

@ -24,7 +24,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
)
func genSimpleQueryShard(ctx context.Context) (*queryShard, error) {
@ -123,6 +125,12 @@ func TestQueryShard_getServiceableTime(t *testing.T) {
deltaTimestamp, err := qs.getServiceableTime(qs.deltaChannel)
assert.NoError(t, err)
assert.Equal(t, timestamp, deltaTimestamp)
_, err = qs.getServiceableTime("unknown-channel")
assert.Error(t, err)
code, ret := common.GetErrorCode(err)
assert.True(t, ret)
assert.Equal(t, commonpb.ErrorCode_NotFoundTSafer, code)
}
func genSearchResultData(nq int64, topk int64, ids []int64, scores []float32, topks []int64) *schemapb.SearchResultData {

View File

@ -163,8 +163,13 @@ func (b *baseReadTask) Ready() (bool, error) {
serviceTime, err := b.QS.getServiceableTime(channel)
if err != nil {
return false, fmt.Errorf("failed to get service timestamp, taskID = %d, collectionID = %d, err=%w", b.ID(), b.CollectionID, err)
log.Error("failed to get service timestamp",
zap.Int64("taskID", b.ID()),
zap.Int64("collectionID", b.CollectionID),
zap.Error(err))
return false, err
}
guaranteeTs := b.GuaranteeTimestamp
gt, _ := tsoutil.ParseTS(guaranteeTs)
st, _ := tsoutil.ParseTS(serviceTime)

View File

@ -2,16 +2,18 @@ package querynode
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
type baseReadTaskSuite struct {
@ -119,9 +121,25 @@ func (s *baseReadTaskSuite) TestReady() {
defer cancel()
s.task.ctx = ctx
s.Run("get serviceable time fail", func() {
s.task.DataScope = querypb.DataScope_Historical
mockedErr := errors.New("test")
s.tsafe.EXPECT().
getTSafe(mock.AnythingOfType("string")).
Return(0, mockedErr).
Times(1)
ready, err := s.task.Ready()
s.Error(err)
s.False(ready)
s.ErrorIs(err, mockedErr)
})
baseTime := time.Now()
serviceable := tsoutil.ComposeTSByTime(baseTime, 0)
s.tsafe.EXPECT().getTSafe(mock.AnythingOfType("string")).Return(serviceable, nil)
s.tsafe.EXPECT().
getTSafe(mock.AnythingOfType("string")).
Return(serviceable, nil)
s.Run("lag too large", func() {
tooLargeGuarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag).Add(time.Second)
guaranteeTs := tsoutil.ComposeTSByTime(tooLargeGuarantee, 0)

View File

@ -22,6 +22,8 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -127,7 +129,8 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, timestamp Timestamp) error {
func (t *tSafeReplica) getTSafePrivate(vChannel Channel) (*tSafe, error) {
if _, ok := t.tSafes[vChannel]; !ok {
return nil, fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel)
return nil, common.NewCodeError(commonpb.ErrorCode_NotFoundTSafer,
fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel))
}
return t.tSafes[vChannel], nil
}