enhance: Support proxy/delegator qn client pooling (#35194)

See also #35196

Add param items for proxy/delegator query node client pooling and
implement the pooling logic
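For context, the sketch below illustrates the round-robin scheme used by both new pools: a fixed-size slice of clients created up front plus an atomic counter, where each call picks `clients[counter % size]`. The generic `clientPool` type and all names in it are illustrative only and are not part of this change; in the actual patch the same pattern lives in `shardClient.getClient` and `remoteWorker.getClient`, with pool sizes read from `proxy.queryNodePooling.size` and `queryNode.workerPooling.size` (default 10).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// clientPool is a minimal sketch of the pooling idea: create `size` clients
// eagerly and hand them out in round-robin order via an atomic counter.
type clientPool[T any] struct {
	clients []T
	idx     atomic.Int64
}

// newClientPool eagerly builds the pool, falling back to a single client
// when the configured size is not positive (mirroring the guard in the patch).
func newClientPool[T any](size int, create func() (T, error)) (*clientPool[T], error) {
	if size <= 0 {
		size = 1
	}
	p := &clientPool[T]{clients: make([]T, 0, size)}
	for i := 0; i < size; i++ {
		c, err := create()
		if err != nil {
			return nil, err
		}
		p.clients = append(p.clients, c)
	}
	return p, nil
}

// pick returns the next client in round-robin order.
func (p *clientPool[T]) pick() T {
	i := p.idx.Add(1)
	return p.clients[int(i)%len(p.clients)]
}

func main() {
	n := 0
	pool, _ := newClientPool(3, func() (string, error) {
		n++
		return fmt.Sprintf("client-%d", n), nil
	})
	for i := 0; i < 5; i++ {
		fmt.Println(pool.pick()) // client-2, client-3, client-1, client-2, client-3
	}
}
```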

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-08-02 11:24:19 +08:00 committed by GitHub
parent 3b735b4b02
commit c64a078458
5 changed files with 123 additions and 19 deletions


@@ -299,6 +299,8 @@ proxy:
   maxConnectionNum: 10000 # the max client info numbers that proxy should manage, avoid too many client infos
   gracefulStopTimeout: 30 # seconds. force stop node without graceful stop
   slowQuerySpanInSeconds: 5 # query whose executed time exceeds the `slowQuerySpanInSeconds` can be considered slow, in seconds.
+  queryNodePooling:
+    size: 10 # the size for shardleader(querynode) client pool
   http:
     enabled: true # Whether to enable the http server
     debug_mode: false # Whether to enable http server debug mode
@@ -451,6 +453,8 @@ queryNode:
   enableSegmentPrune: false # use partition stats to prune data in search/query on shard delegator
   queryStreamBatchSize: 4194304 # return batch size of stream query
   bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM
+  workerPooling:
+    size: 10 # the size for worker querynode client pool
   ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address
   port: 21123 # TCP port of queryNode
   grpc:


@@ -6,11 +6,13 @@ import (
     "sync"

     "github.com/cockroachdb/errors"
+    "go.uber.org/atomic"
     "go.uber.org/zap"

     "github.com/milvus-io/milvus/internal/registry"
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 type queryNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (types.QueryNodeClient, error)
@@ -32,6 +34,10 @@ type shardClient struct {
     client   types.QueryNodeClient
     isClosed bool
     refCnt   int
+    clients  []types.QueryNodeClient
+    idx      atomic.Int64
+    poolSize int
+    pooling  bool
 }

 func (n *shardClient) getClient(ctx context.Context) (types.QueryNodeClient, error) {
@@ -40,6 +46,10 @@ func (n *shardClient) getClient(ctx context.Context) (types.QueryNodeClient, error) {
     if n.isClosed {
         return nil, errClosed
     }
+    if n.pooling {
+        idx := n.idx.Inc()
+        return n.clients[int(idx)%n.poolSize], nil
+    }
     return n.client, nil
 }
@@ -96,6 +106,31 @@ func newShardClient(info *nodeInfo, client types.QueryNodeClient) *shardClient {
     return ret
 }

+func newPoolingShardClient(info *nodeInfo, creator queryNodeCreatorFunc) (*shardClient, error) {
+    num := paramtable.Get().ProxyCfg.QueryNodePoolingSize.GetAsInt()
+    if num <= 0 {
+        num = 1
+    }
+    clients := make([]types.QueryNodeClient, 0, num)
+    for i := 0; i < num; i++ {
+        client, err := creator(context.Background(), info.address, info.nodeID)
+        if err != nil {
+            return nil, err
+        }
+        clients = append(clients, client)
+    }
+    return &shardClient{
+        info: nodeInfo{
+            nodeID:  info.nodeID,
+            address: info.address,
+        },
+        refCnt:   1,
+        pooling:  true,
+        clients:  clients,
+        poolSize: num,
+    }, nil
+}
+
 type shardClientMgr interface {
     GetClient(ctx context.Context, nodeID UniqueID) (types.QueryNodeClient, error)
     UpdateShardLeaders(oldLeaders map[string][]nodeInfo, newLeaders map[string][]nodeInfo) error
@@ -182,11 +217,10 @@ func (c *shardClientMgrImpl) UpdateShardLeaders(oldLeaders map[string][]nodeInfo, newLeaders map[string][]nodeInfo) error {
             if c.clientCreator == nil {
                 return fmt.Errorf("clientCreator function is nil")
             }
-            shardClient, err := c.clientCreator(context.Background(), node.address, node.nodeID)
+            client, err := newPoolingShardClient(node, c.clientCreator)
             if err != nil {
                 return err
             }
-            client := newShardClient(node, shardClient)
             c.clients.data[node.nodeID] = client
         }
     }


@@ -22,6 +22,7 @@ import (
     "io"

     "github.com/cockroachdb/errors"
+    "go.uber.org/atomic"
     "go.uber.org/zap"

     "github.com/milvus-io/milvus/internal/proto/internalpb"

@@ -30,6 +31,7 @@ import (
     "github.com/milvus-io/milvus/internal/util/streamrpc"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 // Worker is the interface definition for querynode worker role.
@@ -49,21 +51,55 @@ type Worker interface {
 // remoteWorker wraps grpc QueryNode client as Worker.
 type remoteWorker struct {
     client types.QueryNodeClient
+    clients  []types.QueryNodeClient
+    poolSize int
+    idx      atomic.Int64
+    pooling  bool
 }

 // NewRemoteWorker creates a grpcWorker.
 func NewRemoteWorker(client types.QueryNodeClient) Worker {
     return &remoteWorker{
         client:  client,
+        pooling: false,
     }
 }

+func NewPoolingRemoteWorker(fn func() (types.QueryNodeClient, error)) (Worker, error) {
+    num := paramtable.Get().QueryNodeCfg.WorkerPoolingSize.GetAsInt()
+    if num <= 0 {
+        num = 1
+    }
+    clients := make([]types.QueryNodeClient, 0, num)
+    for i := 0; i < num; i++ {
+        c, err := fn()
+        if err != nil {
+            return nil, err
+        }
+        clients = append(clients, c)
+    }
+    return &remoteWorker{
+        pooling:  true,
+        clients:  clients,
+        poolSize: num,
+    }, nil
+}
+
+func (w *remoteWorker) getClient() types.QueryNodeClient {
+    if w.pooling {
+        idx := w.idx.Inc()
+        return w.clients[int(idx)%w.poolSize]
+    }
+    return w.client
+}
+
 // LoadSegments implements Worker.
 func (w *remoteWorker) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
     log := log.Ctx(ctx).With(
         zap.Int64("workerID", req.GetDstNodeID()),
     )
-    status, err := w.client.LoadSegments(ctx, req)
+    client := w.getClient()
+    status, err := client.LoadSegments(ctx, req)
     if err = merr.CheckRPCCall(status, err); err != nil {
         log.Warn("failed to call LoadSegments via grpc worker",
             zap.Error(err),
@@ -77,7 +113,8 @@ func (w *remoteWorker) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) error {
     log := log.Ctx(ctx).With(
         zap.Int64("workerID", req.GetNodeID()),
     )
-    status, err := w.client.ReleaseSegments(ctx, req)
+    client := w.getClient()
+    status, err := client.ReleaseSegments(ctx, req)
     if err = merr.CheckRPCCall(status, err); err != nil {
         log.Warn("failed to call ReleaseSegments via grpc worker",
             zap.Error(err),
@@ -91,7 +128,8 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) error {
     log := log.Ctx(ctx).With(
         zap.Int64("workerID", req.GetBase().GetTargetID()),
     )
-    status, err := w.client.Delete(ctx, req)
+    client := w.getClient()
+    status, err := client.Delete(ctx, req)
     if err := merr.CheckRPCCall(status, err); err != nil {
         if errors.Is(err, merr.ErrServiceUnimplemented) {
             log.Warn("invoke legacy querynode Delete method, ignore error", zap.Error(err))
@@ -104,27 +142,30 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) error {
 }

 func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
-    ret, err := w.client.SearchSegments(ctx, req)
+    client := w.getClient()
+    ret, err := client.SearchSegments(ctx, req)
     if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
         // for compatible with rolling upgrade from version before v2.2.9
-        return w.client.Search(ctx, req)
+        return client.Search(ctx, req)
     }
     return ret, err
 }

 func (w *remoteWorker) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
-    ret, err := w.client.QuerySegments(ctx, req)
+    client := w.getClient()
+    ret, err := client.QuerySegments(ctx, req)
     if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
         // for compatible with rolling upgrade from version before v2.2.9
-        return w.client.Query(ctx, req)
+        return client.Query(ctx, req)
     }
     return ret, err
 }

 func (w *remoteWorker) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error {
-    client, err := w.client.QueryStreamSegments(ctx, req)
+    c := w.getClient()
+    client, err := c.QueryStreamSegments(ctx, req)
     if err != nil {
         return err
     }
@@ -155,7 +196,8 @@ func (w *remoteWorker) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error {
 }

 func (w *remoteWorker) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
-    return w.client.GetStatistics(ctx, req)
+    client := w.getClient()
+    return client.GetStatistics(ctx, req)
 }

 func (w *remoteWorker) IsHealthy() bool {
@@ -163,6 +205,11 @@ func (w *remoteWorker) IsHealthy() bool {
 }

 func (w *remoteWorker) Stop() {
+    if w.pooling {
+        for _, client := range w.clients {
+            client.Close()
+        }
+    }
     if err := w.client.Close(); err != nil {
         log.Warn("failed to call Close via grpc worker", zap.Error(err))
     }


@@ -337,12 +337,9 @@ func (node *QueryNode) Init() error {
             }
         }

-        client, err := grpcquerynodeclient.NewClient(node.ctx, addr, nodeID)
-        if err != nil {
-            return nil, err
-        }
-        return cluster.NewRemoteWorker(client), nil
+        return cluster.NewPoolingRemoteWorker(func() (types.QueryNodeClient, error) {
+            return grpcquerynodeclient.NewClient(node.ctx, addr, nodeID)
+        })
     })

     node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
     node.subscribingChannels = typeutil.NewConcurrentSet[string]()


@@ -1209,6 +1209,7 @@ type proxyConfig struct {
     GracefulStopTimeout    ParamItem `refreshable:"true"`
     SlowQuerySpanInSeconds ParamItem `refreshable:"true"`
+    QueryNodePoolingSize   ParamItem `refreshable:"false"`
 }

 func (p *proxyConfig) init(base *BaseTable) {

@@ -1611,6 +1612,15 @@ please adjust in embedded Milvus: false`,
         Export: true,
     }
     p.SlowQuerySpanInSeconds.Init(base.mgr)
+
+    p.QueryNodePoolingSize = ParamItem{
+        Key:          "proxy.queryNodePooling.size",
+        Version:      "2.4.7",
+        Doc:          "the size for shardleader(querynode) client pool",
+        DefaultValue: "10",
+        Export:       true,
+    }
+    p.QueryNodePoolingSize.Init(base.mgr)
 }

 // /////////////////////////////////////////////////////////////////////////////
@@ -2314,6 +2324,9 @@ type queryNodeConfig struct {
     UseStreamComputing             ParamItem `refreshable:"false"`
     QueryStreamBatchSize           ParamItem `refreshable:"false"`
     BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`
+
+    // worker
+    WorkerPoolingSize ParamItem `refreshable:"false"`
 }

 func (p *queryNodeConfig) init(base *BaseTable) {
@@ -2955,6 +2968,15 @@ user-task-polling:
         Export: true,
     }
     p.BloomFilterApplyParallelFactor.Init(base.mgr)
+
+    p.WorkerPoolingSize = ParamItem{
+        Key:          "queryNode.workerPooling.size",
+        Version:      "2.4.7",
+        Doc:          "the size for worker querynode client pool",
+        DefaultValue: "10",
+        Export:       true,
+    }
+    p.WorkerPoolingSize.Init(base.mgr)
 }

 // /////////////////////////////////////////////////////////////////////////////