Update grpc trace (#5817)

* Update grpc trace

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* go fmt

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* remove useless code in mod

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-06-17 14:17:56 +08:00 committed by GitHub
parent fb51d298a1
commit b7bf26b486
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 122 additions and 90 deletions

2
go.mod
View File

@ -18,12 +18,12 @@ require (
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.4.3
github.com/google/btree v1.0.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/jarcoal/httpmock v1.0.8
github.com/klauspost/compress v1.10.11 // indirect
github.com/minio/minio-go/v7 v7.0.10
github.com/mitchellh/mapstructure v1.1.2
github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1

2
go.sum
View File

@ -277,8 +277,6 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 h1:K35HCWaOTJIPW6cDHK4yj3QfRY/NhE0pBbfoc0M2NMQ=
github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=

View File

@ -18,13 +18,13 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -69,17 +69,16 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
connectGrpcFunc := func() error {
log.Debug("DataNode connect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("DataNode connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}
@ -87,7 +86,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
log.Debug("DataNodeClient try connect failed", zap.Error(err))
return err

View File

@ -22,8 +22,6 @@ import (
"sync"
"time"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -31,6 +29,7 @@ import (
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -93,14 +92,14 @@ func (s *Server) startGrpc() error {
func (s *Server) startGrpcLoop(listener net.Listener) {
defer s.wg.Done()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
datapb.RegisterDataNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)

View File

@ -16,13 +16,13 @@ import (
"fmt"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -80,7 +80,6 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getDataServiceAddressFn := func() error {
c.addr, err = getDataServiceAddress(c.sess)
@ -97,12 +96,13 @@ func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("DataServiceClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/milvus-io/milvus/internal/dataservice"
"github.com/milvus-io/milvus/internal/log"
@ -32,8 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -126,14 +125,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
//grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
datapb.RegisterDataServiceServer(s.grpcServer, s)
grpc_prometheus.Register(s.grpcServer)

View File

@ -16,11 +16,11 @@ import (
"fmt"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/retry"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -64,17 +64,16 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}
@ -82,7 +81,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
log.Debug("IndexNodeClient try connect failed", zap.Error(err))
return err

View File

@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/log"
@ -33,8 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)
@ -79,14 +78,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {

View File

@ -18,12 +18,12 @@ import (
"google.golang.org/grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -84,7 +84,6 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getIndexServiceaddrFn := func() error {
c.addr, err = getIndexServiceaddr(c.sess)
@ -102,12 +101,13 @@ func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("IndexServiceClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}

View File

@ -21,6 +21,7 @@ import (
"go.uber.org/zap"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/indexservice"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -30,8 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)
@ -164,14 +163,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)

View File

@ -17,6 +17,7 @@ import (
"fmt"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -24,9 +25,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
)
@ -87,7 +87,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, tim
}
func (c *GrpcClient) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getMasterServiceAddrFn := func() error {
ch := make(chan struct{}, 1)
@ -118,12 +117,13 @@ func (c *GrpcClient) connect() error {
var conn *grpc.ClientConn
var err error
ch := make(chan struct{}, 1)
opts := trace.GetInterceptorOpts()
go func() {
conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
ch <- struct{}{}
}()
select {

View File

@ -20,8 +20,6 @@ import (
"sync"
"time"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -35,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/trace"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
@ -223,14 +222,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
masterpb.RegisterMasterServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)

View File

@ -15,14 +15,14 @@ import (
"context"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/util/retry"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
)
@ -58,16 +58,16 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}

View File

@ -28,8 +28,8 @@ import (
grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
otgrpc "github.com/opentracing-contrib/go-grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -93,15 +93,15 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(GRPCMaxMagSize),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)),
grpc.MaxRecvMsgSize(GRPCMaxMagSize))
grpc_opentracing.StreamServerInterceptor(opts...)))
proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)

View File

@ -18,14 +18,14 @@ import (
"google.golang.org/grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/retry"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
)
@ -64,17 +64,16 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}
@ -82,7 +81,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
log.Debug("QueryNodeClient try connect failed", zap.Error(err))
return err

View File

@ -25,11 +25,10 @@ import (
"github.com/milvus-io/milvus/internal/types"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
@ -248,14 +247,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
return
}
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
querypb.RegisterQueryNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)

View File

@ -16,11 +16,11 @@ import (
"fmt"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -83,7 +83,6 @@ func (c *Client) Init() error {
}
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getQueryServiceAddressFn := func() error {
c.addr, err = getQueryServiceAddress(c.sess)
@ -100,12 +99,13 @@ func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("QueryServiceClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
grpc_opentracing.StreamClientInterceptor(opts...)))
if err != nil {
return err
}

View File

@ -20,6 +20,7 @@ import (
"sync"
"time"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
msc "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
"github.com/milvus-io/milvus/internal/log"
@ -28,8 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -182,14 +181,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()
tracer := opentracing.GlobalTracer()
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
grpc_opentracing.StreamServerInterceptor(opts...)))
querypb.RegisterQueryServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)

View File

@ -0,0 +1,44 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package trace
import (
"context"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)
type InterceptorSuite struct {
ClientOpts []grpc.DialOption
ServerOpts []grpc.ServerOption
}
var (
filterFunc = func(ctx context.Context, fullMethodName string) bool {
if fullMethodName == `/milvus.proto.master.MasterService/UpdateChannelTimeTick` ||
fullMethodName == `/milvus.proto.master.MasterService/AllocTimestamp` {
return false
}
return true
}
)
func GetInterceptorOpts() []grpc_opentracing.Option {
tracer := opentracing.GlobalTracer()
opts := []grpc_opentracing.Option{
grpc_opentracing.WithTracer(tracer),
grpc_opentracing.WithFilterFunc(filterFunc),
}
return opts
}