From 6605db2a2449680dd7f72409b243b0064db213bf Mon Sep 17 00:00:00 2001 From: jaime Date: Fri, 3 Mar 2023 16:37:49 +0800 Subject: [PATCH] Add NotFoundTSafer and NoReplicaAvailable to retryable error code (#22505) Signed-off-by: jaime --- go.mod | 2 +- go.sum | 2 + internal/common/error.go | 24 ++- internal/common/error_test.go | 24 ++- internal/core/src/pb/common.pb.cc | 199 +++++++++++++------------ internal/core/src/pb/common.pb.h | 1 + internal/proxy/task_query.go | 2 +- internal/proxy/task_search.go | 2 +- internal/querynode/impl.go | 1 + internal/querynode/query_shard_test.go | 8 + internal/querynode/task_read.go | 7 +- internal/querynode/task_read_test.go | 24 ++- internal/querynode/tsafe_replica.go | 5 +- 13 files changed, 182 insertions(+), 119 deletions(-) diff --git a/go.mod b/go.mod index 8fff5ec25c..e1c2e98a1b 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/klauspost/compress v1.14.2 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378 + github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd github.com/minio/minio-go/v7 v7.0.17 github.com/opentracing/opentracing-go v1.2.0 github.com/panjf2000/ants/v2 v2.4.8 diff --git a/go.sum b/go.sum index 7a278939a2..df7f1ab287 100644 --- a/go.sum +++ b/go.sum @@ -493,6 +493,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378 h1:ttJp/ZUB/3GGbd2mIbASSfdOiBUrkP50gn5gDgCsD0g= github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd h1:9ilgTEqZSdEPbJKSrRGB1TIHTaF7DqVDIwn8/azcaBk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230301092744-7efc6eec15fd/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100= github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/common/error.go b/internal/common/error.go index dc59bab1cd..bd4827d6a8 100644 --- a/internal/common/error.go +++ b/internal/common/error.go @@ -158,16 +158,26 @@ func (t *codeError) Is(err error) bool { return ok } -func IsRetryErrorCode(code commonpb.ErrorCode) bool { - return code == commonpb.ErrorCode_NotReadyServe || - code == commonpb.ErrorCode_NotShardLeader || - code == commonpb.ErrorCode_NodeIDNotMatch +func GetErrorCode(e error) (commonpb.ErrorCode, bool) { + codeErr, ok := e.(*codeError) + if !ok { + return -1, false + } + return codeErr.code, true } -func IsTriableError(err error) bool { - if is := errors.Is(err, CodeErr); is { +func IsRetryableErrorCode(code commonpb.ErrorCode) bool { + return code == commonpb.ErrorCode_NotReadyServe || + code == commonpb.ErrorCode_NotShardLeader || + code == commonpb.ErrorCode_NodeIDNotMatch || + code == commonpb.ErrorCode_NoReplicaAvailable || + code == commonpb.ErrorCode_NotFoundTSafer +} + +func IsRetryableError(err error) bool { + if ok := errors.Is(err, CodeErr); ok { codeErr := err.(*codeError) - return IsRetryErrorCode(codeErr.code) + return IsRetryableErrorCode(codeErr.code) } return false } diff --git a/internal/common/error_test.go b/internal/common/error_test.go index 359ba4516e..4c089736ed 100644 --- a/internal/common/error_test.go +++ b/internal/common/error_test.go @@ -21,9 +21,9 @@ import ( "fmt" "testing" - "github.com/milvus-io/milvus-proto/go-api/commonpb" - "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" ) func TestIgnorableError(t *testing.T) { @@ -81,7 +81,21 @@ func TestStatusFromError(t *testing.T) { } func TestCodeError(t *testing.T) { - assert.False(t, IsTriableError(nil)) - assert.True(t, IsTriableError(NewCodeError(commonpb.ErrorCode_NotReadyServe, nil))) - assert.False(t, IsTriableError(NewCodeError(commonpb.ErrorCode_UnexpectedError, nil))) + assert.False(t, IsRetryableError(nil)) + assert.True(t, IsRetryableError(NewCodeError(commonpb.ErrorCode_NotReadyServe, nil))) + assert.False(t, IsRetryableError(NewCodeError(commonpb.ErrorCode_UnexpectedError, nil))) + + { + err := NewCodeError(commonpb.ErrorCode_NotReadyServe, nil) + code, ret := GetErrorCode(err) + assert.True(t, ret) + assert.Equal(t, commonpb.ErrorCode_NotReadyServe, code) + } + + { + err := errors.New("test") + code, ret := GetErrorCode(err) + assert.False(t, ret) + assert.Equal(t, commonpb.ErrorCode(-1), code) + } } diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index fa88afddc4..b51baa05d7 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( ".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$." "milvus.proto.common.ObjectPrivilege\022\031\n\021o" "bject_name_index\030\003 \001(\005\022\032\n\022object_name_in" - "dexs\030\004 \001(\005*\303\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" + "dexs\030\004 \001(\005*\327\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" "\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n" "\020PermissionDenied\020\003\022\027\n\023CollectionNotExis" "ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime" @@ -396,103 +396,103 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "\0204\022\030\n\024MemoryQuotaExhausted\0205\022\026\n\022DiskQuot" "aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\021\n\rN" "otReadyServe\0208\022\033\n\027NotReadyCoordActivatin" - "g\0209\022\017\n\013DataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007" - "*c\n\nIndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Un" - "issued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n" - "\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024" - "\n\020SegmentStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Gr" - "owing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flus" - "hing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Pl" - "aceholderType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020" - "d\022\017\n\013FloatVector\020e*\263\016\n\007MsgType\022\r\n\tUndefi" - "ned\020\000\022\024\n\020CreateCollection\020d\022\022\n\016DropColle" - "ction\020e\022\021\n\rHasCollection\020f\022\026\n\022DescribeCo" - "llection\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSy" - "stemConfigs\020i\022\022\n\016LoadCollection\020j\022\025\n\021Rel" - "easeCollection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDro" - "pAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollect" - "ion\020o\022\024\n\020RenameCollection\020p\022\024\n\017CreatePar" - "tition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPart" - "ition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n\016Show" - "Partitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021Re" - "leasePartitions\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n" - "\017DescribeSegment\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024" - "\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegments\020" - "\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Describe" - "Segments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescrib" - "eIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013" - "\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSegment" - "Stats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003" - "\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro" - "gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033" - "\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020" - "\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne" - "ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue" - "ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022" - "\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD" - "eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n" - "\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017" - "GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206" - "\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017" - "GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004" - "\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t" - "LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT" - "SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt" - "atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa" - "taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe" - "tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n" - "\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername" - "s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017" - "OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS" - "electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope" - "ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re" - "freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014" - "\022\030\n\023CreateResourceGroup\020\244\r\022\026\n\021DropResour" - "ceGroup\020\245\r\022\027\n\022ListResourceGroups\020\246\r\022\032\n\025D" - "escribeResourceGroup\020\247\r\022\021\n\014TransferNode\020" - "\250\r\022\024\n\017TransferReplica\020\251\r*\"\n\007DslType\022\007\n\003D" - "sl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState\022" - "\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCom" - "pleted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000" - "\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually" - "\020\003\022\016\n\nCustomized\020\004*\213\001\n\013ImportState\022\021\n\rIm" - "portPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImpor" - "tStarted\020\002\022\023\n\017ImportPersisted\020\005\022\023\n\017Impor" - "tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007" - "*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global" - "\020\001\022\010\n\004User\020\002*\202\007\n\017ObjectPrivilege\022\020\n\014Priv" - "ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020" - "\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil" - "egeDescribeCollection\020\003\022\034\n\030PrivilegeShow" - "Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv" - "ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022" - "\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020" - "\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile" - "geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020" - "\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe" - "arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ" - "uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv" - "ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi" - "p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg" - "eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner" - "ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023" - "PrivilegeSelectUser\020\030\022 \n\034PrivilegeCreate" - "ResourceGroup\020\032\022\036\n\032PrivilegeDropResource" - "Group\020\033\022\"\n\036PrivilegeDescribeResourceGrou" - "p\020\034\022\037\n\033PrivilegeListResourceGroups\020\035\022\031\n\025" - "PrivilegeTransferNode\020\036\022\034\n\030PrivilegeTran" - "sferReplica\020\037\022\037\n\033PrivilegeGetLoadingProg" - "ress\020 \022\031\n\025PrivilegeGetLoadState\020!*S\n\tSta" - "teCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020\001\022\014\n" - "\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c" - "\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024\n\020Lo" - "adStateNotLoad\020\001\022\024\n\020LoadStateLoading\020\002\022\023" - "\n\017LoadStateLoaded\020\003:^\n\021privilege_ext_obj" - "\022\037.google.protobuf.MessageOptions\030\351\007 \001(\013" - "2!.milvus.proto.common.PrivilegeExtBU\n\016i" - "o.milvus.grpcB\013CommonProtoP\001Z1github.com" - "/milvus-io/milvus-proto/go-api/commonpb\240" - "\001\001b\006proto3" + "g\0209\022\022\n\016NotFoundTSafer\020:\022\017\n\013DataCoordNA\020d" + "\022\022\n\rDDRequestRace\020\350\007*c\n\nIndexState\022\022\n\016In" + "dexStateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgre" + "ss\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020" + "\005*\202\001\n\014SegmentState\022\024\n\020SegmentStateNone\020\000" + "\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022" + "\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022" + "\r\n\tImporting\020\007*>\n\017PlaceholderType\022\010\n\004Non" + "e\020\000\022\020\n\014BinaryVector\020d\022\017\n\013FloatVector\020e*\263" + "\016\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020CreateColle" + "ction\020d\022\022\n\016DropCollection\020e\022\021\n\rHasCollec" + "tion\020f\022\026\n\022DescribeCollection\020g\022\023\n\017ShowCo" + "llections\020h\022\024\n\020GetSystemConfigs\020i\022\022\n\016Loa" + "dCollection\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013" + "CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAli" + "as\020n\022\023\n\017AlterCollection\020o\022\024\n\020RenameColle" + "ction\020p\022\024\n\017CreatePartition\020\310\001\022\022\n\rDropPar" + "tition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021Describe" + "Partition\020\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016Loa" + "dPartitions\020\315\001\022\026\n\021ReleasePartitions\020\316\001\022\021" + "\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegment\020\373\001\022" + "\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegments\020\375\001" + "\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBalanceSeg" + "ments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020\n\013Creat" + "eIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\tDropInd" + "ex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush" + "\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006Search\020\364" + "\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIndexState\020\366\003" + "\022\032\n\025GetIndexBuildProgress\020\367\003\022\034\n\027GetColle" + "ctionStatistics\020\370\003\022\033\n\026GetPartitionStatis" + "tics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveResult" + "\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDmCha" + "nnels\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023Rem" + "oveQueryChannels\020\377\003\022\035\n\030SealedSegmentsCha" + "ngeInfo\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\024\n\017G" + "etShardLeaders\020\202\004\022\020\n\013GetReplicas\020\203\004\022\023\n\016U" + "nsubDmChannel\020\204\004\022\024\n\017GetDistribution\020\205\004\022\025" + "\n\020SyncDistribution\020\206\004\022\020\n\013SegmentInfo\020\330\004\022" + "\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024" + "\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016Q" + "ueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequ" + "estID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSeg" + "ment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Segme" + "ntFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020Creat" + "eCredential\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n\020De" + "leteCredential\020\336\013\022\025\n\020UpdateCredential\020\337\013" + "\022\026\n\021ListCredUsernames\020\340\013\022\017\n\nCreateRole\020\300" + "\014\022\r\n\010DropRole\020\301\014\022\024\n\017OperateUserRole\020\302\014\022\017" + "\n\nSelectRole\020\303\014\022\017\n\nSelectUser\020\304\014\022\023\n\016Sele" + "ctResource\020\305\014\022\025\n\020OperatePrivilege\020\306\014\022\020\n\013" + "SelectGrant\020\307\014\022\033\n\026RefreshPolicyInfoCache" + "\020\310\014\022\017\n\nListPolicy\020\311\014\022\030\n\023CreateResourceGr" + "oup\020\244\r\022\026\n\021DropResourceGroup\020\245\r\022\027\n\022ListRe" + "sourceGroups\020\246\r\022\032\n\025DescribeResourceGroup" + "\020\247\r\022\021\n\014TransferNode\020\250\r\022\024\n\017TransferReplic" + "a\020\251\r*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001" + "*B\n\017CompactionState\022\021\n\rUndefiedState\020\000\022\r" + "\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consiste" + "ncyLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bou" + "nded\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\213" + "\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Imp" + "ortFailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017Import" + "Persisted\020\005\022\023\n\017ImportCompleted\020\006\022\032\n\026Impo" + "rtFailedAndCleaned\020\007*2\n\nObjectType\022\016\n\nCo" + "llection\020\000\022\n\n\006Global\020\001\022\010\n\004User\020\002*\202\007\n\017Obj" + "ectPrivilege\022\020\n\014PrivilegeAll\020\000\022\035\n\031Privil" + "egeCreateCollection\020\001\022\033\n\027PrivilegeDropCo" + "llection\020\002\022\037\n\033PrivilegeDescribeCollectio" + "n\020\003\022\034\n\030PrivilegeShowCollections\020\004\022\021\n\rPri" + "vilegeLoad\020\005\022\024\n\020PrivilegeRelease\020\006\022\027\n\023Pr" + "ivilegeCompaction\020\007\022\023\n\017PrivilegeInsert\020\010" + "\022\023\n\017PrivilegeDelete\020\t\022\032\n\026PrivilegeGetSta" + "tistics\020\n\022\030\n\024PrivilegeCreateIndex\020\013\022\030\n\024P" + "rivilegeIndexDetail\020\014\022\026\n\022PrivilegeDropIn" + "dex\020\r\022\023\n\017PrivilegeSearch\020\016\022\022\n\016PrivilegeF" + "lush\020\017\022\022\n\016PrivilegeQuery\020\020\022\030\n\024PrivilegeL" + "oadBalance\020\021\022\023\n\017PrivilegeImport\020\022\022\034\n\030Pri" + "vilegeCreateOwnership\020\023\022\027\n\023PrivilegeUpda" + "teUser\020\024\022\032\n\026PrivilegeDropOwnership\020\025\022\034\n\030" + "PrivilegeSelectOwnership\020\026\022\034\n\030PrivilegeM" + "anageOwnership\020\027\022\027\n\023PrivilegeSelectUser\020" + "\030\022 \n\034PrivilegeCreateResourceGroup\020\032\022\036\n\032P" + "rivilegeDropResourceGroup\020\033\022\"\n\036Privilege" + "DescribeResourceGroup\020\034\022\037\n\033PrivilegeList" + "ResourceGroups\020\035\022\031\n\025PrivilegeTransferNod" + "e\020\036\022\034\n\030PrivilegeTransferReplica\020\037\022\037\n\033Pri" + "vilegeGetLoadingProgress\020 \022\031\n\025PrivilegeG" + "etLoadState\020!*S\n\tStateCode\022\020\n\014Initializi" + "ng\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022\013\n\007Stand" + "By\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022\025\n\021LoadS" + "tateNotExist\020\000\022\024\n\020LoadStateNotLoad\020\001\022\024\n\020" + "LoadStateLoading\020\002\022\023\n\017LoadStateLoaded\020\003:" + "^\n\021privilege_ext_obj\022\037.google.protobuf.M" + "essageOptions\030\351\007 \001(\0132!.milvus.proto.comm" + "on.PrivilegeExtBU\n\016io.milvus.grpcB\013Commo" + "nProtoP\001Z1github.com/milvus-io/milvus-pr" + "oto/go-api/commonpb\240\001\001b\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { &::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto, @@ -513,7 +513,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 6090, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 6110, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -587,6 +587,7 @@ bool ErrorCode_IsValid(int value) { case 55: case 56: case 57: + case 58: case 100: case 1000: return true; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index 8579af5a08..7f2ddfc6d6 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -170,6 +170,7 @@ enum ErrorCode : int { TimeTickLongDelay = 55, NotReadyServe = 56, NotReadyCoordActivating = 57, + NotFoundTSafer = 58, DataCoordNA = 100, DDRequestRace = 1000, ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 2b2acb70d4..9648c1e4f3 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -338,7 +338,7 @@ func (t *queryTask) Execute(ctx context.Context) error { retryCtx, cancel := context.WithCancel(ctx) err := retry.Do(retryCtx, func() error { queryError := executeQuery() - if !common.IsTriableError(queryError) { + if !common.IsRetryableError(queryError) { cancel() } if queryError != nil { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 4ba9a284a1..315c251c0b 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -418,7 +418,7 @@ func (t *searchTask) Execute(ctx context.Context) error { retryCtx, cancel := context.WithCancel(ctx) err := retry.Do(retryCtx, func() error { searchErr := executeSearch() - if !common.IsTriableError(searchErr) { + if !common.IsRetryableError(searchErr) { cancel() } if searchErr != nil { diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 11c494fb63..8007e136f2 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -952,6 +952,7 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que qs, err := node.queryShardService.getQueryShard(dmlChannel) if err != nil { log.Ctx(ctx).Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", dmlChannel), zap.Error(err)) + failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader failRet.Status.Reason = err.Error() return failRet, nil } diff --git a/internal/querynode/query_shard_test.go b/internal/querynode/query_shard_test.go index 5a5796edf8..31b79e5f60 100644 --- a/internal/querynode/query_shard_test.go +++ b/internal/querynode/query_shard_test.go @@ -24,7 +24,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" ) func genSimpleQueryShard(ctx context.Context) (*queryShard, error) { @@ -123,6 +125,12 @@ func TestQueryShard_getServiceableTime(t *testing.T) { deltaTimestamp, err := qs.getServiceableTime(qs.deltaChannel) assert.NoError(t, err) assert.Equal(t, timestamp, deltaTimestamp) + + _, err = qs.getServiceableTime("unknown-channel") + assert.Error(t, err) + code, ret := common.GetErrorCode(err) + assert.True(t, ret) + assert.Equal(t, commonpb.ErrorCode_NotFoundTSafer, code) } func genSearchResultData(nq int64, topk int64, ids []int64, scores []float32, topks []int64) *schemapb.SearchResultData { diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go index 8684424444..75edb2056a 100644 --- a/internal/querynode/task_read.go +++ b/internal/querynode/task_read.go @@ -163,8 +163,13 @@ func (b *baseReadTask) Ready() (bool, error) { serviceTime, err := b.QS.getServiceableTime(channel) if err != nil { - return false, fmt.Errorf("failed to get service timestamp, taskID = %d, collectionID = %d, err=%w", b.ID(), b.CollectionID, err) + log.Error("failed to get service timestamp", + zap.Int64("taskID", b.ID()), + zap.Int64("collectionID", b.CollectionID), + zap.Error(err)) + return false, err } + guaranteeTs := b.GuaranteeTimestamp gt, _ := tsoutil.ParseTS(guaranteeTs) st, _ := tsoutil.ParseTS(serviceTime) diff --git a/internal/querynode/task_read_test.go b/internal/querynode/task_read_test.go index e8d06407b4..846ce1b89a 100644 --- a/internal/querynode/task_read_test.go +++ b/internal/querynode/task_read_test.go @@ -2,16 +2,18 @@ package querynode import ( "context" + "errors" "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) type baseReadTaskSuite struct { @@ -119,9 +121,25 @@ func (s *baseReadTaskSuite) TestReady() { defer cancel() s.task.ctx = ctx + s.Run("get serviceable time fail", func() { + s.task.DataScope = querypb.DataScope_Historical + mockedErr := errors.New("test") + s.tsafe.EXPECT(). + getTSafe(mock.AnythingOfType("string")). + Return(0, mockedErr). + Times(1) + ready, err := s.task.Ready() + s.Error(err) + s.False(ready) + s.ErrorIs(err, mockedErr) + }) + baseTime := time.Now() serviceable := tsoutil.ComposeTSByTime(baseTime, 0) - s.tsafe.EXPECT().getTSafe(mock.AnythingOfType("string")).Return(serviceable, nil) + s.tsafe.EXPECT(). + getTSafe(mock.AnythingOfType("string")). + Return(serviceable, nil) + s.Run("lag too large", func() { tooLargeGuarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag).Add(time.Second) guaranteeTs := tsoutil.ComposeTSByTime(tooLargeGuarantee, 0) diff --git a/internal/querynode/tsafe_replica.go b/internal/querynode/tsafe_replica.go index 9806108e6f..e178ee3c25 100644 --- a/internal/querynode/tsafe_replica.go +++ b/internal/querynode/tsafe_replica.go @@ -22,6 +22,8 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -127,7 +129,8 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, timestamp Timestamp) error { func (t *tSafeReplica) getTSafePrivate(vChannel Channel) (*tSafe, error) { if _, ok := t.tSafes[vChannel]; !ok { - return nil, fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel) + return nil, common.NewCodeError(commonpb.ErrorCode_NotFoundTSafer, + fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel)) } return t.tSafes[vChannel], nil }