From b3cff64bffb4b3f3b98715a959c7ec61d48e1c25 Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Thu, 7 Aug 2025 18:55:40 +0800 Subject: [PATCH] test: new search iterator go client cases (#43771) issue: #33419 Signed-off-by: ThreadDao --- tests/go_client/base/milvus_client.go | 5 + tests/go_client/common/consts.go | 1 + tests/go_client/common/response_checker.go | 59 +++ tests/go_client/common/utils.go | 14 +- .../go_client/testcases/helper/read_helper.go | 20 + .../testcases/search_iterator_test.go | 437 ++++++++++++++++++ 6 files changed, 535 insertions(+), 1 deletion(-) create mode 100644 tests/go_client/testcases/search_iterator_test.go diff --git a/tests/go_client/base/milvus_client.go b/tests/go_client/base/milvus_client.go index b300cae16b..e3195e34f6 100644 --- a/tests/go_client/base/milvus_client.go +++ b/tests/go_client/base/milvus_client.go @@ -323,6 +323,11 @@ func (mc *MilvusClient) HybridSearch(ctx context.Context, option client.HybridSe return resultSets, err } +func (mc *MilvusClient) SearchIterator(ctx context.Context, option client.SearchIteratorOption, callOptions ...grpc.CallOption) (client.SearchIterator, error) { + searchIterator, err := mc.mClient.SearchIterator(ctx, option, callOptions...) + return searchIterator, err +} + // ListResourceGroups list all resource groups func (mc *MilvusClient) ListResourceGroups(ctx context.Context, option client.ListResourceGroupsOption, callOptions ...grpc.CallOption) ([]string, error) { resourceGroups, err := mc.mClient.ListResourceGroups(ctx, option, callOptions...) diff --git a/tests/go_client/common/consts.go b/tests/go_client/common/consts.go index 7501363956..89f0172aac 100644 --- a/tests/go_client/common/consts.go +++ b/tests/go_client/common/consts.go @@ -72,6 +72,7 @@ const ( MaxTopK = 16384 MaxVectorFieldNum = 4 MaxShardNum = 16 + DefaultBatchSize = 1000 ) const ( diff --git a/tests/go_client/common/response_checker.go b/tests/go_client/common/response_checker.go index 501ea4cf03..d463ec63c9 100644 --- a/tests/go_client/common/response_checker.go +++ b/tests/go_client/common/response_checker.go @@ -1,7 +1,9 @@ package common import ( + "context" "fmt" + "io" "reflect" "strings" "testing" @@ -182,6 +184,63 @@ func CheckQueryResult(t *testing.T, expColumns []column.Column, actualColumns [] } } +type CheckIteratorOption func(opt *checkIteratorOpt) + +type checkIteratorOpt struct { + expBatchSize []int + expOutputFields []string +} + +func WithExpBatchSize(expBatchSize []int) CheckIteratorOption { + return func(opt *checkIteratorOpt) { + opt.expBatchSize = expBatchSize + } +} + +func WithExpOutputFields(expOutputFields []string) CheckIteratorOption { + return func(opt *checkIteratorOpt) { + opt.expOutputFields = expOutputFields + } +} + +// check queryIterator: result limit, each batch size, output fields +func CheckSearchIteratorResult(ctx context.Context, t *testing.T, itr client.SearchIterator, expLimit int, opts ...CheckIteratorOption) { + opt := &checkIteratorOpt{} + for _, o := range opts { + o(opt) + } + actualLimit := 0 + var actualBatchSize []int + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } else { + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + } + + if opt.expBatchSize != nil { + actualBatchSize = append(actualBatchSize, rs.ResultCount) + } + var actualOutputFields []string + if opt.expOutputFields != nil { + for _, column := range rs.Fields { + actualOutputFields = append(actualOutputFields, column.Name()) + } + require.ElementsMatch(t, opt.expOutputFields, actualOutputFields) + } + actualLimit = actualLimit + rs.ResultCount + } + require.Equal(t, expLimit, actualLimit) + if opt.expBatchSize != nil { + log.Debug("SearchIterator result len", zap.Any("result len", actualBatchSize)) + require.True(t, EqualIntSlice(opt.expBatchSize, actualBatchSize)) + } +} + // GenColumnDataOption -- create column data -- type checkIndexOpt struct { state index.IndexState diff --git a/tests/go_client/common/utils.go b/tests/go_client/common/utils.go index 7021c2c554..59668ca1d1 100644 --- a/tests/go_client/common/utils.go +++ b/tests/go_client/common/utils.go @@ -137,7 +137,7 @@ type InvalidExprStruct struct { } var InvalidExpressions = []InvalidExprStruct{ - {Expr: "id in [0]", ErrNil: true, ErrMsg: "fieldName(id) not found"}, // not exist field but no error + {Expr: "id in [0]", ErrNil: true, ErrMsg: "fieldName(id) not found"}, // not exist field but no error, because enable dynamic {Expr: "int64 in not [0]", ErrNil: false, ErrMsg: "cannot parse expression"}, // wrong term expr keyword {Expr: "int64 < floatVec", ErrNil: false, ErrMsg: "not supported"}, // unsupported compare field {Expr: "floatVec in [0]", ErrNil: false, ErrMsg: "cannot be casted to FloatVector"}, // value and field type mismatch @@ -223,3 +223,15 @@ func GenText(lang string) string { func IsZeroValue(value interface{}) bool { return reflect.DeepEqual(value, reflect.Zero(reflect.TypeOf(value)).Interface()) } + +func EqualIntSlice(a []int, b []int) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/tests/go_client/testcases/helper/read_helper.go b/tests/go_client/testcases/helper/read_helper.go index fcd66ca684..b88caffe84 100644 --- a/tests/go_client/testcases/helper/read_helper.go +++ b/tests/go_client/testcases/helper/read_helper.go @@ -2,6 +2,7 @@ package helper import ( "github.com/milvus-io/milvus/client/v2/entity" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/tests/go_client/common" ) @@ -102,3 +103,22 @@ func GenFp16OrBf16VectorsFromFloatVector(nq int, dim int, dataType entity.FieldT } return vectors } + +func GenBatchSizes(limit int, batch int) []int { + if batch == 0 { + log.Fatal("Batch should be larger than 0") + } + if limit == 0 { + return []int{} + } + _loop := limit / batch + _last := limit % batch + batchSizes := make([]int, 0, _loop+1) + for i := 0; i < _loop; i++ { + batchSizes = append(batchSizes, batch) + } + if _last > 0 { + batchSizes = append(batchSizes, _last) + } + return batchSizes +} diff --git a/tests/go_client/testcases/search_iterator_test.go b/tests/go_client/testcases/search_iterator_test.go new file mode 100644 index 0000000000..fb207fb581 --- /dev/null +++ b/tests/go_client/testcases/search_iterator_test.go @@ -0,0 +1,437 @@ +package testcases + +import ( + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/client/v2/column" + "github.com/milvus-io/milvus/client/v2/entity" + client "github.com/milvus-io/milvus/client/v2/milvusclient" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/tests/go_client/common" + hp "github.com/milvus-io/milvus/tests/go_client/testcases/helper" +) + +func TestSearchIteratorDefault(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb*2)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + + // search iterator default + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector)) + common.CheckErr(t, err, true) + actualLimit := 0 + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } else { + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + } + actualLimit = actualLimit + rs.ResultCount + } + require.LessOrEqual(t, actualLimit, common.DefaultNb*2) + + // search iterator with limit + limit := 2000 + itr, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(int64(limit))) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, limit, common.WithExpBatchSize(hp.GenBatchSizes(limit, common.DefaultBatchSize))) +} + +func TestSearchIteratorGrowing(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb*2)) + + // search iterator growing + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + // wait limit support + limit := 1000 + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(int64(limit)).WithBatchSize(100)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, limit, common.WithExpBatchSize(hp.GenBatchSizes(limit, 100))) +} + +func TestSearchIteratorHitEmpty(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // search + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, 0, common.WithExpBatchSize(hp.GenBatchSizes(0, common.DefaultBatchSize))) +} + +func TestSearchIteratorBatchSize(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // search iterator with special limit: 0, -1, -2 + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(0)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, 0, common.WithExpBatchSize(hp.GenBatchSizes(0, common.DefaultBatchSize))) + + for _, _limit := range []int64{-1, -2} { + itr, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(_limit)) + common.CheckErr(t, err, true) + actualLimit := 0 + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + actualLimit = actualLimit + rs.ResultCount + require.LessOrEqual(t, rs.ResultCount, common.DefaultBatchSize) + } + require.LessOrEqual(t, actualLimit, common.DefaultNb) + } + + // search iterator + type batchStruct struct { + batch int + expBatchSize []int + } + limit := 201 + batchStructs := []batchStruct{ + {batch: limit / 2, expBatchSize: hp.GenBatchSizes(limit, limit/2)}, + {batch: limit, expBatchSize: hp.GenBatchSizes(limit, limit)}, + {batch: limit + 1, expBatchSize: hp.GenBatchSizes(limit, limit+1)}, + } + + for _, _batchStruct := range batchStructs { + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(int64(limit)).WithBatchSize(_batchStruct.batch)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, limit, common.WithExpBatchSize(_batchStruct.expBatchSize)) + } +} + +func TestSearchIteratorOutputAllFields(t *testing.T) { + t.Parallel() + for _, dynamic := range [2]bool{false, true} { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.AllFields), hp.TNewFieldsOption(), + hp.TNewSchemaOption().TWithEnableDynamicField(dynamic), hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption()) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + var allFieldsName []string + for _, field := range schema.Fields { + allFieldsName = append(allFieldsName, field.Name) + } + if dynamic { + allFieldsName = append(allFieldsName, common.DefaultDynamicFieldName) + } + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithANNSField(common.DefaultFloatVecFieldName). + WithOutputFields("*").WithIteratorLimit(100).WithBatchSize(12)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, 100, common.WithExpBatchSize(hp.GenBatchSizes(100, 12)), common.WithExpOutputFields(allFieldsName)) + } +} + +func TestQueryIteratorOutputSparseFieldsRows(t *testing.T) { + t.Parallel() + // connect + for _, withRows := range [2]bool{true, false} { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64VarcharSparseVec), hp.TNewFieldsOption(), + hp.TNewSchemaOption().TWithEnableDynamicField(true), hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema).TWithIsRows(withRows), hp.TNewDataOption()) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + fieldsName := []string{common.DefaultDynamicFieldName} + for _, field := range schema.Fields { + fieldsName = append(fieldsName, field.Name) + } + + // output * fields + vector := common.GenSparseVector(common.DefaultDim) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithOutputFields("*").WithIteratorLimit(200).WithBatchSize(120)) + common.CheckErr(t, err, true) + common.CheckSearchIteratorResult(ctx, t, itr, 200, common.WithExpBatchSize(hp.GenBatchSizes(200, 120)), common.WithExpOutputFields(fieldsName)) + } +} + +func TestSearchIteratorInvalid(t *testing.T) { + nb := 201 + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(nb)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // search iterator with not exist collection name + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + _, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(common.GenRandomString("c", 5), vector)) + common.CheckErr(t, err, false, "collection not found") + + // search iterator with not exist partition name + _, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithPartitions(common.GenRandomString("p", 5))) + common.CheckErr(t, err, false, "not found") + _, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithPartitions(common.DefaultPartition, common.GenRandomString("p", 5))) + common.CheckErr(t, err, false, "not found") + + // search iterator with not exist vector field name + _, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithANNSField(common.GenRandomString("f", 5))) + common.CheckErr(t, err, false, "failed to get field schema by name") + + // search iterator with count(*) + _, err = mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithOutputFields(common.QueryCountFieldName)) + common.CheckErr(t, err, false, "field count(*) not exist") + + // search iterator with invalid batch size + for _, batch := range []int{-1, 0, -2} { + _, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithBatchSize(batch)) + common.CheckErr(t, err, false, "batch size must be greater than 0") + } + + itr, err2 := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithBatchSize(common.MaxTopK+1)) + common.CheckErr(t, err2, true) + _, err2 = itr.Next(ctx) + common.CheckErr(t, err2, false, "batch size is invalid, it should be in range [1, 16384]") + + // search iterator with invalid offset + for _, offset := range []int{-2, -1, common.MaxTopK + 1} { + _, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithOffset(offset)) + common.CheckErr(t, err, false, "it should be in range [1, 16384]") + } +} + +func TestSearchIteratorWithInvalidExpr(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64VecJSON), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + for _, _invalidExprs := range common.InvalidExpressions { + t.Log(_invalidExprs) + _, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithFilter(_invalidExprs.Expr)) + common.CheckErr(t, err, _invalidExprs.ErrNil, _invalidExprs.ErrMsg, "") + } +} + +func TestSearchIteratorTemplateKey(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb*2)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + + // search iterator default + value := 2000 + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIteratorLimit(100).WithBatchSize(10). + WithFilter(fmt.Sprintf("%s < {key}", common.DefaultInt64FieldName)).WithTemplateParam("key", value)) + common.CheckErr(t, err, true) + actualLimit := 0 + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + actualLimit = actualLimit + rs.ResultCount + require.Equal(t, 10, rs.ResultCount) + + // check result ids < value + for _, id := range rs.IDs.(*column.ColumnInt64).Data() { + require.Less(t, id, int64(value)) + } + } + require.LessOrEqual(t, actualLimit, 100) +} + +func TestSearchIteratorGroupBy(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + _, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithGroupByField(common.DefaultInt64FieldName). + WithIteratorLimit(500).WithBatchSize(100)) + common.CheckErr(t, err, false, "Not allowed to do groupBy when doing iteration") +} + +func TestSearchIteratorIgnoreGrowing(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption(), + hp.TWithConsistencyLevel(entity.ClStrong)) + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb)) + prepare.FlushData(ctx, t, mc, schema.CollectionName) + prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // growing pk [DefaultNb, DefaultNb*2] + prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb).TWithStart(common.DefaultNb)) + + // search iterator growing + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithIgnoreGrowing(true).WithIteratorLimit(100).WithBatchSize(10)) + common.CheckErr(t, err, true) + actualLimit := 0 + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + actualLimit = actualLimit + rs.ResultCount + for _, id := range rs.IDs.(*column.ColumnInt64).Data() { + require.Less(t, id, int64(common.DefaultNb)) + } + } + require.LessOrEqual(t, actualLimit, 100) +} + +func TestSearchIteratorNull(t *testing.T) { + ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout) + mc := hp.CreateDefaultMilvusClient(ctx, t) + + pkField := entity.NewField().WithName(common.DefaultInt64FieldName).WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true) + vecField := entity.NewField().WithName(common.DefaultFloatVecFieldName).WithDataType(entity.FieldTypeFloatVector).WithDim(common.DefaultDim) + int32NullField := entity.NewField().WithName(common.DefaultInt32FieldName).WithDataType(entity.FieldTypeInt32).WithNullable(true) + schema := entity.NewSchema().WithName(common.GenRandomString("null_int32", 10)).WithField(pkField).WithField(vecField).WithField(int32NullField) + errCreate := mc.CreateCollection(ctx, client.NewCreateCollectionOption(schema.CollectionName, schema).WithConsistencyLevel(entity.ClStrong)) + common.CheckErr(t, errCreate, true) + + prepare := hp.CollPrepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema)) + prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName)) + + // Generate test data with boundary values + nb := common.DefaultNb * 3 + pkColumn := hp.GenColumnData(nb, entity.FieldTypeInt64, *hp.TNewDataOption()) + vecColumn := hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption()) + int32Values := make([]int32, 0, nb) + validData := make([]bool, 0, nb) + + // Generate JSON documents + for i := 0; i < nb; i++ { + _mod := i % 2 + if _mod == 0 { + validData = append(validData, false) + } else { + int32Values = append(int32Values, int32(i)) + validData = append(validData, true) + } + } + nullColumn, err := column.NewNullableColumnInt32(common.DefaultInt32FieldName, int32Values, validData) + common.CheckErr(t, err, true) + _, err = mc.Insert(ctx, client.NewColumnBasedInsertOption(schema.CollectionName, pkColumn, vecColumn, nullColumn)) + common.CheckErr(t, err, true) + + // search iterator with null expr + expr := fmt.Sprintf("%s is null", common.DefaultInt32FieldName) + vector := entity.FloatVector(common.GenFloatVector(common.DefaultDim)) + itr, err2 := mc.SearchIterator(ctx, client.NewSearchIteratorOption(schema.CollectionName, vector).WithFilter(expr).WithIteratorLimit(100).WithBatchSize(10).WithOutputFields(common.DefaultInt32FieldName)) + common.CheckErr(t, err2, true) + actualLimit := 0 + for { + rs, err := itr.Next(ctx) + if err != nil { + if err == io.EOF { + break + } + log.Error("SearchIterator next gets error", zap.Error(err)) + break + } + actualLimit = actualLimit + rs.ResultCount + require.Equal(t, 10, rs.ResultCount) + for _, field := range rs.Fields { + if field.Name() == common.DefaultInt32FieldName { + for i := 0; i < field.Len(); i++ { + isNull, err := field.IsNull(i) + common.CheckErr(t, err, true) + require.True(t, isNull) + } + } + } + } + require.LessOrEqual(t, actualLimit, 100) +}