mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
fix: [GoSDK] move metaheader to client in case of race (#40443)
See failure run in #40352 This PR: - move metaheader map to client struct from config - set default value for field schema Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a56b24054f
commit
c348e61bdb
@ -221,6 +221,7 @@ func (f *Field) ProtoMessage() *schemapb.FieldSchema {
|
|||||||
IsClusteringKey: f.IsClusteringKey,
|
IsClusteringKey: f.IsClusteringKey,
|
||||||
ElementType: schemapb.DataType(f.ElementType),
|
ElementType: schemapb.DataType(f.ElementType),
|
||||||
Nullable: f.Nullable,
|
Nullable: f.Nullable,
|
||||||
|
DefaultValue: f.DefaultValue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -419,6 +420,8 @@ func (f *Field) ReadProto(p *schemapb.FieldSchema) *Field {
|
|||||||
f.IsPartitionKey = p.GetIsPartitionKey()
|
f.IsPartitionKey = p.GetIsPartitionKey()
|
||||||
f.IsClusteringKey = p.GetIsClusteringKey()
|
f.IsClusteringKey = p.GetIsClusteringKey()
|
||||||
f.ElementType = FieldType(p.GetElementType())
|
f.ElementType = FieldType(p.GetElementType())
|
||||||
|
f.DefaultValue = p.GetDefaultValue()
|
||||||
|
f.Nullable = p.GetNullable()
|
||||||
|
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
"github.com/milvus-io/milvus/client/v2/common"
|
"github.com/milvus-io/milvus/client/v2/common"
|
||||||
"github.com/milvus-io/milvus/client/v2/entity"
|
"github.com/milvus-io/milvus/client/v2/entity"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/crypto"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -48,7 +49,9 @@ type Client struct {
|
|||||||
// mutable status
|
// mutable status
|
||||||
stateMut sync.RWMutex
|
stateMut sync.RWMutex
|
||||||
currentDB string
|
currentDB string
|
||||||
identifier string
|
identifier string // Identifier for this connection
|
||||||
|
|
||||||
|
metadataHeaders map[string]string
|
||||||
|
|
||||||
collCache *CollectionCache
|
collCache *CollectionCache
|
||||||
}
|
}
|
||||||
@ -67,7 +70,7 @@ func New(ctx context.Context, config *ClientConfig) (*Client, error) {
|
|||||||
addr := c.config.getParsedAddress()
|
addr := c.config.getParsedAddress()
|
||||||
|
|
||||||
// parse authentication parameters
|
// parse authentication parameters
|
||||||
c.config.parseAuthentication()
|
c.parseAuthentication()
|
||||||
// Parse grpc options
|
// Parse grpc options
|
||||||
options := c.dialOptions()
|
options := c.dialOptions()
|
||||||
|
|
||||||
@ -117,6 +120,21 @@ func (c *Client) dialOptions() []grpc.DialOption {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseAuthentication prepares authentication headers for grpc inteceptors based on the provided username, password or API key.
|
||||||
|
func (c *Client) parseAuthentication() {
|
||||||
|
cfg := c.config
|
||||||
|
c.metadataHeaders = make(map[string]string)
|
||||||
|
if cfg.Username != "" || cfg.Password != "" {
|
||||||
|
value := crypto.Base64Encode(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password))
|
||||||
|
c.metadataHeaders[authorizationHeader] = value
|
||||||
|
}
|
||||||
|
// API overwrites username & passwd
|
||||||
|
if cfg.APIKey != "" {
|
||||||
|
value := crypto.Base64Encode(cfg.APIKey)
|
||||||
|
c.metadataHeaders[authorizationHeader] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) Close(ctx context.Context) error {
|
func (c *Client) Close(ctx context.Context) error {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -13,8 +13,6 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/backoff"
|
"google.golang.org/grpc/backoff"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/crypto"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -64,9 +62,6 @@ type ClientConfig struct {
|
|||||||
|
|
||||||
DisableConn bool
|
DisableConn bool
|
||||||
|
|
||||||
metadataHeaders map[string]string
|
|
||||||
|
|
||||||
identifier string // Identifier for this connection
|
|
||||||
ServerVersion string // ServerVersion
|
ServerVersion string // ServerVersion
|
||||||
parsedAddress *url.URL
|
parsedAddress *url.URL
|
||||||
flags uint64 // internal flags
|
flags uint64 // internal flags
|
||||||
@ -117,29 +112,10 @@ func (c *ClientConfig) useDatabase(dbName string) {
|
|||||||
c.DBName = dbName
|
c.DBName = dbName
|
||||||
}
|
}
|
||||||
|
|
||||||
// useDatabase change the inner db name.
|
|
||||||
func (c *ClientConfig) setIdentifier(identifier string) {
|
|
||||||
c.identifier = identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConfig) setServerInfo(serverInfo string) {
|
func (c *ClientConfig) setServerInfo(serverInfo string) {
|
||||||
c.ServerVersion = serverInfo
|
c.ServerVersion = serverInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseAuthentication prepares authentication headers for grpc inteceptors based on the provided username, password or API key.
|
|
||||||
func (c *ClientConfig) parseAuthentication() {
|
|
||||||
c.metadataHeaders = make(map[string]string)
|
|
||||||
if c.Username != "" || c.Password != "" {
|
|
||||||
value := crypto.Base64Encode(fmt.Sprintf("%s:%s", c.Username, c.Password))
|
|
||||||
c.metadataHeaders[authorizationHeader] = value
|
|
||||||
}
|
|
||||||
// API overwrites username & passwd
|
|
||||||
if c.APIKey != "" {
|
|
||||||
value := crypto.Base64Encode(c.APIKey)
|
|
||||||
c.metadataHeaders[authorizationHeader] = value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConfig) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor {
|
func (c *ClientConfig) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor {
|
||||||
if c.RetryRateLimit == nil {
|
if c.RetryRateLimit == nil {
|
||||||
c.RetryRateLimit = c.defaultRetryRateLimitOption()
|
c.RetryRateLimit = c.defaultRetryRateLimitOption()
|
||||||
|
|||||||
@ -48,7 +48,7 @@ func (c *Client) MetadataUnaryInterceptor() grpc.UnaryClientInterceptor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) metadata(ctx context.Context) context.Context {
|
func (c *Client) metadata(ctx context.Context) context.Context {
|
||||||
for k, v := range c.config.metadataHeaders {
|
for k, v := range c.metadataHeaders {
|
||||||
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
|
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
|
||||||
}
|
}
|
||||||
return ctx
|
return ctx
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user