diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index ee79b68a01..c757563928 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "errors" "fmt" "strconv" "strings" @@ -17,7 +16,6 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -307,6 +305,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { func (t *queryTask) Execute(ctx context.Context) error { tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute query %d", t.ID())) defer tr.CtxElapse(ctx, "done") + log := log.Ctx(ctx) executeQuery := func(withCache bool) error { shards, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName) @@ -323,17 +322,16 @@ func (t *queryTask) Execute(ctx context.Context) error { } err := executeQuery(WithCache) - if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) { - log.Ctx(ctx).Warn("invalid shard leaders cache, updating shardleader caches and retry search", - zap.Int64("msgID", t.ID()), zap.Error(err)) - return executeQuery(WithoutCache) + if err != nil { + log.Warn("invalid shard leaders cache, updating shardleader caches and retry query", zap.Error(err)) + err = executeQuery(WithoutCache) } if err != nil { - return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error()) + return fmt.Errorf("fail to query on all shard leaders, err=%s", err.Error()) } - log.Ctx(ctx).Debug("Query Execute done.", - zap.Int64("msgID", t.ID()), zap.Any("requestType", "query")) + log.Debug("Query Execute done.", + zap.Int64("msgID", t.ID()), zap.String("requestType", "query")) return nil } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index e88dddd89d..247a8223c1 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -21,7 +21,6 @@ import ( "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/distance" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -389,6 +388,7 @@ func (t *searchTask) Execute(ctx context.Context) error { tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute search %d", t.ID())) defer tr.CtxElapse(ctx, "done") + log := log.Ctx(ctx) executeSearch := func(withCache bool) error { shard2Leaders, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName) @@ -398,23 +398,22 @@ func (t *searchTask) Execute(ctx context.Context) error { t.resultBuf = make(chan *internalpb.SearchResults, len(shard2Leaders)) t.toReduceResults = make([]*internalpb.SearchResults, 0, len(shard2Leaders)) if err := t.searchShardPolicy(ctx, t.shardMgr, t.searchShard, shard2Leaders); err != nil { - log.Ctx(ctx).Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders))) + log.Warn("failed to do search", zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)), zap.Error(err)) return err } return nil } err := executeSearch(WithCache) - if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) { - log.Ctx(ctx).Warn("first search failed, updating shardleader caches and retry search", - zap.Int64("msgID", t.ID()), zap.Error(err)) - return executeSearch(WithoutCache) + if err != nil { + log.Warn("first search failed, updating shardleader caches and retry search", zap.Error(err)) + err = executeSearch(WithoutCache) } if err != nil { return fmt.Errorf("fail to search on all shard leaders, err=%v", err) } - log.Ctx(ctx).Debug("Search Execute done.", zap.Int64("msgID", t.ID())) + log.Debug("Search Execute done") return nil }