Compare commits

...

11 Commits

Author SHA1 Message Date
wei liu
0c63ed95bb
test: [skip e2e] fix unstable assignment tests in balancer (#46042)
issue: #46038

- Add assertSegmentPlanNumAndTargetNodeMatch and
assertChannelPlanNumAndTargetNodeMatch helper functions to validate plan
count and target node membership for unstable assignment tests
- Mark "test assigning channels with resource exhausted nodes" as
unstable since node 2 and 3 have equal priority after filtering
- Replace simple length check with target node validation to ensure
plans assign to expected node set even when order is non-deterministic

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2025-12-04 16:17:11 +08:00
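A minimal sketch (in a _test.go file) of how such an order-tolerant assertion helper can look. The helper name comes from the commit message; the SegmentAssignPlan fields and the example test are simplified stand-ins, not the actual Milvus balancer types.

package balance_test

import "testing"

// SegmentAssignPlan is a simplified stand-in for the balancer's plan type;
// the real struct carries more fields.
type SegmentAssignPlan struct {
	SegmentID int64
	To        int64 // target node ID
}

// assertSegmentPlanNumAndTargetNodeMatch validates the plan count and that
// every plan targets one of the expected nodes, without assuming a
// deterministic assignment order.
func assertSegmentPlanNumAndTargetNodeMatch(t *testing.T, plans []SegmentAssignPlan, expectedNum int, expectedNodes ...int64) {
	t.Helper()
	if len(plans) != expectedNum {
		t.Fatalf("expected %d plans, got %d", expectedNum, len(plans))
	}
	allowed := make(map[int64]struct{}, len(expectedNodes))
	for _, n := range expectedNodes {
		allowed[n] = struct{}{}
	}
	for _, p := range plans {
		if _, ok := allowed[p.To]; !ok {
			t.Fatalf("segment %d assigned to unexpected node %d", p.SegmentID, p.To)
		}
	}
}

func TestPlansTargetExpectedNodes(t *testing.T) {
	// nodes 2 and 3 have equal priority, so either assignment order is acceptable
	plans := []SegmentAssignPlan{{SegmentID: 10, To: 2}, {SegmentID: 11, To: 3}}
	assertSegmentPlanNumAndTargetNodeMatch(t, plans, 2, 2, 3)
}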
XuanYang-cn
64d19fb4f3
fix: Set plugin context when cipher enabled (#46050)
See also: #46008

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2025-12-04 15:17:15 +08:00
Zhen Ye
bf76a9e8e2
fix: milvus fast fail if any component is not ready (#46069)
issue: #45243

Signed-off-by: chyezh <chyezh@outlook.com>
2025-12-04 15:15:10 +08:00
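A simplified sketch of the fast-fail idea behind this fix (the real waitForAllComponentsReady in the roles.go hunk further down uses reflect.Select over conc.Future results): as soon as any component fails to become ready, cancel the shared context and return immediately instead of accumulating the error and waiting for the rest. The component and readyResult types here are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"
)

// component and readyResult are hypothetical stand-ins for the Milvus types.
type component interface{ Name() string }

type fakeComponent string

func (f fakeComponent) Name() string { return string(f) }

type readyResult struct {
	role string
	comp component
	err  error
}

// waitForAllReady collects ready components but fails fast: the first startup
// error cancels the shared context and is returned right away.
func waitForAllReady(ctx context.Context, cancel context.CancelFunc, results <-chan readyResult, total int) (map[string]component, error) {
	ready := make(map[string]component, total)
	for len(ready) < total {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case r := <-results:
			if r.err != nil {
				cancel() // stop components that are still starting
				return nil, fmt.Errorf("component %s is not ready: %w", r.role, r.err)
			}
			ready[r.role] = r.comp
		}
	}
	return ready, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	results := make(chan readyResult, 2)
	results <- readyResult{role: "proxy", comp: fakeComponent("proxy")}
	results <- readyResult{role: "querynode", err: fmt.Errorf("init failed")}

	_, err := waitForAllReady(ctx, cancel, results, 2)
	fmt.Println(err) // fails fast on the querynode error
}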
cai.zhang
cfd49b7680
enhance: Estimate the taskSlot based on whether scalar or vector index (#45850)
issue: #45186

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2025-12-04 14:15:10 +08:00
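The idea, as visible in the calculateIndexTaskSlot hunk and the new scalarIndexTaskSlotUsage config entry further down in this compare: scalar index builds reserve fewer slots per 512 MB than vector index builds, and the reservation scales with segment size. The sketch below uses the default values from the milvus.yaml diff (64 and 16); the branch for segments under 512 MB is not shown in the hunk, so it is simplified here.

package main

import "fmt"

const mb int64 = 1024 * 1024

// estimateIndexTaskSlot mirrors the shape of calculateIndexTaskSlot: pick a
// per-512MB slot usage based on index kind, then scale by segment size.
// Defaults are illustrative (dataCoord.indexTaskSlotUsage = 64,
// dataCoord.scalarIndexTaskSlotUsage = 16 in the config diff).
func estimateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 {
	defaultSlots := int64(64)
	if !isVectorIndex {
		defaultSlots = 16
	}
	if segmentSize > 512*mb {
		return max(segmentSize/(512*mb), 1) * defaultSlots
	}
	// the real function also adjusts usage for smaller segments; simplified here
	return defaultSlots
}

func main() {
	fmt.Println(estimateIndexTaskSlot(2048*mb, true))  // 256: vector index, 4 x 512MB
	fmt.Println(estimateIndexTaskSlot(2048*mb, false)) // 64: scalar index, 4 x 512MB
}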
sre-ci-robot
605816949f
[automated] Bump milvus version to v2.6.7 (#46068)
Bump milvus version to v2.6.7
Signed-off-by: sre-ci-robot <sre-ci-robot@users.noreply.github.com>

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-12-04 11:09:12 +08:00
congqixia
a659506e82
enhance: Bump go version to 1.24.11 for builder image (#46049)
Follow-up of #46034

Bump the Go version in the builder image to fix CVE-2025-61729

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-12-04 11:07:11 +08:00
Ted Xu
20ce9fdc23
feat: bump loon version (#46029)
See: #44956

This PR upgrades loon to the latest version and resolves build
conflicts.

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2025-12-04 10:57:12 +08:00
aoiasd
8efe9ccac6
feat: Add support for using highlight without returning the field as the output field. (#45984)
relate: https://github.com/milvus-io/milvus/issues/42589

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
2025-12-04 10:35:11 +08:00
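A trimmed-down sketch of what the proxy hunks further down do: the highlighter now exposes FieldIDs(), and the search request's output field IDs are extended with them, so the field data needed for highlighting is fetched even when the caller did not request the field as an output field. The types here are simplified stand-ins, not the proxy's actual ones.

package main

import "fmt"

// Highlighter mirrors the small interface added in the diff: it reports which
// field IDs it needs for highlighting.
type Highlighter interface {
	FieldIDs() []int64
}

type lexicalHighlighter struct{ fields []int64 }

func (h *lexicalHighlighter) FieldIDs() []int64 { return h.fields }

// withHighlightFields appends the highlighter's field IDs to the request's
// output field IDs so they are retrieved for highlighting.
func withHighlightFields(outputFieldIDs []int64, h Highlighter) []int64 {
	if h == nil {
		return outputFieldIDs
	}
	return append(outputFieldIDs, h.FieldIDs()...)
}

func main() {
	h := &lexicalHighlighter{fields: []int64{102}}
	fmt.Println(withHighlightFields([]int64{100, 101}, h)) // [100 101 102]
}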
nico
43fe215787
test: update sdk version and skip some debug log (#46040)
Signed-off-by: nico <cheng.yuan@zilliz.com>
2025-12-04 10:33:11 +08:00
wei liu
f85e86a6ec
fix: change upsert duplicate PK behavior from dedup to error (#45997)
issue: #44320

Replace the DeduplicateFieldData function with CheckDuplicatePkExist,
which returns an error when duplicate primary keys are detected in the
same batch instead of silently deduplicating them.

Changes:
- Replace DeduplicateFieldData with CheckDuplicatePkExist in util.go
- Update upsertTask.PreExecute to return error on duplicate PKs
- Simplify helper function from findLastOccurrenceIndices to
hasDuplicates
- Update unit tests to verify the new error behavior
- Add Python integration tests for duplicate PK error cases

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2025-12-04 10:23:11 +08:00
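A minimal sketch of the new behavior, based on the hasDuplicates and CheckDuplicatePkExist hunks near the end of this compare: instead of silently keeping the last occurrence of each primary key, upsert pre-execution now rejects a batch that contains any repeated key. checkUpsertBatch below is a hypothetical wrapper, not the actual upsertTask code.

package main

import (
	"errors"
	"fmt"
)

// hasDuplicates reports whether the slice contains a repeated value, exiting
// as soon as the first duplicate is seen (mirrors the helper in the diff).
func hasDuplicates[T comparable](ids []T) bool {
	seen := make(map[T]struct{}, len(ids))
	for _, id := range ids {
		if _, exists := seen[id]; exists {
			return true
		}
		seen[id] = struct{}{}
	}
	return false
}

// checkUpsertBatch stands in for the new PreExecute behavior: duplicate
// primary keys in one batch become an error rather than being deduplicated.
func checkUpsertBatch(pks []int64) error {
	if hasDuplicates(pks) {
		return errors.New("duplicate primary keys are not allowed in the same batch")
	}
	return nil
}

func main() {
	fmt.Println(checkUpsertBatch([]int64{1, 2, 3})) // <nil>
	fmt.Println(checkUpsertBatch([]int64{1, 2, 1})) // duplicate primary keys error
}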
wei liu
a308331b81
fix: Set replica field in balance plans to prevent panic (#45722)
issue: #45598

The MultiTargetBalancer did not set the replica field in the segment
and channel plans it generated, which caused a panic during balance
operations. This change ensures that every generated balance plan has
the replica field properly set.

Also refactored the balance test to extract common test logic into a
reusable helper function, and added a new integration test specifically
for the MultipleTargetBalancer policy.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2025-12-04 10:19:11 +08:00
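A sketch of the failure mode and fix described above, with simplified stand-in types (the real plan and replica structs live in the query coordinator's balance package): a plan whose Replica pointer is never populated panics as soon as it is dereferenced, so the fix is to set the field on every generated plan.

package main

import "fmt"

// Replica and SegmentAssignPlan are simplified stand-ins for the balancer types.
type Replica struct{ ID int64 }

type SegmentAssignPlan struct {
	SegmentID int64
	To        int64
	Replica   *Replica
}

// applyPlan dereferences the plan's replica; with a nil Replica this is the
// kind of panic the fix prevents.
func applyPlan(p SegmentAssignPlan) {
	fmt.Printf("move segment %d to node %d in replica %d\n", p.SegmentID, p.To, p.Replica.ID)
}

func main() {
	replica := &Replica{ID: 7}
	plans := []SegmentAssignPlan{{SegmentID: 1, To: 2}, {SegmentID: 3, To: 4}}
	for i := range plans {
		plans[i].Replica = replica // the assignment that was previously missing
	}
	for _, p := range plans {
		applyPlan(p)
	}
}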
49 changed files with 771 additions and 1007 deletions

8
.env
View File

@ -5,11 +5,11 @@ IMAGE_ARCH=amd64
OS_NAME=ubuntu22.04
# for services.builder.image in docker-compose.yml
DATE_VERSION=20251107-4a6e8d8
LATEST_DATE_VERSION=20251107-4a6e8d8
DATE_VERSION=20251203-0171511
LATEST_DATE_VERSION=20251203-0171511
# for services.gpubuilder.image in docker-compose.yml
GPU_DATE_VERSION=20251107-4a6e8d8
LATEST_GPU_DATE_VERSION=20251107-4a6e8d8
GPU_DATE_VERSION=20251203-0171511
LATEST_GPU_DATE_VERSION=20251203-0171511
# for other services in docker-compose.yml
MINIO_ADDRESS=minio:9000

View File

@ -108,7 +108,6 @@ func cleanLocalDir(path string) {
func runComponent[T component](ctx context.Context,
localMsg bool,
runWg *sync.WaitGroup,
creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry),
) *conc.Future[component] {
@ -184,15 +183,15 @@ func (mr *MilvusRoles) printLDPreLoad() {
}
}
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, components.NewProxy, metrics.RegisterProxy)
}
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord)
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, components.NewMixCoord, metrics.RegisterMixCoord)
}
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *conc.Future[component] {
// clear local storage
queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0)
if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() {
@ -200,21 +199,24 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
// under posix mode, this clean task will be done by mixcoord
cleanLocalDir(queryDataLocalPath)
}
return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
return runComponent(ctx, localMsg, components.NewQueryNode, metrics.RegisterQueryNode)
}
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode)
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, components.NewStreamingNode, metrics.RegisterStreamingNode)
}
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode)
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, components.NewDataNode, metrics.RegisterDataNode)
}
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC)
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, components.NewCDC, metrics.RegisterCDC)
}
// waitForAllComponentsReady waits for all components to be ready.
// It will return an error if any component is not ready before closing with a fast fail strategy.
// It will return a map of components that are ready.
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
roles := make([]string, 0, len(componentFutureMap))
futures := make([]*conc.Future[component], 0, len(componentFutureMap))
@ -235,22 +237,20 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
}
componentMap := make(map[string]component, len(componentFutureMap))
readyCount := 0
var gerr error
for {
index, _, _ := reflect.Select(selectCases)
if index == 0 {
cancel()
log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
return nil, context.Canceled
} else {
role := roles[index-1]
component, err := futures[index-1].Await()
readyCount++
if err != nil {
if gerr == nil {
gerr = errors.Wrapf(err, "component %s is not ready before closing", role)
cancel()
}
cancel()
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
return nil, err
} else {
componentMap[role] = component
log.Info("component is ready", zap.String("role", role))
@ -264,9 +264,6 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
break
}
}
if gerr != nil {
return nil, errors.Wrap(gerr, "failed to wait for all components ready")
}
return componentMap, nil
}
@ -468,45 +465,43 @@ func (mr *MilvusRoles) Run() {
defer streaming.Release()
}
var wg sync.WaitGroup
local := mr.Local
componentFutureMap := make(map[string]*conc.Future[component])
if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord {
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
mixCoord := mr.runMixCoord(ctx, local, &wg)
mixCoord := mr.runMixCoord(ctx, local)
componentFutureMap[typeutil.MixCoordRole] = mixCoord
}
if mr.EnableQueryNode {
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
queryNode := mr.runQueryNode(ctx, local, &wg)
queryNode := mr.runQueryNode(ctx, local)
componentFutureMap[typeutil.QueryNodeRole] = queryNode
}
if mr.EnableDataNode {
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
dataNode := mr.runDataNode(ctx, local, &wg)
dataNode := mr.runDataNode(ctx, local)
componentFutureMap[typeutil.DataNodeRole] = dataNode
}
if mr.EnableProxy {
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
proxy := mr.runProxy(ctx, local, &wg)
proxy := mr.runProxy(ctx, local)
componentFutureMap[typeutil.ProxyRole] = proxy
}
if mr.EnableStreamingNode {
// Before initializing the local streaming node, make sure the local registry is ready.
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
streamingNode := mr.runStreamingNode(ctx, local, &wg)
streamingNode := mr.runStreamingNode(ctx, local)
componentFutureMap[typeutil.StreamingNodeRole] = streamingNode
}
if mr.EnableCDC {
paramtable.SetLocalComponentEnabled(typeutil.CDCRole)
cdc := mr.runCDC(ctx, local, &wg)
cdc := mr.runCDC(ctx, local)
componentFutureMap[typeutil.CDCRole] = cdc
}

View File

@ -735,6 +735,7 @@ dataCoord:
mixCompactionUsage: 4 # slot usage of mix compaction task.
l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task.
indexTaskSlotUsage: 64 # slot usage of index task per 512mb
scalarIndexTaskSlotUsage: 16 # slot usage of scalar index task per 512mb
statsTaskSlotUsage: 8 # slot usage of stats task per 512mb
analyzeTaskSlotUsage: 65535 # slot usage of analyze task
jsonShreddingTriggerCount: 10 # jsonkey stats task count per trigger

View File

@ -46,6 +46,7 @@ class MilvusConan(ConanFile):
"mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
"geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
"icu/74.2#cd1937b9561b8950a2ae6311284c5813",
"libavrocpp/1.12.1@milvus/dev",
)
generators = ("cmake", "cmake_find_package")

View File

@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test {
auto storage_config = milvus_storage::StorageConfig();
// Create writer
milvus_storage::PackedRecordBatchWriter writer(
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs,
paths,
schema_->ConvertToArrowSchema(),
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
// Generate and write data
int64_t row_count = 0;
@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test {
auto record_batch = arrow::RecordBatch::Make(
schema_->ConvertToArrowSchema(), test_data_count_, arrays);
row_count += test_data_count_;
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
LoadFieldDataInfo load_info;
load_info.field_infos.emplace(

View File

@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
const std::string& file) {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
std::shared_ptr<arrow::Schema> file_schema = file_reader->schema();
LOG_DEBUG("get column schema: [{}] for segment {}",
file_schema->ToString(true),
@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
// get key value metadata from parquet file
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, files[0]);
auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
milvus_storage::FieldIDList field_id_list =
@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
}
for (const auto& file : files) {
auto reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_meta_vector =
reader->file_metadata()->GetRowGroupMetadataVector();
num_rows += row_group_meta_vector.row_num();

View File

@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) {
schema_ = context.schema;
builders_ = context.builders;
builders_map_ = context.builders_map;
kv_metadata_ = std::move(context.kv_metadata);
kv_metadata_ = context.kv_metadata;
column_groups_ = context.column_groups;
file_paths_ = context.file_paths;
packed_writer_ = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
fs_,
file_paths_,
schema_,
storage_config_,
column_groups_,
buffer_size_);
auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_,
file_paths_,
schema_,
storage_config_,
column_groups_,
buffer_size_);
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
packed_writer_ = result.ValueOrDie();
for (const auto& [key, value] : kv_metadata_) {
packed_writer_->AddUserMetadata(key, value);
}
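The storage-v2 hunks above (and several below) follow the same refactor: direct construction of PackedRecordBatchWriter and FileRowGroupReader is replaced by a static Make factory returning a Result, so callers must check result.ok() before using the object. In Go terms this is the usual fallible-constructor pattern sketched below; the types are an analogy, not milvus-storage's API.

package main

import (
	"errors"
	"fmt"
)

// packedWriter is an illustrative analogue only; the real writer is a C++
// class created via PackedRecordBatchWriter::Make.
type packedWriter struct{ bufferSize int64 }

// newPackedWriter plays the role of the Make factory: construction can fail,
// and the error is surfaced to the caller instead of leaving a half-built object.
func newPackedWriter(bufferSize int64) (*packedWriter, error) {
	if bufferSize <= 0 {
		return nil, errors.New("buffer size must be positive")
	}
	return &packedWriter{bufferSize: bufferSize}, nil
}

func main() {
	w, err := newPackedWriter(16 * 1024 * 1024)
	if err != nil {
		fmt.Println("failed to create packed writer:", err) // the result.ok() check
		return
	}
	fmt.Println("writer ready, buffer:", w.bufferSize)
}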

View File

@ -232,7 +232,7 @@ class JsonStatsParquetWriter {
size_t batch_size_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
milvus_storage::StorageConfig storage_config_;
std::unique_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
std::shared_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
std::vector<std::pair<std::string, std::string>> kv_metadata_;
// cache for builders

View File

@ -374,6 +374,13 @@ BuildJsonKeyIndex(ProtoLayoutInterface result,
build_index_info->storage_plugin_context().encryption_zone_id(),
build_index_info->storage_plugin_context().collection_id(),
build_index_info->storage_plugin_context().encryption_key());
auto plugin_context = std::make_shared<CPluginContext>();
plugin_context->ez_id =
build_index_info->storage_plugin_context().encryption_zone_id();
plugin_context->collection_id =
build_index_info->storage_plugin_context().collection_id();
fileManagerContext.set_plugin_context(plugin_context);
}
auto field_schema =
@ -465,6 +472,12 @@ BuildTextIndex(ProtoLayoutInterface result,
build_index_info->storage_plugin_context().encryption_zone_id(),
build_index_info->storage_plugin_context().collection_id(),
build_index_info->storage_plugin_context().encryption_key());
auto plugin_context = std::make_shared<CPluginContext>();
plugin_context->ez_id =
build_index_info->storage_plugin_context().encryption_zone_id();
plugin_context->collection_id =
build_index_info->storage_plugin_context().collection_id();
fileManagerContext.set_plugin_context(plugin_context);
}
auto scalar_index_engine_version =

View File

@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
auto storage_config = milvus_storage::StorageConfig();
// Create writer
milvus_storage::PackedRecordBatchWriter writer(
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs,
paths,
schema->ConvertToArrowSchema(),
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
// Generate and write data
int64_t row_count = 0;
@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
auto record_batch = arrow::RecordBatch::Make(
schema->ConvertToArrowSchema(), test_data_count, arrays);
row_count += test_data_count;
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
LoadFieldDataInfo load_info;
load_info.field_infos.emplace(

View File

@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal(
std::vector<std::vector<int64_t>> row_group_lists;
row_group_lists.reserve(insert_files.size());
for (const auto& file : insert_files) {
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
storage::GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
std::vector<int64_t> all_row_groups(row_group_num);

View File

@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
EXPECT_TRUE(writer->Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
auto schema = std::make_shared<milvus::Schema>();
auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
EXPECT_TRUE(writer->Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
auto channel = std::make_shared<milvus::ArrowReaderChannel>();
int64_t memory_limit = 1024 * 1024 * 1024; // 1GB
uint64_t parallel_degree = 2;
// read all row groups
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, paths[0], schema_);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, paths[0]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
auto status = fr->Close();
AssertInfo(
@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
auto arrow_schema = schema->ConvertToArrowSchema();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
arrow_schema,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
// Load data back from storage v2
LoadFieldDataInfo load_info;

View File

@ -183,55 +183,54 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
memory_limit / blocks.size(), FILE_SLICE_SIZE.load());
for (const auto& block : blocks) {
futures.emplace_back(pool.Submit([block,
fs,
file,
file_idx,
schema,
reader_memory_limit]() {
AssertInfo(fs != nullptr,
"[StorageV2] file system is nullptr");
auto row_group_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(
futures.emplace_back(pool.Submit(
[block, fs, file, file_idx, schema, reader_memory_limit]() {
AssertInfo(fs != nullptr,
"[StorageV2] file system is nullptr");
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
schema,
reader_memory_limit,
milvus::storage::GetReaderProperties());
AssertInfo(row_group_reader != nullptr,
"[StorageV2] row group reader is nullptr");
row_group_reader->SetRowGroupOffsetAndCount(block.offset,
block.count);
LOG_INFO(
"[StorageV2] read row groups from file {} with offset "
"{} and count "
"{}",
file,
block.offset,
block.count);
auto ret = std::make_shared<ArrowDataWrapper>();
for (int64_t i = 0; i < block.count; ++i) {
std::shared_ptr<arrow::Table> table;
AssertInfo(
result.ok(),
"[StorageV2] Failed to create row group reader: " +
result.status().ToString());
auto row_group_reader = result.ValueOrDie();
auto status =
row_group_reader->ReadNextRowGroup(&table);
row_group_reader->SetRowGroupOffsetAndCount(
block.offset, block.count);
AssertInfo(status.ok(),
"[StorageV2] Failed to read row group " +
std::to_string(block.offset + i) +
" from file " + file + " with error " +
status.ToString());
ret->arrow_tables.push_back(
{file_idx,
static_cast<size_t>(block.offset + i),
table});
}
auto close_status = row_group_reader->Close();
AssertInfo(close_status.ok(),
"[StorageV2] Failed to close row group reader "
"for file " +
file + " with error " +
close_status.ToString());
return ret;
}));
"[StorageV2] Failed to set row group offset "
"and count " +
std::to_string(block.offset) + " and " +
std::to_string(block.count) +
" with error " + status.ToString());
auto ret = std::make_shared<ArrowDataWrapper>();
for (int64_t i = 0; i < block.count; ++i) {
std::shared_ptr<arrow::Table> table;
auto status =
row_group_reader->ReadNextRowGroup(&table);
AssertInfo(status.ok(),
"[StorageV2] Failed to read row group " +
std::to_string(block.offset + i) +
" from file " + file +
" with error " + status.ToString());
ret->arrow_tables.push_back(
{file_idx,
static_cast<size_t>(block.offset + i),
table});
}
auto close_status = row_group_reader->Close();
AssertInfo(
close_status.ok(),
"[StorageV2] Failed to close row group reader "
"for file " +
file + " with error " +
close_status.ToString());
return ret;
}));
}
for (auto& future : futures) {

View File

@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
}
auto writer_properties = builder.build();
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
trueFs,
truePaths,
trueSchema,
storage_config,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(writer, "[StorageV2] writer pointer is null");
*c_packed_writer = writer.release();
auto result =
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
truePaths,
trueSchema,
storage_config,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
auto writer = result.ValueOrDie();
*c_packed_writer =
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
std::move(writer));
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema,
}
auto writer_properties = builder.build();
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
trueFs,
truePaths,
trueSchema,
conf,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(writer, "[StorageV2] writer pointer is null");
*c_packed_writer = writer.release();
auto result =
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
truePaths,
trueSchema,
conf,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
auto writer = result.ValueOrDie();
*c_packed_writer =
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
std::move(writer));
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
@ -201,9 +211,9 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
c_packed_writer);
auto packed_writer = *static_cast<
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
c_packed_writer);
auto import_schema = arrow::ImportSchema(schema);
if (!import_schema.ok()) {
@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
c_packed_writer);
auto status = packed_writer->Close();
auto packed_writer = static_cast<
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
c_packed_writer);
auto status = (*packed_writer)->Close();
delete packed_writer;
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,

View File

@ -109,11 +109,15 @@ GroupChunkTranslator::GroupChunkTranslator(
parquet_file_metadata_.reserve(insert_files_.size());
row_group_meta_list_.reserve(insert_files_.size());
for (const auto& file : insert_files_) {
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
storage::GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
parquet_file_metadata_.push_back(
reader->file_metadata()->GetParquetMetadata());

View File

@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(fs_,
paths_,
arrow_schema_,
storage_config,
column_groups,
writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths_,
arrow_schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema_, per_batch);
@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
}
protected:
@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
milvus::proto::common::LoadPriority::LOW);
// num cells - get the expected number from the file directly
auto fr =
std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto expected_num_cells =
fr->file_metadata()->GetRowGroupMetadataVector().size();
auto row_group_metadata_vector =
@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
std::vector<std::string> single_file_paths{file_path};
milvus_storage::PackedRecordBatchWriter writer(fs_,
single_file_paths,
arrow_schema_,
storage_config,
column_groups,
writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
single_file_paths,
arrow_schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema_, per_batch);
auto record_batch =
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
// Get the number of row groups in this file
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, file_path);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, file_path);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
expected_row_groups_per_file.push_back(
fr->file_metadata()->GetRowGroupMetadataVector().size());
auto status = fr->Close();
@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
auto usage = translator->estimated_byte_size_of_cell(i).first;
// Get the expected memory size from the corresponding file
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
auto reader_result = milvus_storage::FileRowGroupReader::Make(
fs_, multi_file_paths[file_idx]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto row_group_metadata_vector =
fr->file_metadata()->GetRowGroupMetadataVector();
auto expected_size = static_cast<int64_t>(

View File

@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
for (auto& column_group_file : remote_chunk_files) {
// get all row groups for each file
std::vector<std::vector<int64_t>> row_group_lists;
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
column_group_file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id,
field_id_list.Add(column_group_id.get());
return field_id_list;
}
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
filepath,
arrow_schema,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
field_id_list =
file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());

View File

@ -22,14 +22,14 @@
#include "monitor/scope_metric.h"
ReaderHandle
createFFIReader(char* manifest,
createFFIReader(ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,
const std::shared_ptr<Properties>& properties) {
ReaderHandle reader_handler = 0;
FFIResult result = reader_new(manifest,
FFIResult result = reader_new(column_groups_handle,
schema,
needed_columns,
needed_columns_size,
@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path,
}
CStatus
NewPackedFFIReaderWithManifest(const char* manifest_content,
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,
@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content,
try {
auto properties =
MakeInternalPropertiesFromStorageConfig(c_storage_config);
// Parse the column groups, the column groups is a JSON string
auto cpp_column_groups =
std::make_shared<milvus_storage::api::ColumnGroups>();
auto des_result =
cpp_column_groups->deserialize(std::string_view(manifest_content));
AssertInfo(des_result.ok(), "failed to deserialize column groups");
auto* cg_ptr = reinterpret_cast<
std::shared_ptr<milvus_storage::api::ColumnGroups>*>(
column_groups_handle);
auto cpp_column_groups = *cg_ptr;
auto reader = GetLoonReader(cpp_column_groups,
schema,

View File

@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path,
* be freed after this call returns.
*/
CStatus
NewPackedFFIReaderWithManifest(const char* manifest_content,
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,

View File

@ -257,12 +257,10 @@ GetManifest(const std::string& path,
// Parse the JSON string
json j = json::parse(path);
// Extract base_path and ver fields
// Extract base_path
std::string base_path = j.at("base_path").get<std::string>();
int64_t ver = j.at("ver").get<int64_t>();
// return std::make_pair(base_path, ver);
char* out_column_groups = nullptr;
ColumnGroupsHandle out_column_groups = 0;
int64_t out_read_version = 0;
FFIResult result = get_latest_column_groups(base_path.c_str(),
properties.get(),
@ -298,9 +296,8 @@ GetColumnGroups(
// Parse the JSON string
json j = json::parse(path);
// Extract base_path and ver fields
// Extract base_path
std::string base_path = j.at("base_path").get<std::string>();
int64_t ver = j.at("ver").get<int64_t>();
// TODO fetch manifest based on version after api supported
auto transaction =

View File

@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION ba7df7b)
set( milvus-storage_VERSION 5fff4f5)
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
auto arrow_schema = schema->ConvertToArrowSchema();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
arrow_schema,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
{
// test memory file manager

View File

@ -175,13 +175,14 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
if err != nil {
return err
}
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize())
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
// rewrite the index type if needed, and this final index type will be persisted in the meta
if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() {
if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
var err error
indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams)
if err != nil {
@ -227,9 +228,14 @@ func (i *indexInspector) reloadFromMeta() {
continue
}
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
i.scheduler.Enqueue(newIndexBuildTask(
model.CloneSegmentIndex(segIndex),
calculateIndexTaskSlot(segment.getSegmentSize()),
taskSlot,
i.meta,
i.handler,
i.storageCli,

View File

@ -369,8 +369,11 @@ func getSortStatus(sorted bool) string {
return "unsorted"
}
func calculateIndexTaskSlot(segmentSize int64) int64 {
func calculateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 {
defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()
if !isVectorIndex {
defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64()
}
if segmentSize > 512*1024*1024 {
taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots
return max(taskSlot, 1)

View File

@ -36,6 +36,7 @@ const (
type Highlighter interface {
AsSearchPipelineOperator(t *searchTask) (operator, error)
FieldIDs() []int64
}
// highlight task for one field
@ -114,18 +115,26 @@ func (h *LexicalHighlighter) addTaskWithQuery(fieldID int64, query *highlightQue
})
}
func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator, error) {
func (h *LexicalHighlighter) initHighlightQueries(t *searchTask) error {
// add query to highlight tasks
for _, query := range h.queries {
fieldID, ok := t.schema.MapFieldID(query.fieldName)
if !ok {
return nil, merr.WrapErrParameterInvalidMsg("highlight field not found in schema: %s", query.fieldName)
return merr.WrapErrParameterInvalidMsg("highlight field not found in schema: %s", query.fieldName)
}
h.addTaskWithQuery(fieldID, query)
}
return nil
}
func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator, error) {
return newLexicalHighlightOperator(t, lo.Values(h.tasks))
}
func (h *LexicalHighlighter) FieldIDs() []int64 {
return lo.Keys(h.tasks)
}
func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlighter, error) {
params := funcutil.KeyValuePair2Map(highlighter.GetParams())
h := &LexicalHighlighter{

View File

@ -602,7 +602,12 @@ func (t *searchTask) createLexicalHighlighter(highlighter *commonpb.Highlighter,
if err != nil {
return err
}
return h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts)
err = h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts)
if err != nil {
return err
}
return h.initHighlightQueries(t)
}
return nil
}
@ -642,10 +647,24 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
}
}
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, t.request.GetSearchParams())
if err == nil {
t.SearchRequest.AnalyzerName = analyzer
}
t.isIterator = isIterator
t.SearchRequest.Offset = offset
t.SearchRequest.FieldId = queryInfo.GetQueryFieldId()
if err := t.addHighlightTask(t.request.GetHighlighter(), queryInfo.GetMetricType(), queryInfo.GetQueryFieldId(), t.request.GetPlaceholderGroup(), t.SearchRequest.GetAnalyzerName()); err != nil {
return err
}
// add highlight field ids to output fields id
if t.highlighter != nil {
t.SearchRequest.OutputFieldsId = append(t.SearchRequest.OutputFieldsId, t.highlighter.FieldIDs()...)
}
if t.partitionKeyMode {
// isolation has tighter constraint, check first
mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan)
@ -696,16 +715,6 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
t.SearchRequest.GroupByFieldId = queryInfo.GroupByFieldId
t.SearchRequest.GroupSize = queryInfo.GroupSize
if t.SearchRequest.MetricType == metric.BM25 {
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, t.request.GetSearchParams())
if err == nil {
t.SearchRequest.AnalyzerName = analyzer
}
}
if err := t.addHighlightTask(t.request.GetHighlighter(), t.SearchRequest.MetricType, t.SearchRequest.FieldId, t.request.GetPlaceholderGroup(), t.SearchRequest.GetAnalyzerName()); err != nil {
return err
}
if embedding.HasNonBM25Functions(t.schema.CollectionSchema.Functions, []int64{queryInfo.GetQueryFieldId()}) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search-call-function-udf")
defer sp.End()

View File

@ -1078,18 +1078,13 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
log.Warn("fail to get primary field schema", zap.Error(err))
return err
}
deduplicatedFieldsData, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, it.req.GetFieldsData(), schema)
duplicate, err := CheckDuplicatePkExist(primaryFieldSchema, it.req.GetFieldsData())
if err != nil {
log.Warn("fail to deduplicate upsert data", zap.Error(err))
log.Warn("fail to check duplicate primary keys", zap.Error(err))
return err
}
// dedup won't decrease numOfRows to 0
if newNumRows > 0 && newNumRows != it.req.NumRows {
log.Info("upsert data deduplicated",
zap.Uint32("original_num_rows", it.req.NumRows),
zap.Uint32("deduplicated_num_rows", newNumRows))
it.req.FieldsData = deduplicatedFieldsData
it.req.NumRows = newNumRows
if duplicate {
return merr.WrapErrParameterInvalidMsg("duplicate primary keys are not allowed in the same batch")
}
it.upsertMsg = &msgstream.UpsertMsg{

View File

@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/proxy/shardclient"
"github.com/milvus-io/milvus/internal/util/function/embedding"
"github.com/milvus-io/milvus/internal/util/segcore"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
@ -1467,402 +1466,137 @@ func TestGenNullableFieldData_GeometryAndTimestamptz(t *testing.T) {
})
}
func TestUpsertTask_PlanNamespace_AfterPreExecute(t *testing.T) {
mockey.PatchConvey("TestUpsertTask_PlanNamespace_AfterPreExecute", t, func() {
// Setup global meta cache and common mocks
globalMetaCache = &MetaCache{}
mockey.Mock(GetReplicateID).Return("", nil).Build()
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build()
mockey.Mock((*MetaCache).GetPartitionID).Return(int64(1002), nil).Build()
mockey.Mock(isPartitionKeyMode).Return(false, nil).Build()
mockey.Mock(validatePartitionTag).Return(nil).Build()
// Schema with namespace enabled
mockey.Mock((*MetaCache).GetCollectionSchema).To(func(_ *MetaCache, _ context.Context, _ string, _ string) (*schemaInfo, error) {
info := createTestSchema()
info.CollectionSchema.Properties = append(info.CollectionSchema.Properties, &commonpb.KeyValuePair{Key: common.NamespaceEnabledKey, Value: "true"})
return info, nil
}).Build()
// Capture plan to verify namespace
var capturedPlan *planpb.PlanNode
mockey.Mock(planparserv2.CreateRequeryPlan).To(func(_ *schemapb.FieldSchema, _ *schemapb.IDs) *planpb.PlanNode {
capturedPlan = &planpb.PlanNode{}
return capturedPlan
}).Build()
// Mock query to return a valid result for queryPreExecute merge path
mockey.Mock((*Proxy).query).Return(&milvuspb.QueryResults{
Status: merr.Success(),
FieldsData: []*schemapb.FieldData{
{
FieldName: "id",
FieldId: 100,
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2}}}}},
},
{
FieldName: "name",
FieldId: 102,
Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_StringData{StringData: &schemapb.StringArray{Data: []string{"old1", "old2"}}}}},
},
{
FieldName: "vector",
FieldId: 101,
Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: 128, Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: make([]float32, 256)}}}},
},
},
}, segcore.StorageCost{}, nil).Build()
// Build task
task := createTestUpdateTask()
ns := "ns-1"
task.req.PartialUpdate = true
task.req.Namespace = &ns
// Skip insert/delete heavy logic
mockey.Mock((*upsertTask).insertPreExecute).Return(nil).Build()
mockey.Mock((*upsertTask).deletePreExecute).Return(nil).Build()
err := task.PreExecute(context.Background())
assert.NoError(t, err)
assert.NotNil(t, capturedPlan)
assert.NotNil(t, capturedPlan.Namespace)
assert.Equal(t, *task.req.Namespace, *capturedPlan.Namespace)
})
}
func TestUpsertTask_Deduplicate_Int64PK(t *testing.T) {
// Test deduplication with Int64 primary key
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
func TestUpsertTask_DuplicatePK_Int64(t *testing.T) {
schema := &schemapb.CollectionSchema{
Name: "test_duplicate_pk",
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
{
Name: "float_field",
FieldID: 101,
DataType: schemapb.DataType_Float,
},
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
},
}
schema := newSchemaInfo(collSchema)
// Create field data with duplicate IDs: [1, 2, 3, 2, 1]
// Expected to keep last occurrence of each: [3, 2, 1] (indices 2, 3, 4)
// Data with duplicate primary keys: 1, 2, 1 (duplicate)
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
FieldId: 100,
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 3, 2, 1},
},
LongData: &schemapb.LongArray{Data: []int64{1, 2, 1}},
},
},
},
},
{
FieldName: "float_field",
Type: schemapb.DataType_Float,
FieldName: "value",
FieldId: 101,
Type: schemapb.DataType_Int32,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: []float32{1.1, 2.2, 3.3, 2.4, 1.5},
},
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
},
},
},
},
}
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
// Test CheckDuplicatePkExist directly
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
assert.NoError(t, err)
assert.Equal(t, uint32(3), newNumRows)
assert.Equal(t, 2, len(deduplicatedFields))
// Check deduplicated primary keys
pkField := deduplicatedFields[0]
pkData := pkField.GetScalars().GetLongData().GetData()
assert.Equal(t, 3, len(pkData))
assert.Equal(t, []int64{3, 2, 1}, pkData)
// Check corresponding float values (should be 3.3, 2.4, 1.5)
floatField := deduplicatedFields[1]
floatData := floatField.GetScalars().GetFloatData().GetData()
assert.Equal(t, 3, len(floatData))
assert.Equal(t, []float32{3.3, 2.4, 1.5}, floatData)
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.True(t, hasDuplicate, "should detect duplicate primary keys")
}
func TestUpsertTask_Deduplicate_VarCharPK(t *testing.T) {
// Test deduplication with VarChar primary key
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_VarChar,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
func TestUpsertTask_DuplicatePK_VarChar(t *testing.T) {
schema := &schemapb.CollectionSchema{
Name: "test_duplicate_pk_varchar",
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
{
Name: "int_field",
FieldID: 101,
DataType: schemapb.DataType_Int64,
},
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "100"}}},
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
},
}
schema := newSchemaInfo(collSchema)
// Create field data with duplicate IDs: ["a", "b", "c", "b", "a"]
// Expected to keep last occurrence of each: ["c", "b", "a"] (indices 2, 3, 4)
// Data with duplicate primary keys: "a", "b", "a" (duplicate)
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
FieldId: 100,
Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: []string{"a", "b", "c", "b", "a"},
},
StringData: &schemapb.StringArray{Data: []string{"a", "b", "a"}},
},
},
},
},
{
FieldName: "int_field",
Type: schemapb.DataType_Int64,
FieldName: "value",
FieldId: 101,
Type: schemapb.DataType_Int32,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{100, 200, 300, 201, 101},
},
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
},
},
},
},
}
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
// Test CheckDuplicatePkExist directly
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
assert.NoError(t, err)
assert.Equal(t, uint32(3), newNumRows)
assert.Equal(t, 2, len(deduplicatedFields))
// Check deduplicated primary keys
pkField := deduplicatedFields[0]
pkData := pkField.GetScalars().GetStringData().GetData()
assert.Equal(t, 3, len(pkData))
assert.Equal(t, []string{"c", "b", "a"}, pkData)
// Check corresponding int64 values (should be 300, 201, 101)
int64Field := deduplicatedFields[1]
int64Data := int64Field.GetScalars().GetLongData().GetData()
assert.Equal(t, 3, len(int64Data))
assert.Equal(t, []int64{300, 201, 101}, int64Data)
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.True(t, hasDuplicate, "should detect duplicate primary keys")
}
func TestUpsertTask_Deduplicate_NoDuplicates(t *testing.T) {
// Test with no duplicates - should return original data
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
func TestUpsertTask_NoDuplicatePK(t *testing.T) {
schema := &schemapb.CollectionSchema{
Name: "test_no_duplicate_pk",
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
},
}
schema := newSchemaInfo(collSchema)
// Data with unique primary keys: 1, 2, 3
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
FieldId: 100,
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 3, 4, 5},
},
LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}},
},
},
},
},
}
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
assert.NoError(t, err)
assert.Equal(t, uint32(5), newNumRows)
assert.Equal(t, 1, len(deduplicatedFields))
// Should be unchanged
pkField := deduplicatedFields[0]
pkData := pkField.GetScalars().GetLongData().GetData()
assert.Equal(t, []int64{1, 2, 3, 4, 5}, pkData)
}
func TestUpsertTask_Deduplicate_WithVector(t *testing.T) {
// Test deduplication with vector field
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
{
Name: "vector",
FieldID: 101,
DataType: schemapb.DataType_FloatVector,
},
},
}
schema := newSchemaInfo(collSchema)
dim := 4
// Create field data with duplicate IDs: [1, 2, 1]
// Expected to keep indices [1, 2] (last occurrence of 2, last occurrence of 1)
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
Type: schemapb.DataType_Int64,
FieldName: "value",
FieldId: 101,
Type: schemapb.DataType_Int32,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 1},
},
},
},
},
},
{
FieldName: "vector",
Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: []float32{
1.0, 1.1, 1.2, 1.3, // vector for ID 1 (first occurrence)
2.0, 2.1, 2.2, 2.3, // vector for ID 2
1.4, 1.5, 1.6, 1.7, // vector for ID 1 (second occurrence - keep this)
},
},
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
},
},
},
},
}
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
// Call CheckDuplicatePkExist directly to verify no duplicate error
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
assert.NoError(t, err)
assert.Equal(t, uint32(2), newNumRows)
assert.Equal(t, 2, len(deduplicatedFields))
// Check deduplicated primary keys
pkField := deduplicatedFields[0]
pkData := pkField.GetScalars().GetLongData().GetData()
assert.Equal(t, 2, len(pkData))
assert.Equal(t, []int64{2, 1}, pkData)
// Check corresponding vector (should keep vectors for ID 2 and ID 1's last occurrence)
vectorField := deduplicatedFields[1]
vectorData := vectorField.GetVectors().GetFloatVector().GetData()
assert.Equal(t, 8, len(vectorData)) // 2 vectors * 4 dimensions
expectedVector := []float32{
2.0, 2.1, 2.2, 2.3, // vector for ID 2
1.4, 1.5, 1.6, 1.7, // vector for ID 1 (last occurrence)
}
assert.Equal(t, expectedVector, vectorData)
}
func TestUpsertTask_Deduplicate_EmptyData(t *testing.T) {
// Test with empty data
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
},
}
schema := newSchemaInfo(collSchema)
fieldsData := []*schemapb.FieldData{}
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.Equal(t, uint32(0), newNumRows)
assert.Equal(t, 0, len(deduplicatedFields))
}
func TestUpsertTask_Deduplicate_MissingPrimaryKey(t *testing.T) {
// Test with missing primary key field
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
}
collSchema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
primaryFieldSchema,
{
Name: "other_field",
FieldID: 101,
DataType: schemapb.DataType_Float,
},
},
}
schema := newSchemaInfo(collSchema)
fieldsData := []*schemapb.FieldData{
{
FieldName: "other_field",
Type: schemapb.DataType_Float,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: []float32{1.1, 2.2},
},
},
},
},
},
}
_, _, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
assert.Error(t, err)
// validateFieldDataColumns will fail first due to column count mismatch
// or the function will fail when trying to find primary key
}

View File

@ -1049,31 +1049,25 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er
return primaryData, nil
}
// findLastOccurrenceIndices finds indices of last occurrences for each unique ID
func findLastOccurrenceIndices[T comparable](ids []T) []int {
lastOccurrence := make(map[T]int, len(ids))
for idx, id := range ids {
lastOccurrence[id] = idx
}
keepIndices := make([]int, 0, len(lastOccurrence))
for idx, id := range ids {
if lastOccurrence[id] == idx {
keepIndices = append(keepIndices, idx)
// hasDuplicates checks if there are any duplicate values in the slice.
// Returns true immediately when the first duplicate is found (early exit).
func hasDuplicates[T comparable](ids []T) bool {
seen := make(map[T]struct{}, len(ids))
for _, id := range ids {
if _, exists := seen[id]; exists {
return true
}
seen[id] = struct{}{}
}
return keepIndices
return false
}
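// Illustrative usage of the new helper (a sketch, not taken from this change):
// hasDuplicates works for any comparable key type and short-circuits on the
// first repeated value.
//
//	hasDuplicates([]int64{1, 2, 3, 1})     // true
//	hasDuplicates([]string{"a", "b", "c"}) // false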
// DeduplicateFieldData removes duplicate primary keys from field data,
// keeping the last occurrence of each ID
func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData, schema *schemaInfo) ([]*schemapb.FieldData, uint32, error) {
// CheckDuplicatePkExist checks if there are duplicate primary keys in the field data.
// Returns (true, nil) if duplicates exist, (false, nil) if no duplicates.
// Returns (false, error) if there's an error during checking.
func CheckDuplicatePkExist(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData) (bool, error) {
if len(fieldsData) == 0 {
return fieldsData, 0, nil
}
if err := fillFieldPropertiesOnly(fieldsData, schema); err != nil {
return nil, 0, err
return false, nil
}
// find primary field data
@ -1086,64 +1080,26 @@ func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData [
}
if primaryFieldData == nil {
return nil, 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName()))
return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName()))
}
// get row count
var numRows int
// check for duplicates based on primary key type
switch primaryFieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
scalarField := primaryFieldData.GetScalars()
switch scalarField.Data.(type) {
case *schemapb.ScalarField_LongData:
numRows = len(scalarField.GetLongData().GetData())
intIDs := scalarField.GetLongData().GetData()
return hasDuplicates(intIDs), nil
case *schemapb.ScalarField_StringData:
numRows = len(scalarField.GetStringData().GetData())
strIDs := scalarField.GetStringData().GetData()
return hasDuplicates(strIDs), nil
default:
return nil, 0, merr.WrapErrParameterInvalidMsg("unsupported primary key type")
return false, merr.WrapErrParameterInvalidMsg("unsupported primary key type")
}
default:
return nil, 0, merr.WrapErrParameterInvalidMsg("primary field must be scalar type")
return false, merr.WrapErrParameterInvalidMsg("primary field must be scalar type")
}
if numRows == 0 {
return fieldsData, 0, nil
}
// build map to track last occurrence of each primary key
var keepIndices []int
switch primaryFieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
scalarField := primaryFieldData.GetScalars()
switch scalarField.Data.(type) {
case *schemapb.ScalarField_LongData:
// for Int64 primary keys
intIDs := scalarField.GetLongData().GetData()
keepIndices = findLastOccurrenceIndices(intIDs)
case *schemapb.ScalarField_StringData:
// for VarChar primary keys
strIDs := scalarField.GetStringData().GetData()
keepIndices = findLastOccurrenceIndices(strIDs)
}
}
// if no duplicates found, return original data
if len(keepIndices) == numRows {
return fieldsData, uint32(numRows), nil
}
log.Info("duplicate primary keys detected in upsert request, deduplicating",
zap.Int("original_rows", numRows),
zap.Int("deduplicated_rows", len(keepIndices)))
// use typeutil.AppendFieldData to rebuild field data with deduplicated rows
result := typeutil.PrepareResultFieldData(fieldsData, int64(len(keepIndices)))
for _, idx := range keepIndices {
typeutil.AppendFieldData(result, fieldsData, int64(idx))
}
return result, uint32(len(keepIndices)), nil
}
// autoGenPrimaryFieldData generate primary data when autoID == true
@ -1214,12 +1170,12 @@ func validateFieldDataColumns(columns []*schemapb.FieldData, schema *schemaInfo)
expectColumnNum := 0
// Count expected columns
for _, field := range schema.GetFields() {
for _, field := range schema.CollectionSchema.GetFields() {
if !typeutil.IsBM25FunctionOutputField(field, schema.CollectionSchema) {
expectColumnNum++
}
}
for _, structField := range schema.GetStructArrayFields() {
for _, structField := range schema.CollectionSchema.GetStructArrayFields() {
expectColumnNum += len(structField.GetFields())
}
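A minimal sketch of how the upsert path could surface CheckDuplicatePkExist as a user-facing error; the exact call site in upsertTask.PreExecute and the request accessor it.req.GetFieldsData() are assumptions for illustration only, while merr.WrapErrParameterInvalidMsg and the error message match what the tests in this change expect:
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, it.req.GetFieldsData())
if err != nil {
return err
}
if hasDup {
return merr.WrapErrParameterInvalidMsg("duplicate primary keys are not allowed in the same batch")
}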

View File

@ -4866,3 +4866,147 @@ func TestGetStorageCost(t *testing.T) {
assert.True(t, ok)
})
}
func TestCheckDuplicatePkExist_Int64PK(t *testing.T) {
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
}
t.Run("with duplicates", func(t *testing.T) {
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 3, 1, 4, 2}, // duplicates: 1, 2
},
},
},
},
},
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.True(t, hasDup)
})
t.Run("without duplicates", func(t *testing.T) {
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 3, 4, 5},
},
},
},
},
},
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.False(t, hasDup)
})
}
func TestCheckDuplicatePkExist_VarCharPK(t *testing.T) {
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_VarChar,
}
t.Run("with duplicates", func(t *testing.T) {
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: []string{"a", "b", "c", "a", "d"}, // duplicate: "a"
},
},
},
},
},
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.True(t, hasDup)
})
t.Run("without duplicates", func(t *testing.T) {
fieldsData := []*schemapb.FieldData{
{
FieldName: "id",
Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: []string{"a", "b", "c", "d", "e"},
},
},
},
},
},
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.NoError(t, err)
assert.False(t, hasDup)
})
}
func TestCheckDuplicatePkExist_EmptyData(t *testing.T) {
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, []*schemapb.FieldData{})
assert.NoError(t, err)
assert.False(t, hasDup)
}
func TestCheckDuplicatePkExist_MissingPrimaryKey(t *testing.T) {
primaryFieldSchema := &schemapb.FieldSchema{
Name: "id",
FieldID: 100,
DataType: schemapb.DataType_Int64,
}
fieldsData := []*schemapb.FieldData{
{
FieldName: "other_field",
Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: []int64{1, 2, 3},
},
},
},
},
},
}
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
assert.Error(t, err)
assert.False(t, hasDup)
}

View File

@ -245,7 +245,7 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
for i := range c.collectionIDs {
plans := balancer.AssignSegment(ctx, c.collectionIDs[i], c.assignments[i], c.nodes, false)
if c.unstableAssignment {
suite.Equal(len(plans), len(c.expectPlans[i]))
assertSegmentPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans[i], plans)
} else {
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
}

View File

@ -559,7 +559,11 @@ func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.
globalNodeSegments[node] = b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(node))
}
return b.genPlanByDistributions(nodeSegments, globalNodeSegments)
plans := b.genPlanByDistributions(nodeSegments, globalNodeSegments)
for i := range plans {
plans[i].Replica = replica
}
return plans
}
func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSegments map[int64][]*meta.Segment) []SegmentAssignPlan {

View File

@ -1258,6 +1258,31 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig
}
}
// assertSegmentPlanNumAndTargetNodeMatch checks that:
// 1. The number of plans matches the expected count
// 2. Each plan's target node is in the expected target nodes set (extracted from expectPlans)
// 3. The number of unique target nodes matches between expected and actual plans
func assertSegmentPlanNumAndTargetNodeMatch(suite *suite.Suite, expectPlans []SegmentAssignPlan, plans []SegmentAssignPlan) {
suite.Len(plans, len(expectPlans))
// Extract expected target nodes from expectPlans
expectedSet := make(map[int64]struct{})
for _, p := range expectPlans {
expectedSet[p.To] = struct{}{}
}
// Extract actual target nodes from plans
actualSet := make(map[int64]struct{})
for _, plan := range plans {
_, ok := expectedSet[plan.To]
suite.True(ok, "target node %d not in expected set", plan.To)
actualSet[plan.To] = struct{}{}
}
// Check that the number of unique target nodes matches
suite.Len(actualSet, len(expectedSet), "number of unique target nodes mismatch: expected %d, got %d", len(expectedSet), len(actualSet))
}
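// For example (illustrative values; other SegmentAssignPlan fields omitted),
// plans that land on the expected node set in a different order still pass:
//
//	expect := []SegmentAssignPlan{{To: 2}, {To: 3}}
//	actual := []SegmentAssignPlan{{To: 3}, {To: 2}}
//	assertSegmentPlanNumAndTargetNodeMatch(&suite.Suite, expect, actual) // passes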
// remove it after resource group enhancement.
func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssignPlan, right []ChannelAssignPlan, subset ...bool) {
type comparablePlan struct {
@ -1300,3 +1325,28 @@ func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssig
suite.ElementsMatch(leftPlan, rightPlan)
}
}
// assertChannelPlanNumAndTargetNodeMatch checks that:
// 1. The number of plans matches the expected count
// 2. Each plan's target node is in the expected target nodes set (extracted from expectPlans)
// 3. The number of unique target nodes matches between expected and actual plans
func assertChannelPlanNumAndTargetNodeMatch(suite *suite.Suite, expectPlans []ChannelAssignPlan, plans []ChannelAssignPlan) {
suite.Len(plans, len(expectPlans))
// Extract expected target nodes from expectPlans
expectedSet := make(map[int64]struct{})
for _, p := range expectPlans {
expectedSet[p.To] = struct{}{}
}
// Extract actual target nodes from plans
actualSet := make(map[int64]struct{})
for _, plan := range plans {
_, ok := expectedSet[plan.To]
suite.True(ok, "target node %d not in expected set", plan.To)
actualSet[plan.To] = struct{}{}
}
// Check that the number of unique target nodes matches
suite.Len(actualSet, len(expectedSet), "number of unique target nodes mismatch: expected %d, got %d", len(expectedSet), len(actualSet))
}

View File

@ -282,7 +282,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
for i := range c.collectionIDs {
plans := balancer.AssignSegment(ctx, c.collectionIDs[i], c.assignments[i], c.nodes, false)
if c.unstableAssignment {
suite.Len(plans, len(c.expectPlans[i]))
assertSegmentPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans[i], plans)
} else {
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
}
@ -1699,8 +1699,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignChannel() {
{CollectionID: 1, ChannelName: "channel1"},
{CollectionID: 1, ChannelName: "channel2"},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.DmChannel{},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.DmChannel{},
unstableAssignment: true,
expectPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel1"}}, From: -1, To: 2},
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel2"}}, From: -1, To: 3},
@ -1771,7 +1772,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignChannel() {
// Test channel assignment
plans := balancer.AssignChannel(ctx, c.collectionID, dmChannels, c.nodes, true)
if c.unstableAssignment {
suite.Len(plans, len(c.expectPlans))
assertChannelPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans, plans)
} else {
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectPlans, plans)
}

View File

@ -244,13 +244,7 @@ func NewManifestReader(manifest string,
}
func (mr *ManifestReader) init() error {
// TODO add needed column option
manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
if err != nil {
return err
}
reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
if err != nil {
return err
}

View File

@ -31,6 +31,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/cdata"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -38,9 +39,11 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cManifest := C.CString(manifest)
defer C.free(unsafe.Pointer(cManifest))
func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to get manifest")
}
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
@ -103,7 +106,7 @@ func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []s
cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
cNumColumns := C.int64_t(len(neededColumns))
status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
} else {
return nil, fmt.Errorf("storageConfig is required")
}
@ -184,30 +187,29 @@ func (r *FFIPackedReader) Release() {
r.Close()
}
func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) {
var cColumnGroups C.ColumnGroupsHandle
basePath, version, err := UnmarshalManfestPath(manifestPath)
if err != nil {
return "", err
return cColumnGroups, err
}
log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))
cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil)
if err != nil {
return "", err
return cColumnGroups, err
}
cBasePath := C.CString(basePath)
defer C.free(unsafe.Pointer(cBasePath))
var cManifest *C.char
var cVersion C.int64_t
result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion)
result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion)
err = HandleFFIResult(result)
if err != nil {
return "", err
return cColumnGroups, err
}
manifest = C.GoString(cManifest)
return manifest, nil
return cColumnGroups, nil
}
// Ensure FFIPackedReader implements array.RecordReader interface

View File

@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
}
func (pw *FFIPackedWriter) Close() (string, error) {
var manifest *C.char
var cColumnGroups C.ColumnGroupsHandle
result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest)
result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups)
if err := HandleFFIResult(result); err != nil {
return "", err
}
@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) {
cBasePath := C.CString(pw.basePath)
defer C.free(unsafe.Pointer(cBasePath))
var transationHandle C.TransactionHandle
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle)
// TODO pass version
// use -1 as latest
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1))
if err := HandleFFIResult(result); err != nil {
return "", err
}
@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) {
// #define LOON_TRANSACTION_RESOLVE_MERGE 1
// #define LOON_TRANSACTION_RESOLVE_MAX 2
var commitResult C.bool
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult)
var commitResult C.TransactionCommitResult
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult)
if err := HandleFFIResult(result); err != nil {
return "", err
}
var readVersion C.int64_t
// TODO: not atomic, need to get version from transaction
var cOutManifest *C.char
result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion)
if err := HandleFFIResult(result); err != nil {
return "", err
}
outManifest := C.GoString(cOutManifest)
log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion)))
log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version)))
defer C.properties_free(pw.cProperties)
return MarshalManifestPath(pw.basePath, int64(readVersion)), nil
return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil
}
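The string returned by Close is the base path plus committed version as encoded by these helpers; a rough round-trip sketch (illustrative only, the encoding format stays internal to MarshalManifestPath/UnmarshalManfestPath):
path := MarshalManifestPath(basePath, committedVersion)
gotBase, gotVersion, err := UnmarshalManfestPath(path)
// when err == nil, gotBase == basePath and gotVersion == committedVersion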

View File

@ -4617,6 +4617,7 @@ type dataCoordConfig struct {
MixCompactionSlotUsage ParamItem `refreshable:"true"`
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`
IndexTaskSlotUsage ParamItem `refreshable:"true"`
ScalarIndexTaskSlotUsage ParamItem `refreshable:"true"`
StatsTaskSlotUsage ParamItem `refreshable:"true"`
AnalyzeTaskSlotUsage ParamItem `refreshable:"true"`
@ -5634,6 +5635,16 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
}
p.IndexTaskSlotUsage.Init(base.mgr)
p.ScalarIndexTaskSlotUsage = ParamItem{
Key: "dataCoord.slot.scalarIndexTaskSlotUsage",
Version: "2.6.8",
Doc: "slot usage of scalar index task per 512mb",
DefaultValue: "16",
PanicIfEmpty: false,
Export: true,
}
p.ScalarIndexTaskSlotUsage.Init(base.mgr)
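// Rough sketch (not shown in this diff) of how a per-512MB knob like this is
// typically consumed when estimating slots for a scalar index task; segmentSize
// and the round-up policy here are assumptions:
//
//	usage := paramtable.Get().DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64()
//	blocks := (segmentSize + 512<<20 - 1) / (512 << 20) // whole 512MB blocks, rounded up
//	taskSlot := max(blocks, 1) * usage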
p.StatsTaskSlotUsage = ParamItem{
Key: "dataCoord.slot.statsTaskSlotUsage",
Version: "2.5.8",

View File

@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1
CPU_ARCH=$(get_cpu_arch $CPU_TARGET)
# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5
export CMAKE_POLICY_VERSION_MINIMUM=3.5
arch=$(uname -m)
CMAKE_CMD="cmake \
${CMAKE_EXTRA_ARGS} \

View File

@ -80,7 +80,7 @@ docker run -d ^
--health-start-period=90s ^
--health-timeout=20s ^
--health-retries=3 ^
milvusdb/milvus:v2.6.6 ^
milvusdb/milvus:v2.6.7 ^
milvus run standalone >nul
if %errorlevel% neq 0 (
echo Failed to start Milvus container.

View File

@ -59,7 +59,7 @@ EOF
--health-start-period=90s \
--health-timeout=20s \
--health-retries=3 \
milvusdb/milvus:v2.6.6 \
milvusdb/milvus:v2.6.7 \
milvus run standalone 1> /dev/null
}

View File

@ -436,7 +436,7 @@ func TestUpsertAutoID(t *testing.T) {
// upsert without pks -> error
vecColumn = hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption())
_, err = mc.Upsert(ctx, client.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(vecColumn))
common.CheckErr(t, err, false, "has no corresponding fieldData pass in: invalid parameter")
common.CheckErr(t, err, false, "must assign pk when upsert")
}
// test upsert autoId collection

View File

@ -170,6 +170,10 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha
}
func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
testBalanceOnSingleReplica(s)
}
func testBalanceOnSingleReplica(s *BalanceTestSuit) {
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 2, 2, 2000, 500)
@ -414,6 +418,16 @@ func (s *BalanceTestSuit) TestConcurrentBalanceChannelAndSegment() {
s.Equal(int64(0), failCounter.Load())
}
func (s *BalanceTestSuit) TestMultiTargetBalancePolicy() {
// Set balance policy to MultipleTargetBalancer
revertGuard := s.Cluster.MustModifyMilvusConfig(map[string]string{
paramtable.Get().QueryCoordCfg.Balancer.Key: "MultipleTargetBalancer",
})
defer revertGuard()
testBalanceOnSingleReplica(s)
}
func TestBalance(t *testing.T) {
suite.Run(t, new(BalanceTestSuit))
}

View File

@ -595,7 +595,7 @@ class ResponseChecker:
log.error(f"Query result {query_res} is not list")
return False
log.warning(f'Expected query result is {exp_res}')
# log.warning(f'Expected query result is {exp_res}')
@staticmethod
def check_query_iterator(query_res, func_name, check_items):

View File

@ -1221,14 +1221,13 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_same_pk_same_field(self):
"""
target: Test PU will success and query will success
target: Test partial update on an existing pk with the same field will success
method:
1. Create a collection
2. Insert rows
3. Upsert the rows with same pk and same field
4. Query the rows
5. Upsert the rows with same pk and different field
expected: Step 2 -> 4 should success 5 should fail
3. Upsert a single row with existing pk and same field (partial update)
4. Query the row to verify the update
expected: All steps should success, and the field value should be updated
"""
# step 1: create collection
client = self._client()
@ -1248,16 +1247,19 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.upsert(client, collection_name, rows, partial_update=True)
# step 3: Upsert the rows with same pk and same field
new_rows = [{default_primary_key_field_name: 0,
default_int32_field_name: i} for i in range(default_nb)]
self.upsert(client, collection_name, new_rows, partial_update=True)
# step 3: Upsert a single row with existing pk=0 and update the same field
updated_value = 99999
new_row = {default_primary_key_field_name: 0,
default_int32_field_name: updated_value}
self.upsert(client, collection_name, [new_row], partial_update=True)
# step 4: Query the rows
# step 4: Query the row to verify the update
expected_row = {default_primary_key_field_name: 0,
default_int32_field_name: updated_value}
result = self.query(client, collection_name, filter=f"{default_primary_key_field_name} == 0",
check_task=CheckTasks.check_query_results,
output_fields=[default_int32_field_name],
check_items={exp_res: [new_rows[-1]],
check_items={exp_res: [expected_row],
"pk_name": default_primary_key_field_name})[0]
assert len(result) == 1

View File

@ -165,9 +165,9 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
# step 4: query the rows
new_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)

View File

@ -371,6 +371,72 @@ class TestMilvusClientUpsertInvalid(TestMilvusClientV2Base):
check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_duplicate_pk_int64(self):
"""
target: test upsert with duplicate primary keys (Int64)
method:
1. create collection with Int64 primary key
2. upsert data with duplicate primary keys in the same batch
expected: raise error - duplicate primary keys are not allowed
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert with duplicate PKs: 1, 2, 1 (duplicate)
rng = np.random.default_rng(seed=19530)
rows = [
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.0, default_string_field_name: "first"},
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 2.0, default_string_field_name: "second"},
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.1, default_string_field_name: "duplicate"},
]
error = {ct.err_code: 1100,
ct.err_msg: "duplicate primary keys are not allowed in the same batch"}
self.upsert(client, collection_name, rows,
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_duplicate_pk_varchar(self):
"""
target: test upsert with duplicate primary keys (VarChar)
method:
1. create collection with VarChar primary key
2. upsert data with duplicate primary keys in the same batch
expected: raise error - duplicate primary keys are not allowed
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
dim = default_dim
# 1. create collection with VarChar primary key
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.VARCHAR, max_length=64, is_primary=True,
auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=dim)
schema.add_field(default_float_field_name, DataType.FLOAT)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_vector_field_name, metric_type="COSINE")
self.create_collection(client, collection_name, dimension=dim, schema=schema, index_params=index_params)
# 2. upsert with duplicate PKs: "a", "b", "a" (duplicate)
rng = np.random.default_rng(seed=19530)
rows = [
{default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]),
default_float_field_name: 1.0},
{default_primary_key_field_name: "b", default_vector_field_name: list(rng.random((1, dim))[0]),
default_float_field_name: 2.0},
{default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]),
default_float_field_name: 1.1},
]
error = {ct.err_code: 1100,
ct.err_msg: "duplicate primary keys are not allowed in the same batch"}
self.upsert(client, collection_name, rows,
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
""" Test case of search interface """
@ -551,342 +617,3 @@ class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
self.drop_partition(client, collection_name, partition_name)
if self.has_collection(client, collection_name)[0]:
self.drop_collection(client, collection_name)
class TestMilvusClientUpsertDedup(TestMilvusClientV2Base):
"""Test case for upsert deduplication functionality"""
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
yield request.param
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_int64_pk(self):
"""
target: test upsert with duplicate int64 primary keys in same batch
method:
1. create collection with int64 primary key
2. upsert data with duplicate primary keys [1, 2, 3, 2, 1]
3. query to verify only last occurrence is kept
expected: only 3 unique records exist, with data from last occurrence
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert data with duplicate PKs: [1, 2, 3, 2, 1]
# Expected: keep last occurrence -> [3, 2, 1] at indices [2, 3, 4]
rng = np.random.default_rng(seed=19530)
rows = [
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.0, default_string_field_name: "str_1_first"},
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 2.0, default_string_field_name: "str_2_first"},
{default_primary_key_field_name: 3, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 3.0, default_string_field_name: "str_3"},
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 2.1, default_string_field_name: "str_2_last"},
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.1, default_string_field_name: "str_1_last"},
]
results = self.upsert(client, collection_name, rows)[0]
# After deduplication, should only have 3 records
assert results['upsert_count'] == 3
# 3. query to verify deduplication - should have only 3 unique records
query_results = self.query(client, collection_name, filter="id >= 0")[0]
assert len(query_results) == 3
# Verify that last occurrence data is kept
id_to_data = {item['id']: item for item in query_results}
assert 1 in id_to_data
assert 2 in id_to_data
assert 3 in id_to_data
# Check that data from last occurrence is preserved
assert id_to_data[1]['float'] == 1.1
assert id_to_data[1]['varchar'] == "str_1_last"
assert id_to_data[2]['float'] == 2.1
assert id_to_data[2]['varchar'] == "str_2_last"
assert id_to_data[3]['float'] == 3.0
assert id_to_data[3]['varchar'] == "str_3"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_varchar_pk(self):
"""
target: test upsert with duplicate varchar primary keys in same batch
method:
1. create collection with varchar primary key
2. upsert data with duplicate primary keys ["a", "b", "c", "b", "a"]
3. query to verify only last occurrence is kept
expected: only 3 unique records exist, with data from last occurrence
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection with varchar primary key
schema = self.create_schema(client, enable_dynamic_field=True)[0]
schema.add_field("id", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("age", DataType.INT64)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_vector_field_name, metric_type="COSINE")
self.create_collection(client, collection_name, default_dim, schema=schema,
index_params=index_params, consistency_level="Strong")
# 2. upsert data with duplicate PKs: ["a", "b", "c", "b", "a"]
# Expected: keep last occurrence -> ["c", "b", "a"] at indices [2, 3, 4]
rng = np.random.default_rng(seed=19530)
rows = [
{"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]),
"age": 10},
{"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]),
"age": 20},
{"id": "c", default_vector_field_name: list(rng.random((1, default_dim))[0]),
"age": 30},
{"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]),
"age": 21},
{"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]),
"age": 11},
]
results = self.upsert(client, collection_name, rows)[0]
# After deduplication, should only have 3 records
assert results['upsert_count'] == 3
# 3. query to verify deduplication
query_results = self.query(client, collection_name, filter='id in ["a", "b", "c"]')[0]
assert len(query_results) == 3
# Verify that last occurrence data is kept
id_to_data = {item['id']: item for item in query_results}
assert "a" in id_to_data
assert "b" in id_to_data
assert "c" in id_to_data
# Check that data from last occurrence is preserved
assert id_to_data["a"]["age"] == 11
assert id_to_data["b"]["age"] == 21
assert id_to_data["c"]["age"] == 30
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_all_duplicates(self):
"""
target: test upsert when all records have same primary key
method:
1. create collection
2. upsert 5 records with same primary key
3. query to verify only 1 record exists
expected: only 1 record exists with data from last occurrence
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert data where all have same PK (id=1)
rng = np.random.default_rng(seed=19530)
rows = [
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: f"version_{i}"}
for i in range(5)
]
results = self.upsert(client, collection_name, rows)[0]
# After deduplication, should only have 1 record
assert results['upsert_count'] == 1
# 3. query to verify only 1 record exists
query_results = self.query(client, collection_name, filter="id == 1")[0]
assert len(query_results) == 1
# Verify it's the last occurrence (i=4)
assert query_results[0]['float'] == 4.0
assert query_results[0]['varchar'] == "version_4"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_no_duplicates(self):
"""
target: test upsert with no duplicate primary keys
method:
1. create collection
2. upsert data with unique primary keys
3. query to verify all records exist
expected: all records exist as-is
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert data with unique PKs
rng = np.random.default_rng(seed=19530)
nb = 10
rows = [
{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)}
for i in range(nb)
]
results = self.upsert(client, collection_name, rows)[0]
# No deduplication should occur
assert results['upsert_count'] == nb
# 3. query to verify all records exist
query_results = self.query(client, collection_name, filter=f"id >= 0")[0]
assert len(query_results) == nb
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_upsert_dedup_large_batch(self):
"""
target: test upsert deduplication with large batch
method:
1. create collection
2. upsert large batch with 50% duplicate primary keys
3. query to verify correct number of records
expected: only unique records exist
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert large batch where each ID appears twice
rng = np.random.default_rng(seed=19530)
nb = 500
unique_ids = nb // 2 # 250 unique IDs
rows = []
for i in range(nb):
pk = i % unique_ids # This creates duplicates: 0,1,2...249,0,1,2...249
rows.append({
default_primary_key_field_name: pk,
default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: float(i), # Different value for each row
default_string_field_name: f"batch_{i}"
})
results = self.upsert(client, collection_name, rows)[0]
# After deduplication, should only have unique_ids records
assert results['upsert_count'] == unique_ids
# 3. query to verify correct number of records
query_results = self.query(client, collection_name, filter=f"id >= 0", limit=1000)[0]
assert len(query_results) == unique_ids
# Verify that last occurrence is kept (should have higher float values)
for item in query_results:
pk = item['id']
# Last occurrence of pk is at index (pk + unique_ids)
expected_float = float(pk + unique_ids)
assert item['float'] == expected_float
assert item['varchar'] == f"batch_{pk + unique_ids}"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_with_partition(self):
"""
target: test upsert deduplication works correctly with partitions
method:
1. create collection with partition
2. upsert data with duplicates to specific partition
3. query to verify deduplication in partition
expected: deduplication works within partition
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name = cf.gen_unique_str("partition")
# 1. create collection and partition
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
self.create_partition(client, collection_name, partition_name)
# 2. upsert data with duplicates to partition
rng = np.random.default_rng(seed=19530)
rows = [
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.0, default_string_field_name: "first"},
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 2.0, default_string_field_name: "unique"},
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: 1.1, default_string_field_name: "last"},
]
results = self.upsert(client, collection_name, rows, partition_name=partition_name)[0]
assert results['upsert_count'] == 2
# 3. query partition to verify deduplication
query_results = self.query(client, collection_name, filter="id >= 0",
partition_names=[partition_name])[0]
assert len(query_results) == 2
# Verify correct data
id_to_data = {item['id']: item for item in query_results}
assert id_to_data[1]['float'] == 1.1
assert id_to_data[1]['varchar'] == "last"
assert id_to_data[2]['float'] == 2.0
assert id_to_data[2]['varchar'] == "unique"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_dedup_with_vectors(self):
"""
target: test upsert deduplication preserves correct vector data
method:
1. create collection
2. upsert data with duplicate PKs but different vectors
3. search to verify correct vector is preserved
expected: vector from last occurrence is preserved
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# 2. upsert data with duplicate PK=1 but different vectors
# Create distinctly different vectors for easy verification
first_vector = [1.0] * default_dim # All 1.0
last_vector = [2.0] * default_dim # All 2.0
rows = [
{default_primary_key_field_name: 1, default_vector_field_name: first_vector,
default_float_field_name: 1.0, default_string_field_name: "first"},
{default_primary_key_field_name: 2, default_vector_field_name: [0.5] * default_dim,
default_float_field_name: 2.0, default_string_field_name: "unique"},
{default_primary_key_field_name: 1, default_vector_field_name: last_vector,
default_float_field_name: 1.1, default_string_field_name: "last"},
]
results = self.upsert(client, collection_name, rows)[0]
assert results['upsert_count'] == 2
# 3. query to get vector data
query_results = self.query(client, collection_name, filter="id == 1",
output_fields=["id", "vector", "float", "varchar"])[0]
assert len(query_results) == 1
# Verify it's the last occurrence with last_vector
result = query_results[0]
assert result['float'] == 1.1
assert result['varchar'] == "last"
# Vector should be last_vector (all 2.0)
assert all(abs(v - 2.0) < 0.001 for v in result['vector'])
self.drop_collection(client, collection_name)

View File

@ -28,8 +28,8 @@ pytest-parallel
pytest-random-order
# pymilvus
pymilvus==2.7.0rc75
pymilvus[bulk_writer]==2.7.0rc75
pymilvus==2.7.0rc82
pymilvus[bulk_writer]==2.7.0rc82
# for protobuf
protobuf>=5.29.5

View File

@ -272,7 +272,7 @@ class TestAsyncMilvusClient(TestMilvusClientV2Base):
assert _index["indexed_rows"] == async_default_nb
assert _index["state"] == "Finished"
_load, _ = await self.async_milvus_client_wrap.get_load_state(c_name)
assert _load == LoadState.Loaded
assert _load['state'] == LoadState.Loaded
# dql tasks
tasks = []