mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
69b7eeb7b7
commit
01e609daff
@ -2,7 +2,6 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -17,7 +16,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/grpcclient"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
@ -307,6 +305,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
||||
func (t *queryTask) Execute(ctx context.Context) error {
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute query %d", t.ID()))
|
||||
defer tr.CtxElapse(ctx, "done")
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
executeQuery := func(withCache bool) error {
|
||||
shards, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName)
|
||||
@ -323,17 +322,16 @@ func (t *queryTask) Execute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
err := executeQuery(WithCache)
|
||||
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
|
||||
log.Ctx(ctx).Warn("invalid shard leaders cache, updating shardleader caches and retry search",
|
||||
zap.Int64("msgID", t.ID()), zap.Error(err))
|
||||
return executeQuery(WithoutCache)
|
||||
if err != nil {
|
||||
log.Warn("invalid shard leaders cache, updating shardleader caches and retry query", zap.Error(err))
|
||||
err = executeQuery(WithoutCache)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error())
|
||||
return fmt.Errorf("fail to query on all shard leaders, err=%s", err.Error())
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Debug("Query Execute done.",
|
||||
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
|
||||
log.Debug("Query Execute done.",
|
||||
zap.Int64("msgID", t.ID()), zap.String("requestType", "query"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -21,7 +21,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/distance"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/grpcclient"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
@ -389,6 +388,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
||||
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute search %d", t.ID()))
|
||||
defer tr.CtxElapse(ctx, "done")
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
executeSearch := func(withCache bool) error {
|
||||
shard2Leaders, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName)
|
||||
@ -398,23 +398,22 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
||||
t.resultBuf = make(chan *internalpb.SearchResults, len(shard2Leaders))
|
||||
t.toReduceResults = make([]*internalpb.SearchResults, 0, len(shard2Leaders))
|
||||
if err := t.searchShardPolicy(ctx, t.shardMgr, t.searchShard, shard2Leaders); err != nil {
|
||||
log.Ctx(ctx).Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)))
|
||||
log.Warn("failed to do search", zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := executeSearch(WithCache)
|
||||
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
|
||||
log.Ctx(ctx).Warn("first search failed, updating shardleader caches and retry search",
|
||||
zap.Int64("msgID", t.ID()), zap.Error(err))
|
||||
return executeSearch(WithoutCache)
|
||||
if err != nil {
|
||||
log.Warn("first search failed, updating shardleader caches and retry search", zap.Error(err))
|
||||
err = executeSearch(WithoutCache)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to search on all shard leaders, err=%v", err)
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Debug("Search Execute done.", zap.Int64("msgID", t.ID()))
|
||||
log.Debug("Search Execute done")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user