milvus/tests/integration/util_query.go
Spade A d6a428e880
feat: impl StructArray -- support create index for vector array (embedding list) and search on it (#43726)
Ref https://github.com/milvus-io/milvus/issues/42148

This PR supports creating an index on a vector array (currently only for
`DataType.FLOAT_VECTOR`) and searching on it.
The index type supported in this PR is `EMB_LIST_HNSW` and the metric
type is `MAX_SIM` only.

The way to use it:
```python
milvus_client = MilvusClient("xxx:19530")
schema = milvus_client.create_schema(enable_dynamic_field=True, auto_id=True)
...
struct_schema = milvus_client.create_struct_array_field_schema("struct_array_field")
...
struct_schema.add_field("struct_float_vec", DataType.ARRAY_OF_VECTOR, element_type=DataType.FLOAT_VECTOR, dim=128, max_capacity=1000)
...
schema.add_struct_array_field(struct_schema)
index_params = milvus_client.prepare_index_params()
index_params.add_index(field_name="struct_float_vec", index_type="EMB_LIST_HNSW", metric_type="MAX_SIM", index_params={"nlist": 128})
...
milvus_client.create_index(COLLECTION_NAME, schema=schema, index_params=index_params)
```

Note: This PR uses `Lims` to convey vector-array offsets to Knowhere:
vectors from multiple vector arrays are concatenated, and the offsets
specify which vectors belong to which vector array.

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
Signed-off-by: SpadeA-Tang <tangchenjie1210@gmail.com>
2025-08-20 10:27:46 +08:00

386 lines
11 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"math/rand"
"strconv"
"time"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/testutils"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// Keys of the key/value search parameters carried in SearchRequest.SearchParams.
const (
	AnnsFieldKey    = "anns_field" // name of the vector field to search on
	TopKKey         = "topk"
	NQKey           = "nq"
	MetricTypeKey   = common.MetricTypeKey
	RoundDecimalKey = "round_decimal"
	OffsetKey       = "offset"
	LimitKey        = "limit"
)
// WaitForLoadWithDB blocks until the collection in the given database reports
// 100% loading progress (see waitForLoadInternal for polling behavior).
func (s *MiniClusterSuite) WaitForLoadWithDB(ctx context.Context, dbName, collection string) {
	s.waitForLoadInternal(ctx, dbName, collection)
}
// WaitForLoad blocks until the collection in the default database reports
// 100% loading progress (see waitForLoadInternal for polling behavior).
func (s *MiniClusterSuite) WaitForLoad(ctx context.Context, collection string) {
	s.waitForLoadInternal(ctx, "", collection)
}
// WaitForSortedSegmentLoaded polls GetQuerySegmentInfo every 500ms until every
// loaded segment of the collection reports IsSorted, failing the suite if ctx
// is cancelled first. Panics if the RPC itself fails.
func (s *MiniClusterSuite) WaitForSortedSegmentLoaded(ctx context.Context, dbName, collection string) {
	allSorted := func() bool {
		resp, err := s.Cluster.MilvusClient.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{
			DbName:         dbName,
			CollectionName: collection,
		})
		if err != nil {
			panic("GetQuerySegmentInfo fail")
		}
		for _, segment := range resp.GetInfos() {
			if !segment.GetIsSorted() {
				return false
			}
		}
		return true
	}
	for {
		if allSorted() {
			return
		}
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait for get segments sorted")
			return
		default:
			time.Sleep(500 * time.Millisecond)
		}
	}
}
// waitForLoadInternal polls GetLoadingProgress for the collection every 500ms
// until it reports 100%, failing the suite if ctx is cancelled first. Panics
// if the RPC itself fails.
func (s *MiniClusterSuite) waitForLoadInternal(ctx context.Context, dbName, collection string) {
	fetchProgress := func() int64 {
		resp, err := s.Cluster.MilvusClient.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
			DbName:         dbName,
			CollectionName: collection,
		})
		if err != nil {
			panic("GetLoadingProgress fail")
		}
		return resp.GetProgress()
	}
	for fetchProgress() != 100 {
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait for load")
			return
		default:
			time.Sleep(500 * time.Millisecond)
		}
	}
}
// WaitForLoadRefresh polls GetLoadingProgress every 500ms until the refresh
// progress reaches 100%, failing the suite if ctx is cancelled first. Panics
// if the RPC itself fails.
func (s *MiniClusterSuite) WaitForLoadRefresh(ctx context.Context, dbName, collection string) {
	fetchRefreshProgress := func() int64 {
		resp, err := s.Cluster.MilvusClient.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
			DbName:         dbName,
			CollectionName: collection,
		})
		if err != nil {
			panic("GetLoadingProgress fail")
		}
		return resp.GetRefreshProgress()
	}
	for fetchRefreshProgress() != 100 {
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait for load (refresh)")
			return
		default:
			time.Sleep(500 * time.Millisecond)
		}
	}
}
// CheckCollectionCacheReleased checks if the collection cache was released from querynodes.
// For each querynode it retries (up to 3 minutes, every 200ms) until the
// collection ID no longer appears in the node's quota-metric effect list;
// unhealthy (stopping/stopped) nodes are skipped.
func (s *MiniClusterSuite) CheckCollectionCacheReleased(collectionID int64) {
	for _, qn2 := range s.Cluster.GetAllQueryNodes() {
		s.Eventually(func() bool {
			qn := qn2.MustGetClient(context.Background())
			state, err := qn.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{})
			s.NoError(err)
			if state.GetState().GetStateCode() != commonpb.StateCode_Healthy {
				// skip checking stopping/stopped node
				return true
			}
			// Pull system-info metrics from the node and decode them into
			// QueryNodeInfos to inspect which collections still affect it.
			req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
			s.NoError(err)
			resp, err := qn.GetMetrics(context.Background(), req)
			err = merr.CheckRPCCall(resp.GetStatus(), err)
			s.NoError(err)
			infos := metricsinfo.QueryNodeInfos{}
			err = metricsinfo.UnmarshalComponentInfos(resp.Response, &infos)
			s.NoError(err)
			// The cache is released once the collection ID disappears from
			// the node's quota-metric effect list.
			for _, id := range infos.QuotaMetrics.Effect.CollectionIDs {
				if id == collectionID {
					s.T().Logf("collection %d was not released in querynode %d", collectionID, qn2.GetNodeID())
					return false
				}
			}
			s.T().Logf("collection %d has been released from querynode %d", collectionID, qn2.GetNodeID())
			return true
		}, 3*time.Minute, 200*time.Millisecond)
	}
}
// ConstructSearchRequest builds a SearchRequest with nq random query vectors
// of the given type and dim. It is the plain-vector variant; for embedding
// lists (ARRAY_OF_VECTOR) use ConstructEmbeddingListSearchRequest.
// params is marshalled to JSON and passed as the search-params value.
func ConstructSearchRequest(
	dbName, collectionName string,
	expr string,
	vecField string,
	vectorType schemapb.DataType,
	outputFields []string,
	metricType string,
	params map[string]any,
	nq, dim int, topk, roundDecimal int,
) *milvuspb.SearchRequest {
	return constructSearchRequest(dbName, collectionName, expr, vecField, false, vectorType, outputFields, metricType, params, nq, dim, topk, roundDecimal)
}
// ConstructEmbeddingListSearchRequest builds a SearchRequest whose placeholder
// group uses the embedding-list placeholder type (for searches on vector-array
// fields, e.g. with an EMB_LIST index). Otherwise identical to
// ConstructSearchRequest.
func ConstructEmbeddingListSearchRequest(
	dbName, collectionName string,
	expr string,
	vecField string,
	vectorType schemapb.DataType,
	outputFields []string,
	metricType string,
	params map[string]any,
	nq, dim int, topk, roundDecimal int,
) *milvuspb.SearchRequest {
	return constructSearchRequest(dbName, collectionName, expr, vecField, true, vectorType, outputFields, metricType, params, nq, dim, topk, roundDecimal)
}
// constructSearchRequest assembles a SearchRequest: it marshals params to
// JSON, generates a placeholder group of nq random vectors (embedding-list
// flavored when isEmbeddingList is set), and packs the standard key/value
// search parameters. Panics on marshal failure (test helper).
func constructSearchRequest(
	dbName, collectionName string,
	expr string,
	vecField string,
	isEmbeddingList bool,
	vectorType schemapb.DataType,
	outputFields []string,
	metricType string,
	params map[string]any,
	nq, dim int, topk, roundDecimal int,
) *milvuspb.SearchRequest {
	paramsJSON, err := json.Marshal(params)
	if err != nil {
		panic(err)
	}
	placeholderGroup := constructPlaceholderGroup(nq, dim, vectorType, isEmbeddingList)
	placeholderBytes, err := proto.Marshal(placeholderGroup)
	if err != nil {
		panic(err)
	}
	kv := func(k, v string) *commonpb.KeyValuePair {
		return &commonpb.KeyValuePair{Key: k, Value: v}
	}
	return &milvuspb.SearchRequest{
		DbName:           dbName,
		CollectionName:   collectionName,
		Dsl:              expr,
		PlaceholderGroup: placeholderBytes,
		DslType:          commonpb.DslType_BoolExprV1,
		OutputFields:     outputFields,
		SearchParams: []*commonpb.KeyValuePair{
			kv(common.MetricTypeKey, metricType),
			kv(proxy.ParamsKey, string(paramsJSON)),
			kv(AnnsFieldKey, vecField),
			kv(common.TopKKey, strconv.Itoa(topk)),
			kv(RoundDecimalKey, strconv.Itoa(roundDecimal)),
		},
		Nq: int64(nq),
	}
}
// ConstructSearchRequestWithConsistencyLevel builds a SearchRequest like
// ConstructSearchRequest, additionally setting UseDefaultConsistency and the
// explicit ConsistencyLevel. The placeholder group always uses plain (non
// embedding-list) vectors. Panics on marshal failure (test helper).
func ConstructSearchRequestWithConsistencyLevel(
	dbName, collectionName string,
	expr string,
	vecField string,
	vectorType schemapb.DataType,
	outputFields []string,
	metricType string,
	params map[string]any,
	nq, dim int, topk, roundDecimal int,
	useDefaultConsistency bool,
	consistencyLevel commonpb.ConsistencyLevel,
) *milvuspb.SearchRequest {
	paramsJSON, err := json.Marshal(params)
	if err != nil {
		panic(err)
	}
	placeholderBytes, err := proto.Marshal(constructPlaceholderGroup(nq, dim, vectorType, false))
	if err != nil {
		panic(err)
	}
	kv := func(k, v string) *commonpb.KeyValuePair {
		return &commonpb.KeyValuePair{Key: k, Value: v}
	}
	return &milvuspb.SearchRequest{
		DbName:           dbName,
		CollectionName:   collectionName,
		Dsl:              expr,
		PlaceholderGroup: placeholderBytes,
		DslType:          commonpb.DslType_BoolExprV1,
		OutputFields:     outputFields,
		SearchParams: []*commonpb.KeyValuePair{
			kv(common.MetricTypeKey, metricType),
			kv(proxy.ParamsKey, string(paramsJSON)),
			kv(AnnsFieldKey, vecField),
			kv(common.TopKKey, strconv.Itoa(topk)),
			kv(RoundDecimalKey, strconv.Itoa(roundDecimal)),
		},
		UseDefaultConsistency: useDefaultConsistency,
		ConsistencyLevel:      consistencyLevel,
	}
}
// constructPlaceholderGroup builds a PlaceholderGroup containing nq random
// query vectors of the given dim and vector type, tagged "$0".
// For FloatVector, isEmbeddingList selects the embedding-list placeholder
// type (EmbListFloatVector) used when searching vector-array fields; the
// flag is ignored for all other vector types.
// Sparse vectors ignore dim; binary vectors use dim/8 bytes per row.
// Panics on an unsupported vector type or an encoding failure (test helper).
func constructPlaceholderGroup(nq, dim int, vectorType schemapb.DataType, isEmbeddingList bool) *commonpb.PlaceholderGroup {
	values := make([][]byte, 0, nq)
	var placeholderType commonpb.PlaceholderType
	switch vectorType {
	case schemapb.DataType_FloatVector:
		if !isEmbeddingList {
			placeholderType = commonpb.PlaceholderType_FloatVector
		} else {
			placeholderType = commonpb.PlaceholderType_EmbListFloatVector
		}
		for i := 0; i < nq; i++ {
			// Encode all dim float32s through one pre-sized buffer per
			// vector instead of allocating a bytes.Buffer per element.
			buffer := bytes.NewBuffer(make([]byte, 0, dim*4))
			for j := 0; j < dim; j++ {
				if err := binary.Write(buffer, common.Endian, rand.Float32()); err != nil {
					panic(err)
				}
			}
			values = append(values, buffer.Bytes())
		}
	case schemapb.DataType_BinaryVector:
		placeholderType = commonpb.PlaceholderType_BinaryVector
		for i := 0; i < nq; i++ {
			total := dim / 8
			ret := make([]byte, total)
			_, err := rand.Read(ret)
			if err != nil {
				panic(err)
			}
			values = append(values, ret)
		}
	case schemapb.DataType_Float16Vector:
		placeholderType = commonpb.PlaceholderType_Float16Vector
		data := testutils.GenerateFloat16Vectors(nq, dim)
		for i := 0; i < nq; i++ {
			rowBytes := dim * 2 // 2 bytes per float16 element
			values = append(values, data[rowBytes*i:rowBytes*(i+1)])
		}
	case schemapb.DataType_BFloat16Vector:
		placeholderType = commonpb.PlaceholderType_BFloat16Vector
		data := testutils.GenerateBFloat16Vectors(nq, dim)
		for i := 0; i < nq; i++ {
			rowBytes := dim * 2 // 2 bytes per bfloat16 element
			values = append(values, data[rowBytes*i:rowBytes*(i+1)])
		}
	case schemapb.DataType_SparseFloatVector:
		// for sparse, all query rows are encoded in a single byte array
		values = make([][]byte, 0, 1)
		placeholderType = commonpb.PlaceholderType_SparseFloatVector
		sparseVecs := GenerateSparseFloatArray(nq)
		values = append(values, sparseVecs.Contents...)
	case schemapb.DataType_Int8Vector:
		placeholderType = commonpb.PlaceholderType_Int8Vector
		data := testutils.GenerateInt8Vectors(nq, dim)
		for i := 0; i < nq; i++ {
			rowBytes := dim // 1 byte per int8 element
			values = append(values, typeutil.Int8ArrayToBytes(data[rowBytes*i:rowBytes*(i+1)]))
		}
	default:
		panic("invalid vector data type")
	}
	return &commonpb.PlaceholderGroup{
		Placeholders: []*commonpb.PlaceholderValue{
			{
				Tag:    "$0",
				Type:   placeholderType,
				Values: values,
			},
		},
	}
}