diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index d37d9a7d85..dc9f328711 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "errors" "fmt" "strconv" "strings" @@ -17,7 +16,6 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -318,6 +316,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { func (t *queryTask) Execute(ctx context.Context) error { tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute query %d", t.ID())) defer tr.CtxElapse(ctx, "done") + log := log.Ctx(ctx) executeQuery := func(withCache bool) error { shards, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName) @@ -334,17 +333,17 @@ func (t *queryTask) Execute(ctx context.Context) error { } err := executeQuery(WithCache) - if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) { - log.Ctx(ctx).Warn("invalid shard leaders cache, updating shardleader caches and retry search", + if err != nil { + log.Warn("invalid shard leaders cache, updating shardleader caches and retry query", zap.Error(err)) - return executeQuery(WithoutCache) + err = executeQuery(WithoutCache) } if err != nil { - return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error()) + return fmt.Errorf("fail to query on all shard leaders, err=%s", err.Error()) } - log.Ctx(ctx).Debug("Query Execute done.", - zap.Any("requestType", "query")) + log.Debug("Query Execute done.", + zap.String("requestType", "query")) return nil } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 3ce9d70a9b..d2158128fa 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -387,6 +387,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error { func (t *searchTask) Execute(ctx context.Context) error { sp, ctx := trace.StartSpanFromContextWithOperationName(t.TraceCtx(), "Proxy-Search-Execute") defer sp.Finish() + log := log.Ctx(ctx) tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute search %d", t.ID())) defer tr.CtxElapse(ctx, "done") @@ -399,7 +400,7 @@ func (t *searchTask) Execute(ctx context.Context) error { t.resultBuf = make(chan *internalpb.SearchResults, len(shard2Leaders)) t.toReduceResults = make([]*internalpb.SearchResults, 0, len(shard2Leaders)) if err := t.searchShardPolicy(ctx, t.shardMgr, t.searchShard, shard2Leaders); err != nil { - log.Ctx(ctx).Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders))) + log.Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders))) return err } return nil @@ -407,7 +408,7 @@ func (t *searchTask) Execute(ctx context.Context) error { err := executeSearch(WithCache) if err != nil { - log.Ctx(ctx).Warn("first search failed, updating shardleader caches and retry search", + log.Warn("first search failed, updating shardleader caches and retry search", zap.Error(err)) err = executeSearch(WithoutCache) } @@ -415,7 +416,7 @@ func (t *searchTask) Execute(ctx context.Context) error { return fmt.Errorf("fail to search on all shard leaders, err=%v", err) } - log.Ctx(ctx).Debug("Search Execute done.") + log.Debug("Search Execute done.") return nil }