diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 006ef87b6c..f216078e87 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -56,6 +56,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -2508,8 +2509,26 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) return it.result, nil } -// Search search the most similar records of requests. +// Search searches the most similar records of requests. func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { + var err error + rsp := &milvuspb.SearchResults{ + Status: merr.Success(), + } + err2 := retry.Handle(ctx, func() (bool, error) { + rsp, err = node.search(ctx, request) + if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) { + return true, merr.Error(rsp.GetStatus()) + } + return false, nil + }) + if err2 != nil { + rsp.Status = merr.Status(err2) + } + return rsp, err +} + +func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { receiveSize := proto.Size(request) metrics.ProxyReceiveBytes.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index e6c8765d0a..66e4df64b0 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -663,8 +663,8 @@ func (t *searchTask) Requery() error { for i := 0; i < typeutil.GetSizeOfIDs(ids); i++ { id := typeutil.GetPK(ids, int64(i)) if _, ok := offsets[id]; !ok { - return fmt.Errorf("incomplete query result, missing id %s, len(searchIDs) = %d, len(queryIDs) = %d, collection=%d", - id, typeutil.GetSizeOfIDs(ids), len(offsets), t.GetCollectionID()) + return merr.WrapErrInconsistentRequery(fmt.Sprintf("incomplete query result, missing id %s, len(searchIDs) = %d, len(queryIDs) = %d, collection=%d", + id, typeutil.GetSizeOfIDs(ids), len(offsets), t.GetCollectionID())) } typeutil.AppendFieldData(t.result.Results.FieldsData, queryResult.GetFieldsData(), int64(offsets[id])) } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 4ac7809373..dc81e9f6b2 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -153,6 +153,9 @@ var ( // import ErrImportFailed = newMilvusError("importing data failed", 2100, false) + + // Search/Query related + ErrInconsistentRequery = newMilvusError("inconsistent requery result", 2200, true) ) type milvusError struct { diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index b9f862356d..53499b4520 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -145,6 +145,9 @@ func (s *ErrSuite) TestWrap() { // alias related s.ErrorIs(WrapErrAliasNotFound("alias", "failed to get collection id"), ErrAliasNotFound) s.ErrorIs(WrapErrCollectionIDOfAliasNotFound(1000, "failed to get collection id"), ErrCollectionIDOfAliasNotFound) + + // Search/Query related + s.ErrorIs(WrapErrInconsistentRequery("unknown"), ErrInconsistentRequery) } func (s *ErrSuite) TestOldCode() { diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index ebc66e21ce..7d740bf90f 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -941,3 +941,11 @@ func WrapErrImportFailed(msg ...string) error { } return err } + +func WrapErrInconsistentRequery(msg ...string) error { + err := error(ErrInconsistentRequery) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +}