diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 9db4d3a765..e217fb54dc 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -27,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -38,6 +41,7 @@ var Params *paramtable.ComponentParam = paramtable.Get() // Client is the grpc client for DataNode type Client struct { grpcClient grpcclient.GrpcClient[datapb.DataNodeClient] + sess *sessionutil.Session addr string } @@ -46,16 +50,24 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) if addr == "" { return nil, fmt.Errorf("address is empty") } + sess := sessionutil.NewSession(ctx) + if sess == nil { + err := fmt.Errorf("new session error, maybe can not connect to etcd") + log.Debug("DataNodeClient New Etcd Session failed", zap.Error(err)) + return nil, err + } config := &Params.DataNodeGrpcClientCfg client := &Client{ addr: addr, grpcClient: grpcclient.NewClientBase[datapb.DataNodeClient](config, "milvus.proto.data.DataNode"), + sess: sess, } // node shall specify node id client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) + client.grpcClient.SetSession(sess) return client, nil } diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index af45015b7f..192bf898cc 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -27,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -39,6 +42,7 @@ var Params *paramtable.ComponentParam = paramtable.Get() type Client struct { grpcClient grpcclient.GrpcClient[indexpb.IndexNodeClient] addr string + sess *sessionutil.Session } // NewClient creates a new IndexNode client. @@ -46,16 +50,24 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool) if addr == "" { return nil, fmt.Errorf("address is empty") } + sess := sessionutil.NewSession(ctx) + if sess == nil { + err := fmt.Errorf("new session error, maybe can not connect to etcd") + log.Debug("IndexNodeClient New Etcd Session failed", zap.Error(err)) + return nil, err + } config := &Params.IndexNodeGrpcClientCfg client := &Client{ addr: addr, grpcClient: grpcclient.NewClientBase[indexpb.IndexNodeClient](config, "milvus.proto.index.IndexNode"), + sess: sess, } // node shall specify node id client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.IndexNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) + client.grpcClient.SetSession(sess) if encryption { client.grpcClient.EnableEncryption() } diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index b45f00efd7..4151bfa50e 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -27,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -39,6 +42,7 @@ var Params *paramtable.ComponentParam = paramtable.Get() type Client struct { grpcClient grpcclient.GrpcClient[proxypb.ProxyClient] addr string + sess *sessionutil.Session } // NewClient creates a new client instance @@ -46,16 +50,24 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) if addr == "" { return nil, fmt.Errorf("address is empty") } + sess := sessionutil.NewSession(ctx) + if sess == nil { + err := fmt.Errorf("new session error, maybe can not connect to etcd") + log.Debug("Proxy client new session failed", zap.Error(err)) + return nil, err + } config := &Params.ProxyGrpcClientCfg client := &Client{ addr: addr, grpcClient: grpcclient.NewClientBase[proxypb.ProxyClient](config, "milvus.proto.proxy.Proxy"), + sess: sess, } // node shall specify node id client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.ProxyRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) + client.grpcClient.SetSession(sess) return client, nil } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 8afa16a0e7..bea362fe41 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -319,14 +319,14 @@ func (s *Server) startInternalGrpc(grpcPort int, errChan chan error) { Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead } - log.Debug("Proxy internal server listen on tcp", zap.Int("port", grpcPort)) + log.Info("Proxy internal server listen on tcp", zap.Int("port", grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { log.Warn("Proxy internal server failed to listen on", zap.Error(err), zap.Int("port", grpcPort)) errChan <- err return } - log.Debug("Proxy internal server already listen on tcp", zap.Int("port", grpcPort)) + log.Info("Proxy internal server already listen on tcp", zap.Int("port", grpcPort)) opts := tracer.GetInterceptorOpts() s.grpcInternalServer = grpc.NewServer( @@ -358,7 +358,7 @@ func (s *Server) startInternalGrpc(grpcPort int, errChan chan error) { grpc_health_v1.RegisterHealthServer(s.grpcInternalServer, s) errChan <- nil - log.Debug("create Proxy internal grpc server", + log.Info("create Proxy internal grpc server", zap.Any("enforcement policy", kaep), zap.Any("server parameters", kasp)) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index a7291a9ad5..64c0ceec5d 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -27,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -37,6 +40,7 @@ import ( type Client struct { grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient] addr string + sess *sessionutil.Session } // NewClient creates a new QueryNode client. @@ -44,16 +48,24 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) if addr == "" { return nil, fmt.Errorf("addr is empty") } + sess := sessionutil.NewSession(ctx) + if sess == nil { + err := fmt.Errorf("new session error, maybe can not connect to etcd") + log.Debug("QueryNodeClient NewClient failed", zap.Error(err)) + return nil, err + } config := ¶mtable.Get().QueryNodeGrpcClientCfg client := &Client{ addr: addr, grpcClient: grpcclient.NewClientBase[querypb.QueryNodeClient](config, "milvus.proto.query.QueryNode"), + sess: sess, } // node shall specify node id client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) + client.grpcClient.SetSession(sess) return client, nil } diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index ff53082e6b..d9ec46d6a3 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -379,7 +379,7 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, return false, false, merr.WrapErrServiceUnimplemented(err) case IsServerIDMismatchErr(err): if ok, err := c.checkNodeSessionExist(ctx); !ok { - // if session doesn't exist, no need to retry for datanode/indexnode/querynode + // if session doesn't exist, no need to retry for datanode/indexnode/querynode/proxy return false, false, err } return true, true, err