From e938bacf201319e843288537c03c7fe891e638ae Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:47:34 +0800 Subject: [PATCH] enhance: [2.6] skip check source id (#45379) pr: https://github.com/milvus-io/milvus/pull/45377 relate:https://github.com/milvus-io/milvus/issues/45381 Signed-off-by: aoiasd --- internal/proxy/authentication_interceptor.go | 82 ++++++++----------- .../proxy/authentication_interceptor_test.go | 18 ---- internal/util/grpcclient/auth.go | 19 ----- internal/util/grpcclient/client.go | 4 - pkg/util/constant.go | 8 +- .../cluster/process/milvus_process.go | 4 - 6 files changed, 34 insertions(+), 101 deletions(-) delete mode 100644 internal/util/grpcclient/auth.go diff --git a/internal/proxy/authentication_interceptor.go b/internal/proxy/authentication_interceptor.go index 210a70e3a9..0083c4725e 100644 --- a/internal/proxy/authentication_interceptor.go +++ b/internal/proxy/authentication_interceptor.go @@ -32,20 +32,6 @@ func parseMD(rawToken string) (username, password string) { return } -func validSourceID(ctx context.Context, authorization []string) bool { - if len(authorization) < 1 { - // log.Warn("key not found in header", zap.String("key", util.HeaderSourceID)) - return false - } - // token format: base64 - token := authorization[0] - sourceID, err := crypto.Base64Decode(token) - if err != nil { - return false - } - return sourceID == util.MemberCredID -} - func GrpcAuthInterceptor(authFunc grpc_auth.AuthFunc) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { var newCtx context.Context @@ -76,48 +62,44 @@ func AuthenticationInterceptor(ctx context.Context) (context.Context, error) { if globalMetaCache == nil { return nil, merr.WrapErrServiceUnavailable("internal: Milvus Proxy is not ready yet. please wait") } - // check: - // 1. if rpc call from a member (like index/query/data component) - // 2. if rpc call from sdk + // check rpc call from sdk if Params.CommonCfg.AuthorizationEnabled.GetAsBool() { - if !validSourceID(ctx, md[strings.ToLower(util.HeaderSourceID)]) { - authStrArr := md[strings.ToLower(util.HeaderAuthorize)] + authStrArr := md[strings.ToLower(util.HeaderAuthorize)] - if len(authStrArr) < 1 { - log.Warn("key not found in header") - return nil, status.Error(codes.Unauthenticated, "missing authorization in header") - } + if len(authStrArr) < 1 { + log.Warn("key not found in header") + return nil, status.Error(codes.Unauthenticated, "missing authorization in header") + } - // token format: base64 - // token := strings.TrimPrefix(authorization[0], "Bearer ") - token := authStrArr[0] - rawToken, err := crypto.Base64Decode(token) + // token format: base64 + // token := strings.TrimPrefix(authorization[0], "Bearer ") + token := authStrArr[0] + rawToken, err := crypto.Base64Decode(token) + if err != nil { + log.Warn("fail to decode the token", zap.Error(err)) + return nil, status.Error(codes.Unauthenticated, "invalid token format") + } + + if !strings.Contains(rawToken, util.CredentialSeperator) { + user, err := VerifyAPIKey(rawToken) if err != nil { - log.Warn("fail to decode the token", zap.Error(err)) - return nil, status.Error(codes.Unauthenticated, "invalid token format") + log.Warn("fail to verify apikey", zap.Error(err)) + return nil, status.Error(codes.Unauthenticated, "auth check failure, please check api key is correct") } - - if !strings.Contains(rawToken, util.CredentialSeperator) { - user, err := VerifyAPIKey(rawToken) - if err != nil { - log.Warn("fail to verify apikey", zap.Error(err)) - return nil, status.Error(codes.Unauthenticated, "auth check failure, please check api key is correct") - } - metrics.UserRPCCounter.WithLabelValues(user).Inc() - userToken := fmt.Sprintf("%s%s%s", user, util.CredentialSeperator, util.PasswordHolder) - md[strings.ToLower(util.HeaderAuthorize)] = []string{crypto.Base64Encode(userToken)} - md[util.HeaderToken] = []string{rawToken} - ctx = metadata.NewIncomingContext(ctx, md) - } else { - // username+password authentication - username, password := parseMD(rawToken) - if !passwordVerify(ctx, username, password, globalMetaCache) { - log.Warn("fail to verify password", zap.String("username", username)) - // NOTE: don't use the merr, because it will cause the wrong retry behavior in the sdk - return nil, status.Error(codes.Unauthenticated, "auth check failure, please check username and password are correct") - } - metrics.UserRPCCounter.WithLabelValues(username).Inc() + metrics.UserRPCCounter.WithLabelValues(user).Inc() + userToken := fmt.Sprintf("%s%s%s", user, util.CredentialSeperator, util.PasswordHolder) + md[strings.ToLower(util.HeaderAuthorize)] = []string{crypto.Base64Encode(userToken)} + md[util.HeaderToken] = []string{rawToken} + ctx = metadata.NewIncomingContext(ctx, md) + } else { + // username+password authentication + username, password := parseMD(rawToken) + if !passwordVerify(ctx, username, password, globalMetaCache) { + log.Warn("fail to verify password", zap.String("username", username)) + // NOTE: don't use the merr, because it will cause the wrong retry behavior in the sdk + return nil, status.Error(codes.Unauthenticated, "auth check failure, please check username and password are correct") } + metrics.UserRPCCounter.WithLabelValues(username).Inc() } } return ctx, nil diff --git a/internal/proxy/authentication_interceptor_test.go b/internal/proxy/authentication_interceptor_test.go index bd0ba45dc6..31654374a5 100644 --- a/internal/proxy/authentication_interceptor_test.go +++ b/internal/proxy/authentication_interceptor_test.go @@ -49,19 +49,6 @@ func TestValidAuth(t *testing.T) { assert.False(t, res) } -func TestValidSourceID(t *testing.T) { - ctx := context.Background() - // no metadata - res := validSourceID(ctx, nil) - assert.False(t, res) - // illegal metadata - res = validSourceID(ctx, []string{"invalid_sourceid"}) - assert.False(t, res) - // normal sourceId - res = validSourceID(ctx, []string{crypto.Base64Encode(util.MemberCredID)}) - assert.True(t, res) -} - func TestAuthenticationInterceptor(t *testing.T) { ctx := context.Background() paramtable.Get().Save(Params.CommonCfg.AuthorizationEnabled.Key, "true") // mock authorization is turned on @@ -84,11 +71,6 @@ func TestAuthenticationInterceptor(t *testing.T) { ctx = metadata.NewIncomingContext(ctx, md) _, err = AuthenticationInterceptor(ctx) assert.NoError(t, err) - // with valid sourceId - md = metadata.Pairs("sourceid", crypto.Base64Encode(util.MemberCredID)) - ctx = metadata.NewIncomingContext(ctx, md) - _, err = AuthenticationInterceptor(ctx) - assert.NoError(t, err) { // wrong authorization style diff --git a/internal/util/grpcclient/auth.go b/internal/util/grpcclient/auth.go deleted file mode 100644 index 1d0d91bb55..0000000000 --- a/internal/util/grpcclient/auth.go +++ /dev/null @@ -1,19 +0,0 @@ -package grpcclient - -import ( - "context" - - "github.com/milvus-io/milvus/pkg/v2/util" -) - -type Token struct { - Value string -} - -func (t *Token) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { - return map[string]string{util.HeaderSourceID: t.Value}, nil -} - -func (t *Token) RequireTransportSecurity() bool { - return false -} diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 3974d1b0b3..61b28317b2 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -40,8 +40,6 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/tracer" - "github.com/milvus-io/milvus/pkg/v2/util" - "github.com/milvus-io/milvus/pkg/v2/util/crypto" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/generic" "github.com/milvus-io/milvus/pkg/v2/util/interceptor" @@ -310,7 +308,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { }, MinConnectTimeout: c.DialTimeout, }), - grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}), grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(), @@ -349,7 +346,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { }, MinConnectTimeout: c.DialTimeout, }), - grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}), grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(), diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 72f52c80b5..a16a595282 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -44,12 +44,8 @@ const ( SegmentIndexPrefix = "segment-index" FieldIndexPrefix = "field-index" - HeaderAuthorize = "authorization" - HeaderToken = "token" - // HeaderSourceID identify requests from Milvus members and client requests - HeaderSourceID = "sourceId" - // MemberCredID id for Milvus members (data/index/query node/coord component) - MemberCredID = "@@milvus-member@@" + HeaderAuthorize = "authorization" + HeaderToken = "token" CredentialSeperator = ":" UserRoot = "root" PasswordHolder = "___" diff --git a/tests/integration/cluster/process/milvus_process.go b/tests/integration/cluster/process/milvus_process.go index e0ffe7b972..8d732c0ade 100644 --- a/tests/integration/cluster/process/milvus_process.go +++ b/tests/integration/cluster/process/milvus_process.go @@ -45,12 +45,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/contextutil" - "github.com/milvus-io/milvus/pkg/v2/util/crypto" "github.com/milvus-io/milvus/pkg/v2/util/interceptor" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -499,7 +496,6 @@ func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int }, MinConnectTimeout: 5 * time.Second, }), - grpc.WithPerRPCCredentials(&grpcclient.Token{Value: crypto.Base64Encode(util.MemberCredID)}), grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(),