From 3f063a29b009adb37cdf47b4b53cc5deb06a5fca Mon Sep 17 00:00:00 2001 From: liliu-z <105927039+liliu-z@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:59:14 +0800 Subject: [PATCH] feat: Support Search By PK (#45820) issue: #39157 Overview: Support search by PK by resolving IDs to vectors on Proxy side. Upgrade go-api to adapt to new proto definitions. Design: - Upgrade milvus-proto/go-api to latest master. - Implement handleIfSearchByPK in Proxy: resolve IDs to vectors via internal Query, then rewrite SearchRequest. - Adapt to 'SearchInput' oneof field in SearchRequest across client and handlers. - Fix binary vector stride calculation bug in placeholder utils. Compatibility: - Old Pymilvus can still work w/o this feature What is included: - Dense and Sparse - Multi vector fields - Rejection on BM25 What is **not** include: - Hybrid Search - EmbeddingList - Restful API Signed-off-by: Li Liu --- client/go.mod | 2 +- client/go.sum | 4 +- client/milvusclient/read_options.go | 5 +- go.mod | 2 +- go.sum | 4 +- .../distributed/proxy/httpserver/handler.go | 8 +- .../proxy/httpserver/handler_v1.go | 10 +- .../proxy/httpserver/handler_v2.go | 22 +- internal/proxy/impl.go | 210 +++++++++++++++--- internal/proxy/proxy_test.go | 99 ++++----- internal/proxy/task_search.go | 4 +- internal/proxy/task_search_test.go | 8 +- internal/proxy/task_test.go | 18 +- pkg/go.mod | 2 +- pkg/go.sum | 4 +- pkg/util/funcutil/placeholdergroup.go | 10 +- pkg/util/funcutil/placeholdergroup_test.go | 6 +- tests/go_client/go.mod | 2 +- tests/go_client/go.sum | 4 +- tests/integration/hellomilvus/sparse_test.go | 4 +- tests/integration/util_query.go | 36 +-- 21 files changed, 307 insertions(+), 157 deletions(-) diff --git a/client/go.mod b/client/go.mod index ebdfe10792..0bec47437e 100644 --- a/client/go.mod +++ b/client/go.mod @@ -6,7 +6,7 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/cockroachdb/errors v1.9.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 github.com/quasilyte/go-ruleguard/dsl v0.3.23 github.com/samber/lo v1.27.0 diff --git a/client/go.sum b/client/go.sum index 82e599f179..8122feb488 100644 --- a/client/go.sum +++ b/client/go.sum @@ -330,8 +330,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU= github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/client/milvusclient/read_options.go b/client/milvusclient/read_options.go index d99b97216f..c19ba744da 100644 --- a/client/milvusclient/read_options.go +++ b/client/milvusclient/read_options.go @@ -102,10 +102,13 @@ func (r *AnnRequest) searchRequest() (*milvuspb.SearchRequest, error) { var err error // placeholder group - request.PlaceholderGroup, err = vector2PlaceholderGroupBytes(r.vectors) + placeHolderGroupBytes, err := vector2PlaceholderGroupBytes(r.vectors) if err != nil { return nil, err } + request.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: placeHolderGroupBytes, + } params := map[string]string{ spAnnsField: r.annField, diff --git a/go.mod b/go.mod index e057e9b06a..828be72a15 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.18.0 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect diff --git a/go.sum b/go.sum index 59d5d3f611..3312637813 100644 --- a/go.sum +++ b/go.sum @@ -799,8 +799,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e h1:NZx+z8JDXrLf/y+bIs360ypdPDM9AZTkkA0Fi1v3MWc= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/internal/distributed/proxy/httpserver/handler.go b/internal/distributed/proxy/httpserver/handler.go index 7c3c03b46c..089c9e8c1f 100644 --- a/internal/distributed/proxy/httpserver/handler.go +++ b/internal/distributed/proxy/httpserver/handler.go @@ -374,9 +374,13 @@ func (h *Handlers) handleSearch(c *gin.Context) (interface{}, error) { Nq: wrappedReq.Nq, } if len(wrappedReq.BinaryVectors) > 0 { - req.PlaceholderGroup = binaryVector2Bytes(wrappedReq.BinaryVectors) + req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: binaryVector2Bytes(wrappedReq.BinaryVectors), + } } else { - req.PlaceholderGroup = vector2Bytes(wrappedReq.Vectors) + req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: vector2Bytes(wrappedReq.Vectors), + } } return h.proxy.Search(c, &req) } diff --git a/internal/distributed/proxy/httpserver/handler_v1.go b/internal/distributed/proxy/httpserver/handler_v1.go index d830e66c2f..ea7a383c95 100644 --- a/internal/distributed/proxy/httpserver/handler_v1.go +++ b/internal/distributed/proxy/httpserver/handler_v1.go @@ -942,10 +942,12 @@ func (h *HandlersV1) search(c *gin.Context) { return } req := &milvuspb.SearchRequest{ - DbName: httpReq.DbName, - CollectionName: httpReq.CollectionName, - Dsl: httpReq.Filter, - PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}), + DbName: httpReq.DbName, + CollectionName: httpReq.CollectionName, + Dsl: httpReq.Filter, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}), + }, DslType: commonpb.DslType_BoolExprV1, OutputFields: httpReq.OutputFields, GuaranteeTimestamp: BoundedTimestamp, diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index d428d4bd60..ea6d7ded15 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -1389,7 +1389,9 @@ func (h *HandlersV2) search(ctx context.Context, c *gin.Context, anyReq any, dbN return nil, err } req.SearchParams = searchParams - req.PlaceholderGroup = placeholderGroup + req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: placeholderGroup, + } req.ExprTemplateValues = generateExpressionTemplate(httpReq.ExprParams) resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Search", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) { return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest)) @@ -1507,14 +1509,16 @@ func (h *HandlersV2) advancedSearch(ctx context.Context, c *gin.Context, anyReq return nil, err } searchReq := &milvuspb.SearchRequest{ - DbName: dbName, - CollectionName: httpReq.CollectionName, - Dsl: subReq.Filter, - PlaceholderGroup: placeholderGroup, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: httpReq.OutputFields, - PartitionNames: httpReq.PartitionNames, - SearchParams: searchParams, + DbName: dbName, + CollectionName: httpReq.CollectionName, + Dsl: subReq.Filter, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: placeholderGroup, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: httpReq.OutputFields, + PartitionNames: httpReq.PartitionNames, + SearchParams: searchParams, } searchReq.ExprTemplateValues = generateExpressionTemplate(subReq.ExprParams) req.Requests = append(req.Requests, searchReq) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index e02e922f17..7823c70019 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/errors" "github.com/gin-gonic/gin" + "github.com/samber/lo" "github.com/tidwall/gjson" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -42,6 +43,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/http" + "github.com/milvus-io/milvus/internal/parser/planparserv2" "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/internal/proxy/privilege" "github.com/milvus-io/milvus/internal/proxy/replicate" @@ -2928,15 +2930,11 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest, ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search") defer sp.End() - if request.SearchByPrimaryKeys { - placeholderGroupBytes, err := node.getVectorPlaceholderGroupForSearchByPks(ctx, request) - if err != nil { - return &milvuspb.SearchResults{ - Status: merr.Status(err), - }, false, false, false, nil - } - - request.PlaceholderGroup = placeholderGroupBytes + // Handle search by primary keys: transform IDs to vectors + if err := node.handleIfSearchByPK(ctx, request); err != nil { + return &milvuspb.SearchResults{ + Status: merr.Status(err), + }, false, false, false, nil } qt := &searchTask{ @@ -2967,7 +2965,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest, zap.String("collection", request.CollectionName), zap.Strings("partitions", request.PartitionNames), zap.String("dsl", request.Dsl), - zap.Int("len(PlaceholderGroup)", len(request.PlaceholderGroup)), + zap.Int("len(PlaceholderGroup)", len(request.GetPlaceholderGroup())), zap.Strings("OutputFields", request.OutputFields), zap.Any("search_params", request.SearchParams), zap.String("ConsistencyLevel", request.GetConsistencyLevel().String()), @@ -3324,59 +3322,197 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil } -func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) { - placeholderGroup := &commonpb.PlaceholderGroup{} - err := proto.Unmarshal(request.PlaceholderGroup, placeholderGroup) +// validateIDsType validates that the IDs type matches the primary key field type +func validateIDsType(pkField *schemapb.FieldSchema, ids *schemapb.IDs) error { + if ids == nil { + return nil + } + + pkType := pkField.GetDataType() + switch pkType { + case schemapb.DataType_Int64: + if ids.GetIntId() == nil { + return merr.WrapErrParameterInvalid("int64 IDs", "got other type", + "primary key is int64, but IDs type mismatch") + } + case schemapb.DataType_VarChar: + if ids.GetStrId() == nil { + return merr.WrapErrParameterInvalid("string IDs", "got other type", + "primary key is varchar, but IDs type mismatch") + } + default: + return merr.WrapErrParameterInvalid("int64 or varchar", pkType.String(), + fmt.Sprintf("unsupported primary key type: %s", pkType.String())) + } + + return nil +} + +// After this function, the request will have PlaceholderGroup set, ready for normal search pipeline. +// If the request is not search-by-IDs, this function does nothing. +// +// Returns error if the transformation fails. +func (node *Proxy) handleIfSearchByPK(ctx context.Context, request *milvuspb.SearchRequest) error { + // Check if this is a search by PK request + ids := request.GetIds() + if ids == nil || typeutil.GetSizeOfIDs(ids) == 0 { + return nil // Not search by PK, do nothing + } + + // Get collection schema for validation and plan building + collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, + request.GetDbName(), request.GetCollectionName(), 0) if err != nil { - return nil, err + return err } - if len(placeholderGroup.Placeholders) != 1 || len(placeholderGroup.Placeholders[0].Values) != 1 { - return nil, merr.WrapErrParameterInvalidMsg("please provide primary key") + // Validate that anns_field is provided + annsFieldName, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, request.SearchParams) + if err != nil || annsFieldName == "" { + return merr.WrapErrParameterInvalid("valid anns_field in search_params", "missing", + "anns_field is required for search by IDs") } - queryExpr := string(placeholderGroup.Placeholders[0].Values[0]) - annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, request.SearchParams) + annField := typeutil.GetFieldByName(collectionInfo.schema.CollectionSchema, annsFieldName) + if annField == nil { + return merr.WrapErrFieldNotFound(annsFieldName, "vector field not found in schema") + } + + if annField.GetDataType() == schemapb.DataType_ArrayOfVector { + return merr.WrapErrParameterInvalidMsg("array of vector is not supported for search by IDs") + } + + if !typeutil.IsVectorType(annField.GetDataType()) { + return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("field (%s) to search is not of vector data type", annsFieldName)) + } + + // Get primary key field + pkField, err := collectionInfo.schema.GetPkField() if err != nil { - return nil, err + return err } - queryRequest := &milvuspb.QueryRequest{ + // Validate IDs type matches primary key type + if err := validateIDsType(pkField, ids); err != nil { + return err + } + + // Create requery plan using IDs (no expr parsing overhead) + plan := planparserv2.CreateRequeryPlan(pkField, ids) + + // Build query request to fetch vectors by IDs + queryReq := &milvuspb.QueryRequest{ Base: request.Base, DbName: request.DbName, CollectionName: request.CollectionName, - Expr: queryExpr, - OutputFields: []string{annsField}, + OutputFields: []string{pkField.GetName(), annsFieldName}, // Only need the vector field PartitionNames: request.PartitionNames, TravelTimestamp: request.TravelTimestamp, GuaranteeTimestamp: request.GuaranteeTimestamp, - QueryParams: nil, - NotReturnAllMeta: request.NotReturnAllMeta, ConsistencyLevel: request.ConsistencyLevel, UseDefaultConsistency: request.UseDefaultConsistency, } - queryResults, _ := node.Query(ctx, queryRequest) - - err = merr.Error(queryResults.GetStatus()) - if err != nil { - return nil, err + // Create queryTask to execute the retrieval + qt := &queryTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + RetrieveRequest: &internalpb.RetrieveRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_Retrieve), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + ReqID: paramtable.GetNodeID(), + ConsistencyLevel: request.ConsistencyLevel, + }, + request: queryReq, + plan: plan, + mixCoord: node.mixCoord, + lb: node.lbPolicy, + shardclientMgr: node.shardMgr, + mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(), + // reQuery defaults to false - we need full query processing: + // partition conversion, struct field reconstruction, timestamp handling etc } - var vectorFieldsData *schemapb.FieldData - for _, fieldsData := range queryResults.GetFieldsData() { - if fieldsData.GetFieldName() == annsField { - vectorFieldsData = fieldsData - break + // Execute query + queryResult, _, err := node.query(ctx, qt, nil) + if err != nil { + return err + } + + if !merr.Ok(queryResult.GetStatus()) { + return merr.Error(queryResult.GetStatus()) + } + + // Extract primary key field to check result count + pkFieldData := lo.FindOrElse(queryResult.GetFieldsData(), nil, func(f *schemapb.FieldData) bool { + return f.GetFieldName() == pkField.GetName() + }) + + if pkFieldData == nil { + return merr.WrapErrFieldNotFound(pkField.GetName(), "primary key field not found in query result") + } + + // Check if the returned pk count matches the input IDs count + inputIDsCount := typeutil.GetSizeOfIDs(ids) + returnedPKCount := typeutil.GetPKSize(pkFieldData) + if returnedPKCount != inputIDsCount { + // Find which IDs are missing + returnedPKSet := make(map[interface{}]struct{}) + switch pkFieldData.GetType() { + case schemapb.DataType_Int64: + for _, pk := range pkFieldData.GetScalars().GetLongData().GetData() { + returnedPKSet[pk] = struct{}{} + } + case schemapb.DataType_VarChar: + for _, pk := range pkFieldData.GetScalars().GetStringData().GetData() { + returnedPKSet[pk] = struct{}{} + } } + + var missingIDs []interface{} + switch ids.GetIdField().(type) { + case *schemapb.IDs_IntId: + for _, id := range ids.GetIntId().GetData() { + if _, exists := returnedPKSet[id]; !exists { + missingIDs = append(missingIDs, id) + } + } + case *schemapb.IDs_StrId: + for _, id := range ids.GetStrId().GetData() { + if _, exists := returnedPKSet[id]; !exists { + missingIDs = append(missingIDs, id) + } + } + } + + return merr.WrapErrParameterInvalidMsg( + fmt.Sprintf("some of the provided primary key IDs do not exist: missing IDs = %v", missingIDs)) } - placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(vectorFieldsData) + // Extract vector field from query result + vectorFieldData := lo.FindOrElse(queryResult.GetFieldsData(), nil, func(f *schemapb.FieldData) bool { + return f.GetFieldName() == annsFieldName || f.GetType() == schemapb.DataType_ArrayOfStruct + }) + + if vectorFieldData == nil { + return merr.WrapErrFieldNotFound(annsFieldName, "vector field not found in query result") + } + + // Convert to PlaceholderGroup + placeholderBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(vectorFieldData) if err != nil { - return nil, err + return err } - return placeholderGroupBytes, nil + // Transform request: replace IDs with PlaceholderGroup + // Now the request is ready for normal search pipeline + request.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: placeholderBytes, + } + + return nil } // Flush notify data nodes to persist the data of collection. diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 79ae46a061..46389bb0a0 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -629,18 +629,19 @@ func constructTestSearchRequest(dbName, collectionName, floatVecField, expr stri } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: expr, - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: nil, - SearchParams: searchParams, - TravelTimestamp: 0, - GuaranteeTimestamp: 0, - SearchByPrimaryKeys: false, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: expr, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: nil, + SearchParams: searchParams, + TravelTimestamp: 0, + GuaranteeTimestamp: 0, } } @@ -725,42 +726,23 @@ func constructTestEmbeddingListSearchRequest(dbName, collectionName, structFVec, } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: expr, - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: nil, - SearchParams: searchParams, - TravelTimestamp: 0, - GuaranteeTimestamp: 0, - SearchByPrimaryKeys: false, - } -} - -// Helper functions for TestProxy -func constructPrimaryKeysPlaceholderGroup(int64Field string, insertedIDs []int64) *commonpb.PlaceholderGroup { - expr := fmt.Sprintf("%v in [%v]", int64Field, insertedIDs[0]) - exprBytes := []byte(expr) - - return &commonpb.PlaceholderGroup{ - Placeholders: []*commonpb.PlaceholderValue{ - { - Tag: "$0", - Type: commonpb.PlaceholderType_None, - Values: [][]byte{exprBytes}, - }, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: expr, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: nil, + SearchParams: searchParams, + TravelTimestamp: 0, + GuaranteeTimestamp: 0, } } func constructSearchByPksRequest(t *testing.T, dbName, collectionName, floatVecField, int64Field string, insertedIDs []int64, nprobe, topk, roundDecimal int) *milvuspb.SearchRequest { - plg := constructPrimaryKeysPlaceholderGroup(int64Field, insertedIDs) - plgBs, err := proto.Marshal(plg) - assert.NoError(t, err) - params := make(map[string]string) params["nprobe"] = strconv.Itoa(nprobe) b, err := json.Marshal(params) @@ -774,18 +756,25 @@ func constructSearchByPksRequest(t *testing.T, dbName, collectionName, floatVecF } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: "", - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: nil, - SearchParams: searchParams, - TravelTimestamp: 0, - GuaranteeTimestamp: 0, - SearchByPrimaryKeys: true, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: "", + SearchInput: &milvuspb.SearchRequest_Ids{ + Ids: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: insertedIDs, + }, + }, + }, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: nil, + SearchParams: searchParams, + TravelTimestamp: 0, + GuaranteeTimestamp: 0, } } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 4d2332733e..8850a50cbf 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -705,9 +705,9 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { return err } if typeutil.IsFieldSparseFloatVector(t.schema.CollectionSchema, t.SearchRequest.FieldId) { - metrics.ProxySearchSparseNumNonZeros.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.collectionName, metrics.SearchLabel, strconv.FormatInt(t.SearchRequest.FieldId, 10)).Observe(float64(typeutil.EstimateSparseVectorNNZFromPlaceholderGroup(t.request.PlaceholderGroup, int(t.request.GetNq())))) + metrics.ProxySearchSparseNumNonZeros.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.collectionName, metrics.SearchLabel, strconv.FormatInt(t.SearchRequest.FieldId, 10)).Observe(float64(typeutil.EstimateSparseVectorNNZFromPlaceholderGroup(t.request.GetPlaceholderGroup(), int(t.request.GetNq())))) } - t.SearchRequest.PlaceholderGroup = t.request.PlaceholderGroup + t.SearchRequest.PlaceholderGroup = t.request.GetPlaceholderGroup() t.SearchRequest.Topk = queryInfo.GetTopk() t.SearchRequest.MetricType = queryInfo.GetMetricType() t.queryInfos = append(t.queryInfos, queryInfo) diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 9f6579baa0..a811b30c5d 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -1082,7 +1082,9 @@ func TestSearchTask_WithFunctions(t *testing.T) { {Key: AnnsFieldKey, Value: "vector1"}, {Key: TopKKey, Value: "10"}, }, - PlaceholderGroup: holderByte, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: holderByte, + }, }, mixCoord: qc, tr: timerecord.NewTimeRecorder("test-search"), @@ -4857,7 +4859,9 @@ func TestSearchTask_InitSearchRequestWithStructArrayFields(t *testing.T) { {Key: common.MetricTypeKey, Value: metric.L2}, {Key: ParamsKey, Value: `{"nprobe": 10}`}, }, - PlaceholderGroup: nil, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: nil, + }, ConsistencyLevel: commonpb.ConsistencyLevel_Session, }, schema: schemaInfo, diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 188acc28aa..0452e0c178 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -561,14 +561,16 @@ func constructSearchRequest( } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: expr, - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: nil, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: expr, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: nil, SearchParams: []*commonpb.KeyValuePair{ { Key: common.MetricTypeKey, diff --git a/pkg/go.mod b/pkg/go.mod index 40c805165c..02a4ea2df9 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -22,7 +22,7 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 github.com/klauspost/compress v1.18.0 - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 github.com/prometheus/client_golang v1.20.5 diff --git a/pkg/go.sum b/pkg/go.sum index 887ccd93e6..21113c0eae 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -621,8 +621,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= diff --git a/pkg/util/funcutil/placeholdergroup.go b/pkg/util/funcutil/placeholdergroup.go index 17d1a83017..ac39e498b8 100644 --- a/pkg/util/funcutil/placeholdergroup.go +++ b/pkg/util/funcutil/placeholdergroup.go @@ -96,7 +96,7 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place placeholderValue := &commonpb.PlaceholderValue{ Tag: "$0", Type: commonpb.PlaceholderType_BinaryVector, - Values: flattenedByteVectorsToByteVectors(x.BinaryVector, int(vectors.Dim)), + Values: flattenedBinaryVectorsToByteVectors(x.BinaryVector, int(vectors.Dim)), } return placeholderValue, nil case schemapb.DataType_Float16Vector: @@ -120,7 +120,7 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place placeholderValue := &commonpb.PlaceholderValue{ Tag: "$0", Type: commonpb.PlaceholderType_BFloat16Vector, - Values: flattenedFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)), + Values: flattenedBFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)), } return placeholderValue, nil case schemapb.DataType_SparseFloatVector: @@ -188,10 +188,10 @@ func floatVectorToByteVector(vector []float32) []byte { return data } -func flattenedByteVectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte { +func flattenedBinaryVectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte { result := make([][]byte, 0) - for i := 0; i < len(flattenedVectors); i += dimension { - result = append(result, flattenedVectors[i:i+dimension]) + for i := 0; i < len(flattenedVectors); i += dimension / 8 { + result = append(result, flattenedVectors[i:i+dimension/8]) } return result } diff --git a/pkg/util/funcutil/placeholdergroup_test.go b/pkg/util/funcutil/placeholdergroup_test.go index 83de1bd38a..d83625d596 100644 --- a/pkg/util/funcutil/placeholdergroup_test.go +++ b/pkg/util/funcutil/placeholdergroup_test.go @@ -6,11 +6,11 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_flattenedByteVectorsToByteVectors(t *testing.T) { +func Test_flattenedBinaryVectorsToByteVectors(t *testing.T) { flattenedVectors := []byte{0, 1, 2, 3, 4, 5} - dimension := 3 + dimension := 24 - actual := flattenedByteVectorsToByteVectors(flattenedVectors, dimension) + actual := flattenedBinaryVectorsToByteVectors(flattenedVectors, dimension) expected := [][]byte{ {0, 1, 2}, {3, 4, 5}, diff --git a/tests/go_client/go.mod b/tests/go_client/go.mod index 60c38e58b9..58315c89ae 100644 --- a/tests/go_client/go.mod +++ b/tests/go_client/go.mod @@ -54,7 +54,7 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 // indirect + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/tests/go_client/go.sum b/tests/go_client/go.sum index f5a1a02020..cd9cdaad3d 100644 --- a/tests/go_client/go.sum +++ b/tests/go_client/go.sum @@ -332,8 +332,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds= -github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU= github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/tests/integration/hellomilvus/sparse_test.go b/tests/integration/hellomilvus/sparse_test.go index f398815227..781ee16739 100644 --- a/tests/integration/hellomilvus/sparse_test.go +++ b/tests/integration/hellomilvus/sparse_test.go @@ -483,7 +483,9 @@ func (s *HelloMilvusSuite) TestSparse_invalid_search_request() { if err != nil { panic(err) } - searchReq.PlaceholderGroup = plgBs + searchReq.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, + } } sparseVecs := integration.GenerateSparseFloatArray(nq) diff --git a/tests/integration/util_query.go b/tests/integration/util_query.go index fe1445558c..82527c6c66 100644 --- a/tests/integration/util_query.go +++ b/tests/integration/util_query.go @@ -212,14 +212,16 @@ func constructSearchRequest( } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: expr, - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: outputFields, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: expr, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: outputFields, SearchParams: []*commonpb.KeyValuePair{ { Key: common.MetricTypeKey, @@ -271,14 +273,16 @@ func ConstructSearchRequestWithConsistencyLevel( } return &milvuspb.SearchRequest{ - Base: nil, - DbName: dbName, - CollectionName: collectionName, - PartitionNames: nil, - Dsl: expr, - PlaceholderGroup: plgBs, - DslType: commonpb.DslType_BoolExprV1, - OutputFields: outputFields, + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: nil, + Dsl: expr, + SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{ + PlaceholderGroup: plgBs, + }, + DslType: commonpb.DslType_BoolExprV1, + OutputFields: outputFields, SearchParams: []*commonpb.KeyValuePair{ { Key: common.MetricTypeKey,