enhance: [cherry-pick] Retry on incomplete query result (#35061)

This PR cherry-picks the following PRs:

1. Return specific error codes when encountering incomplete requery
results error. https://github.com/milvus-io/milvus/pull/31343
2. Retry on incomplete requery result in proxy.
https://github.com/milvus-io/milvus/pull/31713

issue: https://github.com/milvus-io/milvus/issues/34820

pr: https://github.com/milvus-io/milvus/pull/31343,
https://github.com/milvus-io/milvus/pull/31713

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-08-05 15:22:16 +08:00 committed by GitHub
parent ff7c1a79ee
commit 20dca130c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 36 additions and 3 deletions

View File

@ -56,6 +56,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -2508,8 +2509,26 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
return it.result, nil
}
// Search search the most similar records of requests.
// Search searches the most similar records of requests.
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
var err error
rsp := &milvuspb.SearchResults{
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.search(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
return false, nil
})
if err2 != nil {
rsp.Status = merr.Status(err2)
}
return rsp, err
}
func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),

View File

@ -663,8 +663,8 @@ func (t *searchTask) Requery() error {
for i := 0; i < typeutil.GetSizeOfIDs(ids); i++ {
id := typeutil.GetPK(ids, int64(i))
if _, ok := offsets[id]; !ok {
return fmt.Errorf("incomplete query result, missing id %s, len(searchIDs) = %d, len(queryIDs) = %d, collection=%d",
id, typeutil.GetSizeOfIDs(ids), len(offsets), t.GetCollectionID())
return merr.WrapErrInconsistentRequery(fmt.Sprintf("incomplete query result, missing id %s, len(searchIDs) = %d, len(queryIDs) = %d, collection=%d",
id, typeutil.GetSizeOfIDs(ids), len(offsets), t.GetCollectionID()))
}
typeutil.AppendFieldData(t.result.Results.FieldsData, queryResult.GetFieldsData(), int64(offsets[id]))
}

View File

@ -153,6 +153,9 @@ var (
// import
ErrImportFailed = newMilvusError("importing data failed", 2100, false)
// Search/Query related
ErrInconsistentRequery = newMilvusError("inconsistent requery result", 2200, true)
)
type milvusError struct {

View File

@ -145,6 +145,9 @@ func (s *ErrSuite) TestWrap() {
// alias related
s.ErrorIs(WrapErrAliasNotFound("alias", "failed to get collection id"), ErrAliasNotFound)
s.ErrorIs(WrapErrCollectionIDOfAliasNotFound(1000, "failed to get collection id"), ErrCollectionIDOfAliasNotFound)
// Search/Query related
s.ErrorIs(WrapErrInconsistentRequery("unknown"), ErrInconsistentRequery)
}
func (s *ErrSuite) TestOldCode() {

View File

@ -941,3 +941,11 @@ func WrapErrImportFailed(msg ...string) error {
}
return err
}
func WrapErrInconsistentRequery(msg ...string) error {
err := error(ErrInconsistentRequery)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}