mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 14:35:27 +08:00
feat: Support Search By PK (#45820)
issue: #39157 Overview: Support search by PK by resolving IDs to vectors on Proxy side. Upgrade go-api to adapt to new proto definitions. Design: - Upgrade milvus-proto/go-api to latest master. - Implement handleIfSearchByPK in Proxy: resolve IDs to vectors via internal Query, then rewrite SearchRequest. - Adapt to 'SearchInput' oneof field in SearchRequest across client and handlers. - Fix binary vector stride calculation bug in placeholder utils. Compatibility: - Old Pymilvus can still work w/o this feature What is included: - Dense and Sparse - Multi vector fields - Rejection on BM25 What is **not** include: - Hybrid Search - EmbeddingList - Restful API Signed-off-by: Li Liu <li.liu@zilliz.com>
This commit is contained in:
parent
b5e11f810d
commit
3f063a29b0
@ -6,7 +6,7 @@ require (
|
||||
github.com/blang/semver/v4 v4.0.0
|
||||
github.com/cockroachdb/errors v1.9.1
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45
|
||||
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256
|
||||
github.com/quasilyte/go-ruleguard/dsl v0.3.23
|
||||
github.com/samber/lo v1.27.0
|
||||
|
||||
@ -330,8 +330,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
|
||||
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
|
||||
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU=
|
||||
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
|
||||
@ -102,10 +102,13 @@ func (r *AnnRequest) searchRequest() (*milvuspb.SearchRequest, error) {
|
||||
|
||||
var err error
|
||||
// placeholder group
|
||||
request.PlaceholderGroup, err = vector2PlaceholderGroupBytes(r.vectors)
|
||||
placeHolderGroupBytes, err := vector2PlaceholderGroupBytes(r.vectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: placeHolderGroupBytes,
|
||||
}
|
||||
|
||||
params := map[string]string{
|
||||
spAnnsField: r.annField,
|
||||
|
||||
2
go.mod
2
go.mod
@ -21,7 +21,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/klauspost/compress v1.18.0
|
||||
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45
|
||||
github.com/minio/minio-go/v7 v7.0.73
|
||||
github.com/panjf2000/ants/v2 v2.11.3 // indirect
|
||||
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@ -799,8 +799,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
|
||||
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e h1:NZx+z8JDXrLf/y+bIs360ypdPDM9AZTkkA0Fi1v3MWc=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119072500-4bd276fe335e/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
|
||||
|
||||
@ -374,9 +374,13 @@ func (h *Handlers) handleSearch(c *gin.Context) (interface{}, error) {
|
||||
Nq: wrappedReq.Nq,
|
||||
}
|
||||
if len(wrappedReq.BinaryVectors) > 0 {
|
||||
req.PlaceholderGroup = binaryVector2Bytes(wrappedReq.BinaryVectors)
|
||||
req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: binaryVector2Bytes(wrappedReq.BinaryVectors),
|
||||
}
|
||||
} else {
|
||||
req.PlaceholderGroup = vector2Bytes(wrappedReq.Vectors)
|
||||
req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: vector2Bytes(wrappedReq.Vectors),
|
||||
}
|
||||
}
|
||||
return h.proxy.Search(c, &req)
|
||||
}
|
||||
|
||||
@ -942,10 +942,12 @@ func (h *HandlersV1) search(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
req := &milvuspb.SearchRequest{
|
||||
DbName: httpReq.DbName,
|
||||
CollectionName: httpReq.CollectionName,
|
||||
Dsl: httpReq.Filter,
|
||||
PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
|
||||
DbName: httpReq.DbName,
|
||||
CollectionName: httpReq.CollectionName,
|
||||
Dsl: httpReq.Filter,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: httpReq.OutputFields,
|
||||
GuaranteeTimestamp: BoundedTimestamp,
|
||||
|
||||
@ -1389,7 +1389,9 @@ func (h *HandlersV2) search(ctx context.Context, c *gin.Context, anyReq any, dbN
|
||||
return nil, err
|
||||
}
|
||||
req.SearchParams = searchParams
|
||||
req.PlaceholderGroup = placeholderGroup
|
||||
req.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: placeholderGroup,
|
||||
}
|
||||
req.ExprTemplateValues = generateExpressionTemplate(httpReq.ExprParams)
|
||||
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Search", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
|
||||
return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest))
|
||||
@ -1507,14 +1509,16 @@ func (h *HandlersV2) advancedSearch(ctx context.Context, c *gin.Context, anyReq
|
||||
return nil, err
|
||||
}
|
||||
searchReq := &milvuspb.SearchRequest{
|
||||
DbName: dbName,
|
||||
CollectionName: httpReq.CollectionName,
|
||||
Dsl: subReq.Filter,
|
||||
PlaceholderGroup: placeholderGroup,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: httpReq.OutputFields,
|
||||
PartitionNames: httpReq.PartitionNames,
|
||||
SearchParams: searchParams,
|
||||
DbName: dbName,
|
||||
CollectionName: httpReq.CollectionName,
|
||||
Dsl: subReq.Filter,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: placeholderGroup,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: httpReq.OutputFields,
|
||||
PartitionNames: httpReq.PartitionNames,
|
||||
SearchParams: searchParams,
|
||||
}
|
||||
searchReq.ExprTemplateValues = generateExpressionTemplate(subReq.ExprParams)
|
||||
req.Requests = append(req.Requests, searchReq)
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/samber/lo"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@ -42,6 +43,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/distributed/streaming"
|
||||
"github.com/milvus-io/milvus/internal/http"
|
||||
"github.com/milvus-io/milvus/internal/parser/planparserv2"
|
||||
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||
"github.com/milvus-io/milvus/internal/proxy/privilege"
|
||||
"github.com/milvus-io/milvus/internal/proxy/replicate"
|
||||
@ -2928,15 +2930,11 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest,
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search")
|
||||
defer sp.End()
|
||||
|
||||
if request.SearchByPrimaryKeys {
|
||||
placeholderGroupBytes, err := node.getVectorPlaceholderGroupForSearchByPks(ctx, request)
|
||||
if err != nil {
|
||||
return &milvuspb.SearchResults{
|
||||
Status: merr.Status(err),
|
||||
}, false, false, false, nil
|
||||
}
|
||||
|
||||
request.PlaceholderGroup = placeholderGroupBytes
|
||||
// Handle search by primary keys: transform IDs to vectors
|
||||
if err := node.handleIfSearchByPK(ctx, request); err != nil {
|
||||
return &milvuspb.SearchResults{
|
||||
Status: merr.Status(err),
|
||||
}, false, false, false, nil
|
||||
}
|
||||
|
||||
qt := &searchTask{
|
||||
@ -2967,7 +2965,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest,
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.Strings("partitions", request.PartitionNames),
|
||||
zap.String("dsl", request.Dsl),
|
||||
zap.Int("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
|
||||
zap.Int("len(PlaceholderGroup)", len(request.GetPlaceholderGroup())),
|
||||
zap.Strings("OutputFields", request.OutputFields),
|
||||
zap.Any("search_params", request.SearchParams),
|
||||
zap.String("ConsistencyLevel", request.GetConsistencyLevel().String()),
|
||||
@ -3324,59 +3322,197 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
|
||||
return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil
|
||||
}
|
||||
|
||||
func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) {
|
||||
placeholderGroup := &commonpb.PlaceholderGroup{}
|
||||
err := proto.Unmarshal(request.PlaceholderGroup, placeholderGroup)
|
||||
// validateIDsType validates that the IDs type matches the primary key field type
|
||||
func validateIDsType(pkField *schemapb.FieldSchema, ids *schemapb.IDs) error {
|
||||
if ids == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
pkType := pkField.GetDataType()
|
||||
switch pkType {
|
||||
case schemapb.DataType_Int64:
|
||||
if ids.GetIntId() == nil {
|
||||
return merr.WrapErrParameterInvalid("int64 IDs", "got other type",
|
||||
"primary key is int64, but IDs type mismatch")
|
||||
}
|
||||
case schemapb.DataType_VarChar:
|
||||
if ids.GetStrId() == nil {
|
||||
return merr.WrapErrParameterInvalid("string IDs", "got other type",
|
||||
"primary key is varchar, but IDs type mismatch")
|
||||
}
|
||||
default:
|
||||
return merr.WrapErrParameterInvalid("int64 or varchar", pkType.String(),
|
||||
fmt.Sprintf("unsupported primary key type: %s", pkType.String()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// After this function, the request will have PlaceholderGroup set, ready for normal search pipeline.
|
||||
// If the request is not search-by-IDs, this function does nothing.
|
||||
//
|
||||
// Returns error if the transformation fails.
|
||||
func (node *Proxy) handleIfSearchByPK(ctx context.Context, request *milvuspb.SearchRequest) error {
|
||||
// Check if this is a search by PK request
|
||||
ids := request.GetIds()
|
||||
if ids == nil || typeutil.GetSizeOfIDs(ids) == 0 {
|
||||
return nil // Not search by PK, do nothing
|
||||
}
|
||||
|
||||
// Get collection schema for validation and plan building
|
||||
collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx,
|
||||
request.GetDbName(), request.GetCollectionName(), 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if len(placeholderGroup.Placeholders) != 1 || len(placeholderGroup.Placeholders[0].Values) != 1 {
|
||||
return nil, merr.WrapErrParameterInvalidMsg("please provide primary key")
|
||||
// Validate that anns_field is provided
|
||||
annsFieldName, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, request.SearchParams)
|
||||
if err != nil || annsFieldName == "" {
|
||||
return merr.WrapErrParameterInvalid("valid anns_field in search_params", "missing",
|
||||
"anns_field is required for search by IDs")
|
||||
}
|
||||
queryExpr := string(placeholderGroup.Placeholders[0].Values[0])
|
||||
|
||||
annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, request.SearchParams)
|
||||
annField := typeutil.GetFieldByName(collectionInfo.schema.CollectionSchema, annsFieldName)
|
||||
if annField == nil {
|
||||
return merr.WrapErrFieldNotFound(annsFieldName, "vector field not found in schema")
|
||||
}
|
||||
|
||||
if annField.GetDataType() == schemapb.DataType_ArrayOfVector {
|
||||
return merr.WrapErrParameterInvalidMsg("array of vector is not supported for search by IDs")
|
||||
}
|
||||
|
||||
if !typeutil.IsVectorType(annField.GetDataType()) {
|
||||
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("field (%s) to search is not of vector data type", annsFieldName))
|
||||
}
|
||||
|
||||
// Get primary key field
|
||||
pkField, err := collectionInfo.schema.GetPkField()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
queryRequest := &milvuspb.QueryRequest{
|
||||
// Validate IDs type matches primary key type
|
||||
if err := validateIDsType(pkField, ids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create requery plan using IDs (no expr parsing overhead)
|
||||
plan := planparserv2.CreateRequeryPlan(pkField, ids)
|
||||
|
||||
// Build query request to fetch vectors by IDs
|
||||
queryReq := &milvuspb.QueryRequest{
|
||||
Base: request.Base,
|
||||
DbName: request.DbName,
|
||||
CollectionName: request.CollectionName,
|
||||
Expr: queryExpr,
|
||||
OutputFields: []string{annsField},
|
||||
OutputFields: []string{pkField.GetName(), annsFieldName}, // Only need the vector field
|
||||
PartitionNames: request.PartitionNames,
|
||||
TravelTimestamp: request.TravelTimestamp,
|
||||
GuaranteeTimestamp: request.GuaranteeTimestamp,
|
||||
QueryParams: nil,
|
||||
NotReturnAllMeta: request.NotReturnAllMeta,
|
||||
ConsistencyLevel: request.ConsistencyLevel,
|
||||
UseDefaultConsistency: request.UseDefaultConsistency,
|
||||
}
|
||||
|
||||
queryResults, _ := node.Query(ctx, queryRequest)
|
||||
|
||||
err = merr.Error(queryResults.GetStatus())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Create queryTask to execute the retrieval
|
||||
qt := &queryTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
RetrieveRequest: &internalpb.RetrieveRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
ReqID: paramtable.GetNodeID(),
|
||||
ConsistencyLevel: request.ConsistencyLevel,
|
||||
},
|
||||
request: queryReq,
|
||||
plan: plan,
|
||||
mixCoord: node.mixCoord,
|
||||
lb: node.lbPolicy,
|
||||
shardclientMgr: node.shardMgr,
|
||||
mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(),
|
||||
// reQuery defaults to false - we need full query processing:
|
||||
// partition conversion, struct field reconstruction, timestamp handling etc
|
||||
}
|
||||
|
||||
var vectorFieldsData *schemapb.FieldData
|
||||
for _, fieldsData := range queryResults.GetFieldsData() {
|
||||
if fieldsData.GetFieldName() == annsField {
|
||||
vectorFieldsData = fieldsData
|
||||
break
|
||||
// Execute query
|
||||
queryResult, _, err := node.query(ctx, qt, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !merr.Ok(queryResult.GetStatus()) {
|
||||
return merr.Error(queryResult.GetStatus())
|
||||
}
|
||||
|
||||
// Extract primary key field to check result count
|
||||
pkFieldData := lo.FindOrElse(queryResult.GetFieldsData(), nil, func(f *schemapb.FieldData) bool {
|
||||
return f.GetFieldName() == pkField.GetName()
|
||||
})
|
||||
|
||||
if pkFieldData == nil {
|
||||
return merr.WrapErrFieldNotFound(pkField.GetName(), "primary key field not found in query result")
|
||||
}
|
||||
|
||||
// Check if the returned pk count matches the input IDs count
|
||||
inputIDsCount := typeutil.GetSizeOfIDs(ids)
|
||||
returnedPKCount := typeutil.GetPKSize(pkFieldData)
|
||||
if returnedPKCount != inputIDsCount {
|
||||
// Find which IDs are missing
|
||||
returnedPKSet := make(map[interface{}]struct{})
|
||||
switch pkFieldData.GetType() {
|
||||
case schemapb.DataType_Int64:
|
||||
for _, pk := range pkFieldData.GetScalars().GetLongData().GetData() {
|
||||
returnedPKSet[pk] = struct{}{}
|
||||
}
|
||||
case schemapb.DataType_VarChar:
|
||||
for _, pk := range pkFieldData.GetScalars().GetStringData().GetData() {
|
||||
returnedPKSet[pk] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
var missingIDs []interface{}
|
||||
switch ids.GetIdField().(type) {
|
||||
case *schemapb.IDs_IntId:
|
||||
for _, id := range ids.GetIntId().GetData() {
|
||||
if _, exists := returnedPKSet[id]; !exists {
|
||||
missingIDs = append(missingIDs, id)
|
||||
}
|
||||
}
|
||||
case *schemapb.IDs_StrId:
|
||||
for _, id := range ids.GetStrId().GetData() {
|
||||
if _, exists := returnedPKSet[id]; !exists {
|
||||
missingIDs = append(missingIDs, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return merr.WrapErrParameterInvalidMsg(
|
||||
fmt.Sprintf("some of the provided primary key IDs do not exist: missing IDs = %v", missingIDs))
|
||||
}
|
||||
|
||||
placeholderGroupBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(vectorFieldsData)
|
||||
// Extract vector field from query result
|
||||
vectorFieldData := lo.FindOrElse(queryResult.GetFieldsData(), nil, func(f *schemapb.FieldData) bool {
|
||||
return f.GetFieldName() == annsFieldName || f.GetType() == schemapb.DataType_ArrayOfStruct
|
||||
})
|
||||
|
||||
if vectorFieldData == nil {
|
||||
return merr.WrapErrFieldNotFound(annsFieldName, "vector field not found in query result")
|
||||
}
|
||||
|
||||
// Convert to PlaceholderGroup
|
||||
placeholderBytes, err := funcutil.FieldDataToPlaceholderGroupBytes(vectorFieldData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
return placeholderGroupBytes, nil
|
||||
// Transform request: replace IDs with PlaceholderGroup
|
||||
// Now the request is ready for normal search pipeline
|
||||
request.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: placeholderBytes,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush notify data nodes to persist the data of collection.
|
||||
|
||||
@ -629,18 +629,19 @@ func constructTestSearchRequest(dbName, collectionName, floatVecField, expr stri
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
SearchByPrimaryKeys: false,
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@ -725,42 +726,23 @@ func constructTestEmbeddingListSearchRequest(dbName, collectionName, structFVec,
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
SearchByPrimaryKeys: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions for TestProxy
|
||||
func constructPrimaryKeysPlaceholderGroup(int64Field string, insertedIDs []int64) *commonpb.PlaceholderGroup {
|
||||
expr := fmt.Sprintf("%v in [%v]", int64Field, insertedIDs[0])
|
||||
exprBytes := []byte(expr)
|
||||
|
||||
return &commonpb.PlaceholderGroup{
|
||||
Placeholders: []*commonpb.PlaceholderValue{
|
||||
{
|
||||
Tag: "$0",
|
||||
Type: commonpb.PlaceholderType_None,
|
||||
Values: [][]byte{exprBytes},
|
||||
},
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func constructSearchByPksRequest(t *testing.T, dbName, collectionName, floatVecField, int64Field string, insertedIDs []int64, nprobe, topk, roundDecimal int) *milvuspb.SearchRequest {
|
||||
plg := constructPrimaryKeysPlaceholderGroup(int64Field, insertedIDs)
|
||||
plgBs, err := proto.Marshal(plg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
params := make(map[string]string)
|
||||
params["nprobe"] = strconv.Itoa(nprobe)
|
||||
b, err := json.Marshal(params)
|
||||
@ -774,18 +756,25 @@ func constructSearchByPksRequest(t *testing.T, dbName, collectionName, floatVecF
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: "",
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
SearchByPrimaryKeys: true,
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: "",
|
||||
SearchInput: &milvuspb.SearchRequest_Ids{
|
||||
Ids: &schemapb.IDs{
|
||||
IdField: &schemapb.IDs_IntId{
|
||||
IntId: &schemapb.LongArray{
|
||||
Data: insertedIDs,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: searchParams,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -705,9 +705,9 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
if typeutil.IsFieldSparseFloatVector(t.schema.CollectionSchema, t.SearchRequest.FieldId) {
|
||||
metrics.ProxySearchSparseNumNonZeros.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.collectionName, metrics.SearchLabel, strconv.FormatInt(t.SearchRequest.FieldId, 10)).Observe(float64(typeutil.EstimateSparseVectorNNZFromPlaceholderGroup(t.request.PlaceholderGroup, int(t.request.GetNq()))))
|
||||
metrics.ProxySearchSparseNumNonZeros.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.collectionName, metrics.SearchLabel, strconv.FormatInt(t.SearchRequest.FieldId, 10)).Observe(float64(typeutil.EstimateSparseVectorNNZFromPlaceholderGroup(t.request.GetPlaceholderGroup(), int(t.request.GetNq()))))
|
||||
}
|
||||
t.SearchRequest.PlaceholderGroup = t.request.PlaceholderGroup
|
||||
t.SearchRequest.PlaceholderGroup = t.request.GetPlaceholderGroup()
|
||||
t.SearchRequest.Topk = queryInfo.GetTopk()
|
||||
t.SearchRequest.MetricType = queryInfo.GetMetricType()
|
||||
t.queryInfos = append(t.queryInfos, queryInfo)
|
||||
|
||||
@ -1082,7 +1082,9 @@ func TestSearchTask_WithFunctions(t *testing.T) {
|
||||
{Key: AnnsFieldKey, Value: "vector1"},
|
||||
{Key: TopKKey, Value: "10"},
|
||||
},
|
||||
PlaceholderGroup: holderByte,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: holderByte,
|
||||
},
|
||||
},
|
||||
mixCoord: qc,
|
||||
tr: timerecord.NewTimeRecorder("test-search"),
|
||||
@ -4857,7 +4859,9 @@ func TestSearchTask_InitSearchRequestWithStructArrayFields(t *testing.T) {
|
||||
{Key: common.MetricTypeKey, Value: metric.L2},
|
||||
{Key: ParamsKey, Value: `{"nprobe": 10}`},
|
||||
},
|
||||
PlaceholderGroup: nil,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: nil,
|
||||
},
|
||||
ConsistencyLevel: commonpb.ConsistencyLevel_Session,
|
||||
},
|
||||
schema: schemaInfo,
|
||||
|
||||
@ -561,14 +561,16 @@ func constructSearchRequest(
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MetricTypeKey,
|
||||
|
||||
@ -22,7 +22,7 @@ require (
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2
|
||||
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12
|
||||
github.com/klauspost/compress v1.18.0
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45
|
||||
github.com/minio/minio-go/v7 v7.0.73
|
||||
github.com/panjf2000/ants/v2 v2.11.3
|
||||
github.com/prometheus/client_golang v1.20.5
|
||||
|
||||
@ -621,8 +621,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
|
||||
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
|
||||
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=
|
||||
|
||||
@ -96,7 +96,7 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place
|
||||
placeholderValue := &commonpb.PlaceholderValue{
|
||||
Tag: "$0",
|
||||
Type: commonpb.PlaceholderType_BinaryVector,
|
||||
Values: flattenedByteVectorsToByteVectors(x.BinaryVector, int(vectors.Dim)),
|
||||
Values: flattenedBinaryVectorsToByteVectors(x.BinaryVector, int(vectors.Dim)),
|
||||
}
|
||||
return placeholderValue, nil
|
||||
case schemapb.DataType_Float16Vector:
|
||||
@ -120,7 +120,7 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place
|
||||
placeholderValue := &commonpb.PlaceholderValue{
|
||||
Tag: "$0",
|
||||
Type: commonpb.PlaceholderType_BFloat16Vector,
|
||||
Values: flattenedFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)),
|
||||
Values: flattenedBFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)),
|
||||
}
|
||||
return placeholderValue, nil
|
||||
case schemapb.DataType_SparseFloatVector:
|
||||
@ -188,10 +188,10 @@ func floatVectorToByteVector(vector []float32) []byte {
|
||||
return data
|
||||
}
|
||||
|
||||
func flattenedByteVectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte {
|
||||
func flattenedBinaryVectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte {
|
||||
result := make([][]byte, 0)
|
||||
for i := 0; i < len(flattenedVectors); i += dimension {
|
||||
result = append(result, flattenedVectors[i:i+dimension])
|
||||
for i := 0; i < len(flattenedVectors); i += dimension / 8 {
|
||||
result = append(result, flattenedVectors[i:i+dimension/8])
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@ -6,11 +6,11 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_flattenedByteVectorsToByteVectors(t *testing.T) {
|
||||
func Test_flattenedBinaryVectorsToByteVectors(t *testing.T) {
|
||||
flattenedVectors := []byte{0, 1, 2, 3, 4, 5}
|
||||
dimension := 3
|
||||
dimension := 24
|
||||
|
||||
actual := flattenedByteVectorsToByteVectors(flattenedVectors, dimension)
|
||||
actual := flattenedBinaryVectorsToByteVectors(flattenedVectors, dimension)
|
||||
expected := [][]byte{
|
||||
{0, 1, 2},
|
||||
{3, 4, 5},
|
||||
|
||||
@ -54,7 +54,7 @@ require (
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 // indirect
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
|
||||
@ -332,8 +332,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
|
||||
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
|
||||
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45 h1:TMUhlirMCH2zgJD+qClP5EP0yuFl1VrE4j+0fiRSuJU=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251124145901-0b96e4c8af45/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU=
|
||||
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
|
||||
@ -483,7 +483,9 @@ func (s *HelloMilvusSuite) TestSparse_invalid_search_request() {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
searchReq.PlaceholderGroup = plgBs
|
||||
searchReq.SearchInput = &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
}
|
||||
}
|
||||
|
||||
sparseVecs := integration.GenerateSparseFloatArray(nq)
|
||||
|
||||
@ -212,14 +212,16 @@ func constructSearchRequest(
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: outputFields,
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: outputFields,
|
||||
SearchParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MetricTypeKey,
|
||||
@ -271,14 +273,16 @@ func ConstructSearchRequestWithConsistencyLevel(
|
||||
}
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: outputFields,
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
SearchInput: &milvuspb.SearchRequest_PlaceholderGroup{
|
||||
PlaceholderGroup: plgBs,
|
||||
},
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: outputFields,
|
||||
SearchParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MetricTypeKey,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user