From fbc7fe1cdc3a8cb3850eb5e3a6e414d07ef7b641 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 26 Apr 2022 11:27:53 +0800 Subject: [PATCH] Fix proxy unable to update cache (#16646) Signed-off-by: yangxuan --- internal/distributed/querynode/client/client.go | 4 ++-- internal/proxy/task_policies.go | 7 +++++-- internal/proxy/task_query.go | 2 +- internal/proxy/task_search.go | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 745ba33ab4..86f44975cd 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -244,7 +244,7 @@ func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmen // Search performs replica search tasks in QueryNode. func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) { + ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() } @@ -258,7 +258,7 @@ func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*inter // Query performs replica query tasks in QueryNode. func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) { + ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() } diff --git a/internal/proxy/task_policies.go b/internal/proxy/task_policies.go index cddd92750b..e8948edec5 100644 --- a/internal/proxy/task_policies.go +++ b/internal/proxy/task_policies.go @@ -3,7 +3,6 @@ package proxy import ( "context" "errors" - "fmt" qnClient "github.com/milvus-io/milvus/internal/distributed/querynode/client" @@ -75,7 +74,11 @@ func roundRobinPolicy(ctx context.Context, getQueryNodePolicy getQueryNodePolicy } if current == replicaNum && err != nil { - return fmt.Errorf("no shard leaders available for channel: %s, leaders: %v, err: %s", leaders.GetChannelName(), leaders.GetNodeIds(), err.Error()) + log.Warn("no shard leaders available for channel", + zap.String("channel name", leaders.GetChannelName()), + zap.Int64s("leaders", leaders.GetNodeIds()), zap.Error(err)) + // needs to return the error from query + return err } return nil } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 95f3138f41..584a6c475b 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -271,7 +271,7 @@ func (t *queryTask) Execute(ctx context.Context) error { return executeQuery(WithoutCache) } if err != nil { - return err + return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error()) } log.Info("Query Execute done.", diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index e3995cb005..034192e592 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -295,7 +295,7 @@ func (t *searchTask) Execute(ctx context.Context) error { return executeSearch(WithoutCache) } if err != nil { - return err + return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error()) } log.Info("Search Execute done.",