mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
Compare commits
11 Commits
eb81e6ed01
...
0c63ed95bb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c63ed95bb | ||
|
|
64d19fb4f3 | ||
|
|
bf76a9e8e2 | ||
|
|
cfd49b7680 | ||
|
|
605816949f | ||
|
|
a659506e82 | ||
|
|
20ce9fdc23 | ||
|
|
8efe9ccac6 | ||
|
|
43fe215787 | ||
|
|
f85e86a6ec | ||
|
|
a308331b81 |
8
.env
8
.env
@ -5,11 +5,11 @@ IMAGE_ARCH=amd64
|
||||
OS_NAME=ubuntu22.04
|
||||
|
||||
# for services.builder.image in docker-compose.yml
|
||||
DATE_VERSION=20251107-4a6e8d8
|
||||
LATEST_DATE_VERSION=20251107-4a6e8d8
|
||||
DATE_VERSION=20251203-0171511
|
||||
LATEST_DATE_VERSION=20251203-0171511
|
||||
# for services.gpubuilder.image in docker-compose.yml
|
||||
GPU_DATE_VERSION=20251107-4a6e8d8
|
||||
LATEST_GPU_DATE_VERSION=20251107-4a6e8d8
|
||||
GPU_DATE_VERSION=20251203-0171511
|
||||
LATEST_GPU_DATE_VERSION=20251203-0171511
|
||||
|
||||
# for other services in docker-compose.yml
|
||||
MINIO_ADDRESS=minio:9000
|
||||
|
||||
@ -108,7 +108,6 @@ func cleanLocalDir(path string) {
|
||||
|
||||
func runComponent[T component](ctx context.Context,
|
||||
localMsg bool,
|
||||
runWg *sync.WaitGroup,
|
||||
creator func(context.Context, dependency.Factory) (T, error),
|
||||
metricRegister func(*prometheus.Registry),
|
||||
) *conc.Future[component] {
|
||||
@ -184,15 +183,15 @@ func (mr *MilvusRoles) printLDPreLoad() {
|
||||
}
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, components.NewProxy, metrics.RegisterProxy)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord)
|
||||
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, components.NewMixCoord, metrics.RegisterMixCoord)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
// clear local storage
|
||||
queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0)
|
||||
if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() {
|
||||
@ -200,21 +199,24 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
|
||||
// under posix mode, this clean task will be done by mixcoord
|
||||
cleanLocalDir(queryDataLocalPath)
|
||||
}
|
||||
return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
|
||||
return runComponent(ctx, localMsg, components.NewQueryNode, metrics.RegisterQueryNode)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode)
|
||||
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, components.NewStreamingNode, metrics.RegisterStreamingNode)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode)
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, components.NewDataNode, metrics.RegisterDataNode)
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC)
|
||||
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool) *conc.Future[component] {
|
||||
return runComponent(ctx, localMsg, components.NewCDC, metrics.RegisterCDC)
|
||||
}
|
||||
|
||||
// waitForAllComponentsReady waits for all components to be ready.
|
||||
// It will return an error if any component is not ready before closing with a fast fail strategy.
|
||||
// It will return a map of components that are ready.
|
||||
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
|
||||
roles := make([]string, 0, len(componentFutureMap))
|
||||
futures := make([]*conc.Future[component], 0, len(componentFutureMap))
|
||||
@ -235,22 +237,20 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
|
||||
}
|
||||
componentMap := make(map[string]component, len(componentFutureMap))
|
||||
readyCount := 0
|
||||
var gerr error
|
||||
for {
|
||||
index, _, _ := reflect.Select(selectCases)
|
||||
if index == 0 {
|
||||
cancel()
|
||||
log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
|
||||
return nil, context.Canceled
|
||||
} else {
|
||||
role := roles[index-1]
|
||||
component, err := futures[index-1].Await()
|
||||
readyCount++
|
||||
if err != nil {
|
||||
if gerr == nil {
|
||||
gerr = errors.Wrapf(err, "component %s is not ready before closing", role)
|
||||
cancel()
|
||||
}
|
||||
cancel()
|
||||
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
|
||||
return nil, err
|
||||
} else {
|
||||
componentMap[role] = component
|
||||
log.Info("component is ready", zap.String("role", role))
|
||||
@ -264,9 +264,6 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
|
||||
break
|
||||
}
|
||||
}
|
||||
if gerr != nil {
|
||||
return nil, errors.Wrap(gerr, "failed to wait for all components ready")
|
||||
}
|
||||
return componentMap, nil
|
||||
}
|
||||
|
||||
@ -468,45 +465,43 @@ func (mr *MilvusRoles) Run() {
|
||||
defer streaming.Release()
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
local := mr.Local
|
||||
|
||||
componentFutureMap := make(map[string]*conc.Future[component])
|
||||
|
||||
if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord {
|
||||
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
|
||||
mixCoord := mr.runMixCoord(ctx, local, &wg)
|
||||
mixCoord := mr.runMixCoord(ctx, local)
|
||||
componentFutureMap[typeutil.MixCoordRole] = mixCoord
|
||||
}
|
||||
|
||||
if mr.EnableQueryNode {
|
||||
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
|
||||
queryNode := mr.runQueryNode(ctx, local, &wg)
|
||||
queryNode := mr.runQueryNode(ctx, local)
|
||||
componentFutureMap[typeutil.QueryNodeRole] = queryNode
|
||||
}
|
||||
|
||||
if mr.EnableDataNode {
|
||||
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
|
||||
dataNode := mr.runDataNode(ctx, local, &wg)
|
||||
dataNode := mr.runDataNode(ctx, local)
|
||||
componentFutureMap[typeutil.DataNodeRole] = dataNode
|
||||
}
|
||||
|
||||
if mr.EnableProxy {
|
||||
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
|
||||
proxy := mr.runProxy(ctx, local, &wg)
|
||||
proxy := mr.runProxy(ctx, local)
|
||||
componentFutureMap[typeutil.ProxyRole] = proxy
|
||||
}
|
||||
|
||||
if mr.EnableStreamingNode {
|
||||
// Before initializing the local streaming node, make sure the local registry is ready.
|
||||
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
|
||||
streamingNode := mr.runStreamingNode(ctx, local, &wg)
|
||||
streamingNode := mr.runStreamingNode(ctx, local)
|
||||
componentFutureMap[typeutil.StreamingNodeRole] = streamingNode
|
||||
}
|
||||
|
||||
if mr.EnableCDC {
|
||||
paramtable.SetLocalComponentEnabled(typeutil.CDCRole)
|
||||
cdc := mr.runCDC(ctx, local, &wg)
|
||||
cdc := mr.runCDC(ctx, local)
|
||||
componentFutureMap[typeutil.CDCRole] = cdc
|
||||
}
|
||||
|
||||
|
||||
@ -735,6 +735,7 @@ dataCoord:
|
||||
mixCompactionUsage: 4 # slot usage of mix compaction task.
|
||||
l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task.
|
||||
indexTaskSlotUsage: 64 # slot usage of index task per 512mb
|
||||
scalarIndexTaskSlotUsage: 16 # slot usage of scalar index task per 512mb
|
||||
statsTaskSlotUsage: 8 # slot usage of stats task per 512mb
|
||||
analyzeTaskSlotUsage: 65535 # slot usage of analyze task
|
||||
jsonShreddingTriggerCount: 10 # jsonkey stats task count per trigger
|
||||
|
||||
@ -46,6 +46,7 @@ class MilvusConan(ConanFile):
|
||||
"mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
|
||||
"geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
|
||||
"icu/74.2#cd1937b9561b8950a2ae6311284c5813",
|
||||
"libavrocpp/1.12.1@milvus/dev",
|
||||
)
|
||||
|
||||
generators = ("cmake", "cmake_find_package")
|
||||
|
||||
@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test {
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
|
||||
// Create writer
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs,
|
||||
paths,
|
||||
schema_->ConvertToArrowSchema(),
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory);
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
|
||||
// Generate and write data
|
||||
int64_t row_count = 0;
|
||||
@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test {
|
||||
auto record_batch = arrow::RecordBatch::Make(
|
||||
schema_->ConvertToArrowSchema(), test_data_count_, arrays);
|
||||
row_count += test_data_count_;
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
LoadFieldDataInfo load_info;
|
||||
load_info.field_infos.emplace(
|
||||
|
||||
@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
|
||||
const std::string& file) {
|
||||
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
|
||||
.GetArrowFileSystem();
|
||||
auto file_reader =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto file_reader = result.ValueOrDie();
|
||||
std::shared_ptr<arrow::Schema> file_schema = file_reader->schema();
|
||||
LOG_DEBUG("get column schema: [{}] for segment {}",
|
||||
file_schema->ToString(true),
|
||||
@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
|
||||
|
||||
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
|
||||
.GetArrowFileSystem();
|
||||
auto file_reader =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
|
||||
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto file_reader = result.ValueOrDie();
|
||||
// get key value metadata from parquet file
|
||||
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
|
||||
file_reader->file_metadata();
|
||||
@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
|
||||
|
||||
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
|
||||
.GetArrowFileSystem();
|
||||
auto file_reader =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(fs, files[0]);
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto file_reader = result.ValueOrDie();
|
||||
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
|
||||
file_reader->file_metadata();
|
||||
milvus_storage::FieldIDList field_id_list =
|
||||
@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
|
||||
}
|
||||
|
||||
for (const auto& file : files) {
|
||||
auto reader =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto reader = result.ValueOrDie();
|
||||
auto row_group_meta_vector =
|
||||
reader->file_metadata()->GetRowGroupMetadataVector();
|
||||
num_rows += row_group_meta_vector.row_num();
|
||||
|
||||
@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) {
|
||||
schema_ = context.schema;
|
||||
builders_ = context.builders;
|
||||
builders_map_ = context.builders_map;
|
||||
kv_metadata_ = std::move(context.kv_metadata);
|
||||
kv_metadata_ = context.kv_metadata;
|
||||
column_groups_ = context.column_groups;
|
||||
file_paths_ = context.file_paths;
|
||||
packed_writer_ = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
|
||||
fs_,
|
||||
file_paths_,
|
||||
schema_,
|
||||
storage_config_,
|
||||
column_groups_,
|
||||
buffer_size_);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_,
|
||||
file_paths_,
|
||||
schema_,
|
||||
storage_config_,
|
||||
column_groups_,
|
||||
buffer_size_);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create packed writer: " +
|
||||
result.status().ToString());
|
||||
packed_writer_ = result.ValueOrDie();
|
||||
for (const auto& [key, value] : kv_metadata_) {
|
||||
packed_writer_->AddUserMetadata(key, value);
|
||||
}
|
||||
|
||||
@ -232,7 +232,7 @@ class JsonStatsParquetWriter {
|
||||
size_t batch_size_;
|
||||
std::shared_ptr<arrow::fs::FileSystem> fs_;
|
||||
milvus_storage::StorageConfig storage_config_;
|
||||
std::unique_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
|
||||
std::shared_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
|
||||
std::vector<std::pair<std::string, std::string>> kv_metadata_;
|
||||
|
||||
// cache for builders
|
||||
|
||||
@ -374,6 +374,13 @@ BuildJsonKeyIndex(ProtoLayoutInterface result,
|
||||
build_index_info->storage_plugin_context().encryption_zone_id(),
|
||||
build_index_info->storage_plugin_context().collection_id(),
|
||||
build_index_info->storage_plugin_context().encryption_key());
|
||||
|
||||
auto plugin_context = std::make_shared<CPluginContext>();
|
||||
plugin_context->ez_id =
|
||||
build_index_info->storage_plugin_context().encryption_zone_id();
|
||||
plugin_context->collection_id =
|
||||
build_index_info->storage_plugin_context().collection_id();
|
||||
fileManagerContext.set_plugin_context(plugin_context);
|
||||
}
|
||||
|
||||
auto field_schema =
|
||||
@ -465,6 +472,12 @@ BuildTextIndex(ProtoLayoutInterface result,
|
||||
build_index_info->storage_plugin_context().encryption_zone_id(),
|
||||
build_index_info->storage_plugin_context().collection_id(),
|
||||
build_index_info->storage_plugin_context().encryption_key());
|
||||
auto plugin_context = std::make_shared<CPluginContext>();
|
||||
plugin_context->ez_id =
|
||||
build_index_info->storage_plugin_context().encryption_zone_id();
|
||||
plugin_context->collection_id =
|
||||
build_index_info->storage_plugin_context().collection_id();
|
||||
fileManagerContext.set_plugin_context(plugin_context);
|
||||
}
|
||||
|
||||
auto scalar_index_engine_version =
|
||||
|
||||
@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
|
||||
// Create writer
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs,
|
||||
paths,
|
||||
schema->ConvertToArrowSchema(),
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory);
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
|
||||
// Generate and write data
|
||||
int64_t row_count = 0;
|
||||
@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
|
||||
auto record_batch = arrow::RecordBatch::Make(
|
||||
schema->ConvertToArrowSchema(), test_data_count, arrays);
|
||||
row_count += test_data_count;
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
LoadFieldDataInfo load_info;
|
||||
load_info.field_infos.emplace(
|
||||
|
||||
@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal(
|
||||
std::vector<std::vector<int64_t>> row_group_lists;
|
||||
row_group_lists.reserve(insert_files.size());
|
||||
for (const auto& file : insert_files) {
|
||||
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs,
|
||||
file,
|
||||
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
|
||||
storage::GetReaderProperties());
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto reader = result.ValueOrDie();
|
||||
auto row_group_num =
|
||||
reader->file_metadata()->GetRowGroupMetadataVector().size();
|
||||
std::vector<int64_t> all_row_groups(row_group_num);
|
||||
|
||||
@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
|
||||
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
fs_, paths, schema_, storage_config, column_groups, writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
paths,
|
||||
schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
for (int i = 0; i < batch_size; ++i) {
|
||||
EXPECT_TRUE(writer.Write(record_batch_).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch_).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
|
||||
@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
|
||||
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
fs_, paths, schema_, storage_config, column_groups, writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
paths,
|
||||
schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
for (int i = 0; i < batch_size; ++i) {
|
||||
EXPECT_TRUE(writer.Write(record_batch_).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch_).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
auto channel = std::make_shared<milvus::ArrowReaderChannel>();
|
||||
int64_t memory_limit = 1024 * 1024 * 1024; // 1GB
|
||||
uint64_t parallel_degree = 2;
|
||||
|
||||
// read all row groups
|
||||
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
fs_, paths[0], schema_);
|
||||
auto reader_result =
|
||||
milvus_storage::FileRowGroupReader::Make(fs_, paths[0]);
|
||||
AssertInfo(reader_result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
reader_result.status().ToString());
|
||||
auto fr = reader_result.ValueOrDie();
|
||||
auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
|
||||
auto status = fr->Close();
|
||||
AssertInfo(
|
||||
@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
auto arrow_schema = schema->ConvertToArrowSchema();
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
paths,
|
||||
arrow_schema,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
int64_t total_rows = 0;
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema, per_batch);
|
||||
@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
|
||||
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
|
||||
total_rows += record_batch->num_rows();
|
||||
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
// Load data back from storage v2
|
||||
LoadFieldDataInfo load_info;
|
||||
|
||||
@ -183,55 +183,54 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
|
||||
memory_limit / blocks.size(), FILE_SLICE_SIZE.load());
|
||||
|
||||
for (const auto& block : blocks) {
|
||||
futures.emplace_back(pool.Submit([block,
|
||||
fs,
|
||||
file,
|
||||
file_idx,
|
||||
schema,
|
||||
reader_memory_limit]() {
|
||||
AssertInfo(fs != nullptr,
|
||||
"[StorageV2] file system is nullptr");
|
||||
auto row_group_reader =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
futures.emplace_back(pool.Submit(
|
||||
[block, fs, file, file_idx, schema, reader_memory_limit]() {
|
||||
AssertInfo(fs != nullptr,
|
||||
"[StorageV2] file system is nullptr");
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs,
|
||||
file,
|
||||
schema,
|
||||
reader_memory_limit,
|
||||
milvus::storage::GetReaderProperties());
|
||||
AssertInfo(row_group_reader != nullptr,
|
||||
"[StorageV2] row group reader is nullptr");
|
||||
row_group_reader->SetRowGroupOffsetAndCount(block.offset,
|
||||
block.count);
|
||||
LOG_INFO(
|
||||
"[StorageV2] read row groups from file {} with offset "
|
||||
"{} and count "
|
||||
"{}",
|
||||
file,
|
||||
block.offset,
|
||||
block.count);
|
||||
auto ret = std::make_shared<ArrowDataWrapper>();
|
||||
for (int64_t i = 0; i < block.count; ++i) {
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
AssertInfo(
|
||||
result.ok(),
|
||||
"[StorageV2] Failed to create row group reader: " +
|
||||
result.status().ToString());
|
||||
auto row_group_reader = result.ValueOrDie();
|
||||
auto status =
|
||||
row_group_reader->ReadNextRowGroup(&table);
|
||||
row_group_reader->SetRowGroupOffsetAndCount(
|
||||
block.offset, block.count);
|
||||
AssertInfo(status.ok(),
|
||||
"[StorageV2] Failed to read row group " +
|
||||
std::to_string(block.offset + i) +
|
||||
" from file " + file + " with error " +
|
||||
status.ToString());
|
||||
ret->arrow_tables.push_back(
|
||||
{file_idx,
|
||||
static_cast<size_t>(block.offset + i),
|
||||
table});
|
||||
}
|
||||
auto close_status = row_group_reader->Close();
|
||||
AssertInfo(close_status.ok(),
|
||||
"[StorageV2] Failed to close row group reader "
|
||||
"for file " +
|
||||
file + " with error " +
|
||||
close_status.ToString());
|
||||
return ret;
|
||||
}));
|
||||
"[StorageV2] Failed to set row group offset "
|
||||
"and count " +
|
||||
std::to_string(block.offset) + " and " +
|
||||
std::to_string(block.count) +
|
||||
" with error " + status.ToString());
|
||||
auto ret = std::make_shared<ArrowDataWrapper>();
|
||||
for (int64_t i = 0; i < block.count; ++i) {
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
auto status =
|
||||
row_group_reader->ReadNextRowGroup(&table);
|
||||
AssertInfo(status.ok(),
|
||||
"[StorageV2] Failed to read row group " +
|
||||
std::to_string(block.offset + i) +
|
||||
" from file " + file +
|
||||
" with error " + status.ToString());
|
||||
ret->arrow_tables.push_back(
|
||||
{file_idx,
|
||||
static_cast<size_t>(block.offset + i),
|
||||
table});
|
||||
}
|
||||
auto close_status = row_group_reader->Close();
|
||||
AssertInfo(
|
||||
close_status.ok(),
|
||||
"[StorageV2] Failed to close row group reader "
|
||||
"for file " +
|
||||
file + " with error " +
|
||||
close_status.ToString());
|
||||
return ret;
|
||||
}));
|
||||
}
|
||||
|
||||
for (auto& future : futures) {
|
||||
|
||||
@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
|
||||
}
|
||||
|
||||
auto writer_properties = builder.build();
|
||||
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
|
||||
trueFs,
|
||||
truePaths,
|
||||
trueSchema,
|
||||
storage_config,
|
||||
columnGroups,
|
||||
buffer_size,
|
||||
writer_properties);
|
||||
AssertInfo(writer, "[StorageV2] writer pointer is null");
|
||||
*c_packed_writer = writer.release();
|
||||
auto result =
|
||||
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
|
||||
truePaths,
|
||||
trueSchema,
|
||||
storage_config,
|
||||
columnGroups,
|
||||
buffer_size,
|
||||
writer_properties);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create packed writer: " +
|
||||
result.status().ToString());
|
||||
auto writer = result.ValueOrDie();
|
||||
*c_packed_writer =
|
||||
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
|
||||
std::move(writer));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema,
|
||||
}
|
||||
|
||||
auto writer_properties = builder.build();
|
||||
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
|
||||
trueFs,
|
||||
truePaths,
|
||||
trueSchema,
|
||||
conf,
|
||||
columnGroups,
|
||||
buffer_size,
|
||||
writer_properties);
|
||||
AssertInfo(writer, "[StorageV2] writer pointer is null");
|
||||
*c_packed_writer = writer.release();
|
||||
auto result =
|
||||
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
|
||||
truePaths,
|
||||
trueSchema,
|
||||
conf,
|
||||
columnGroups,
|
||||
buffer_size,
|
||||
writer_properties);
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create packed writer: " +
|
||||
result.status().ToString());
|
||||
auto writer = result.ValueOrDie();
|
||||
*c_packed_writer =
|
||||
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
|
||||
std::move(writer));
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
@ -201,9 +211,9 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
|
||||
SCOPE_CGO_CALL_METRIC();
|
||||
|
||||
try {
|
||||
auto packed_writer =
|
||||
static_cast<milvus_storage::PackedRecordBatchWriter*>(
|
||||
c_packed_writer);
|
||||
auto packed_writer = *static_cast<
|
||||
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
|
||||
c_packed_writer);
|
||||
|
||||
auto import_schema = arrow::ImportSchema(schema);
|
||||
if (!import_schema.ok()) {
|
||||
@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) {
|
||||
SCOPE_CGO_CALL_METRIC();
|
||||
|
||||
try {
|
||||
auto packed_writer =
|
||||
static_cast<milvus_storage::PackedRecordBatchWriter*>(
|
||||
c_packed_writer);
|
||||
auto status = packed_writer->Close();
|
||||
auto packed_writer = static_cast<
|
||||
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
|
||||
c_packed_writer);
|
||||
auto status = (*packed_writer)->Close();
|
||||
delete packed_writer;
|
||||
if (!status.ok()) {
|
||||
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
|
||||
|
||||
@ -109,11 +109,15 @@ GroupChunkTranslator::GroupChunkTranslator(
|
||||
parquet_file_metadata_.reserve(insert_files_.size());
|
||||
row_group_meta_list_.reserve(insert_files_.size());
|
||||
for (const auto& file : insert_files_) {
|
||||
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs,
|
||||
file,
|
||||
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
|
||||
storage::GetReaderProperties());
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto reader = result.ValueOrDie();
|
||||
parquet_file_metadata_.push_back(
|
||||
reader->file_metadata()->GetParquetMetadata());
|
||||
|
||||
|
||||
@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
|
||||
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}};
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
milvus_storage::PackedRecordBatchWriter writer(fs_,
|
||||
paths_,
|
||||
arrow_schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
paths_,
|
||||
arrow_schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
int64_t total_rows = 0;
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema_, per_batch);
|
||||
@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
|
||||
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
|
||||
total_rows += record_batch->num_rows();
|
||||
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
|
||||
milvus::proto::common::LoadPriority::LOW);
|
||||
|
||||
// num cells - get the expected number from the file directly
|
||||
auto fr =
|
||||
std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
|
||||
auto reader_result =
|
||||
milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]);
|
||||
AssertInfo(reader_result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
reader_result.status().ToString());
|
||||
auto fr = reader_result.ValueOrDie();
|
||||
auto expected_num_cells =
|
||||
fr->file_metadata()->GetRowGroupMetadataVector().size();
|
||||
auto row_group_metadata_vector =
|
||||
@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
std::vector<std::string> single_file_paths{file_path};
|
||||
milvus_storage::PackedRecordBatchWriter writer(fs_,
|
||||
single_file_paths,
|
||||
arrow_schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
single_file_paths,
|
||||
arrow_schema_,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema_, per_batch);
|
||||
auto record_batch =
|
||||
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
|
||||
total_rows += record_batch->num_rows();
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
// Get the number of row groups in this file
|
||||
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
fs_, file_path);
|
||||
auto reader_result =
|
||||
milvus_storage::FileRowGroupReader::Make(fs_, file_path);
|
||||
AssertInfo(reader_result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
reader_result.status().ToString());
|
||||
auto fr = reader_result.ValueOrDie();
|
||||
expected_row_groups_per_file.push_back(
|
||||
fr->file_metadata()->GetRowGroupMetadataVector().size());
|
||||
auto status = fr->Close();
|
||||
@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
|
||||
auto usage = translator->estimated_byte_size_of_cell(i).first;
|
||||
|
||||
// Get the expected memory size from the corresponding file
|
||||
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
auto reader_result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs_, multi_file_paths[file_idx]);
|
||||
AssertInfo(reader_result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
reader_result.status().ToString());
|
||||
auto fr = reader_result.ValueOrDie();
|
||||
auto row_group_metadata_vector =
|
||||
fr->file_metadata()->GetRowGroupMetadataVector();
|
||||
auto expected_size = static_cast<int64_t>(
|
||||
|
||||
@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
|
||||
for (auto& column_group_file : remote_chunk_files) {
|
||||
// get all row groups for each file
|
||||
std::vector<std::vector<int64_t>> row_group_lists;
|
||||
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs,
|
||||
column_group_file,
|
||||
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
|
||||
GetReaderProperties());
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto reader = result.ValueOrDie();
|
||||
|
||||
auto row_group_num =
|
||||
reader->file_metadata()->GetRowGroupMetadataVector().size();
|
||||
@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id,
|
||||
field_id_list.Add(column_group_id.get());
|
||||
return field_id_list;
|
||||
}
|
||||
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||
auto result = milvus_storage::FileRowGroupReader::Make(
|
||||
fs,
|
||||
filepath,
|
||||
arrow_schema,
|
||||
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
|
||||
GetReaderProperties());
|
||||
AssertInfo(result.ok(),
|
||||
"[StorageV2] Failed to create file row group reader: " +
|
||||
result.status().ToString());
|
||||
auto file_reader = result.ValueOrDie();
|
||||
field_id_list =
|
||||
file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
|
||||
column_group_id.get());
|
||||
|
||||
@ -22,14 +22,14 @@
|
||||
#include "monitor/scope_metric.h"
|
||||
|
||||
ReaderHandle
|
||||
createFFIReader(char* manifest,
|
||||
createFFIReader(ColumnGroupsHandle column_groups_handle,
|
||||
struct ArrowSchema* schema,
|
||||
char** needed_columns,
|
||||
int64_t needed_columns_size,
|
||||
const std::shared_ptr<Properties>& properties) {
|
||||
ReaderHandle reader_handler = 0;
|
||||
|
||||
FFIResult result = reader_new(manifest,
|
||||
FFIResult result = reader_new(column_groups_handle,
|
||||
schema,
|
||||
needed_columns,
|
||||
needed_columns_size,
|
||||
@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path,
|
||||
}
|
||||
|
||||
CStatus
|
||||
NewPackedFFIReaderWithManifest(const char* manifest_content,
|
||||
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
|
||||
struct ArrowSchema* schema,
|
||||
char** needed_columns,
|
||||
int64_t needed_columns_size,
|
||||
@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content,
|
||||
try {
|
||||
auto properties =
|
||||
MakeInternalPropertiesFromStorageConfig(c_storage_config);
|
||||
// Parse the column groups, the column groups is a JSON string
|
||||
auto cpp_column_groups =
|
||||
std::make_shared<milvus_storage::api::ColumnGroups>();
|
||||
auto des_result =
|
||||
cpp_column_groups->deserialize(std::string_view(manifest_content));
|
||||
AssertInfo(des_result.ok(), "failed to deserialize column groups");
|
||||
auto* cg_ptr = reinterpret_cast<
|
||||
std::shared_ptr<milvus_storage::api::ColumnGroups>*>(
|
||||
column_groups_handle);
|
||||
auto cpp_column_groups = *cg_ptr;
|
||||
|
||||
auto reader = GetLoonReader(cpp_column_groups,
|
||||
schema,
|
||||
|
||||
@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path,
|
||||
* be freed after this call returns.
|
||||
*/
|
||||
CStatus
|
||||
NewPackedFFIReaderWithManifest(const char* manifest_content,
|
||||
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
|
||||
struct ArrowSchema* schema,
|
||||
char** needed_columns,
|
||||
int64_t needed_columns_size,
|
||||
|
||||
@ -257,12 +257,10 @@ GetManifest(const std::string& path,
|
||||
// Parse the JSON string
|
||||
json j = json::parse(path);
|
||||
|
||||
// Extract base_path and ver fields
|
||||
// Extract base_path
|
||||
std::string base_path = j.at("base_path").get<std::string>();
|
||||
int64_t ver = j.at("ver").get<int64_t>();
|
||||
|
||||
// return std::make_pair(base_path, ver);
|
||||
char* out_column_groups = nullptr;
|
||||
ColumnGroupsHandle out_column_groups = 0;
|
||||
int64_t out_read_version = 0;
|
||||
FFIResult result = get_latest_column_groups(base_path.c_str(),
|
||||
properties.get(),
|
||||
@ -298,9 +296,8 @@ GetColumnGroups(
|
||||
// Parse the JSON string
|
||||
json j = json::parse(path);
|
||||
|
||||
// Extract base_path and ver fields
|
||||
// Extract base_path
|
||||
std::string base_path = j.at("base_path").get<std::string>();
|
||||
int64_t ver = j.at("ver").get<int64_t>();
|
||||
|
||||
// TODO fetch manifest based on version after api supported
|
||||
auto transaction =
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# Update milvus-storage_VERSION for the first occurrence
|
||||
milvus_add_pkg_config("milvus-storage")
|
||||
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
|
||||
set( milvus-storage_VERSION ba7df7b)
|
||||
set( milvus-storage_VERSION 5fff4f5)
|
||||
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
|
||||
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
|
||||
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
|
||||
|
||||
@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
|
||||
auto writer_memory = 16 * 1024 * 1024;
|
||||
auto storage_config = milvus_storage::StorageConfig();
|
||||
auto arrow_schema = schema->ConvertToArrowSchema();
|
||||
milvus_storage::PackedRecordBatchWriter writer(
|
||||
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
|
||||
auto result = milvus_storage::PackedRecordBatchWriter::Make(
|
||||
fs_,
|
||||
paths,
|
||||
arrow_schema,
|
||||
storage_config,
|
||||
column_groups,
|
||||
writer_memory,
|
||||
::parquet::default_writer_properties());
|
||||
EXPECT_TRUE(result.ok());
|
||||
auto writer = result.ValueOrDie();
|
||||
int64_t total_rows = 0;
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema, per_batch);
|
||||
@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
|
||||
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
|
||||
total_rows += record_batch->num_rows();
|
||||
|
||||
EXPECT_TRUE(writer.Write(record_batch).ok());
|
||||
EXPECT_TRUE(writer->Write(record_batch).ok());
|
||||
}
|
||||
EXPECT_TRUE(writer.Close().ok());
|
||||
EXPECT_TRUE(writer->Close().ok());
|
||||
|
||||
{
|
||||
// test memory file manager
|
||||
|
||||
@ -175,13 +175,14 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize())
|
||||
|
||||
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
|
||||
indexType := GetIndexType(indexParams)
|
||||
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
|
||||
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
|
||||
|
||||
// rewrite the index type if needed, and this final index type will be persisted in the meta
|
||||
if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() {
|
||||
if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
|
||||
var err error
|
||||
indexParams, err = Params.KnowhereConfig.UpdateIndexParams(indexType, paramtable.BuildStage, indexParams)
|
||||
if err != nil {
|
||||
@ -227,9 +228,14 @@ func (i *indexInspector) reloadFromMeta() {
|
||||
continue
|
||||
}
|
||||
|
||||
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, segIndex.IndexID)
|
||||
indexType := GetIndexType(indexParams)
|
||||
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
|
||||
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
|
||||
|
||||
i.scheduler.Enqueue(newIndexBuildTask(
|
||||
model.CloneSegmentIndex(segIndex),
|
||||
calculateIndexTaskSlot(segment.getSegmentSize()),
|
||||
taskSlot,
|
||||
i.meta,
|
||||
i.handler,
|
||||
i.storageCli,
|
||||
|
||||
@ -369,8 +369,11 @@ func getSortStatus(sorted bool) string {
|
||||
return "unsorted"
|
||||
}
|
||||
|
||||
func calculateIndexTaskSlot(segmentSize int64) int64 {
|
||||
func calculateIndexTaskSlot(segmentSize int64, isVectorIndex bool) int64 {
|
||||
defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()
|
||||
if !isVectorIndex {
|
||||
defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64()
|
||||
}
|
||||
if segmentSize > 512*1024*1024 {
|
||||
taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots
|
||||
return max(taskSlot, 1)
|
||||
|
||||
@ -36,6 +36,7 @@ const (
|
||||
|
||||
type Highlighter interface {
|
||||
AsSearchPipelineOperator(t *searchTask) (operator, error)
|
||||
FieldIDs() []int64
|
||||
}
|
||||
|
||||
// highlight task for one field
|
||||
@ -114,18 +115,26 @@ func (h *LexicalHighlighter) addTaskWithQuery(fieldID int64, query *highlightQue
|
||||
})
|
||||
}
|
||||
|
||||
func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator, error) {
|
||||
func (h *LexicalHighlighter) initHighlightQueries(t *searchTask) error {
|
||||
// add query to highlight tasks
|
||||
for _, query := range h.queries {
|
||||
fieldID, ok := t.schema.MapFieldID(query.fieldName)
|
||||
if !ok {
|
||||
return nil, merr.WrapErrParameterInvalidMsg("highlight field not found in schema: %s", query.fieldName)
|
||||
return merr.WrapErrParameterInvalidMsg("highlight field not found in schema: %s", query.fieldName)
|
||||
}
|
||||
h.addTaskWithQuery(fieldID, query)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator, error) {
|
||||
return newLexicalHighlightOperator(t, lo.Values(h.tasks))
|
||||
}
|
||||
|
||||
func (h *LexicalHighlighter) FieldIDs() []int64 {
|
||||
return lo.Keys(h.tasks)
|
||||
}
|
||||
|
||||
func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlighter, error) {
|
||||
params := funcutil.KeyValuePair2Map(highlighter.GetParams())
|
||||
h := &LexicalHighlighter{
|
||||
|
||||
@ -602,7 +602,12 @@ func (t *searchTask) createLexicalHighlighter(highlighter *commonpb.Highlighter,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts)
|
||||
err = h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return h.initHighlightQueries(t)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -642,10 +647,24 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, t.request.GetSearchParams())
|
||||
if err == nil {
|
||||
t.SearchRequest.AnalyzerName = analyzer
|
||||
}
|
||||
|
||||
t.isIterator = isIterator
|
||||
t.SearchRequest.Offset = offset
|
||||
t.SearchRequest.FieldId = queryInfo.GetQueryFieldId()
|
||||
|
||||
if err := t.addHighlightTask(t.request.GetHighlighter(), queryInfo.GetMetricType(), queryInfo.GetQueryFieldId(), t.request.GetPlaceholderGroup(), t.SearchRequest.GetAnalyzerName()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add highlight field ids to output fields id
|
||||
if t.highlighter != nil {
|
||||
t.SearchRequest.OutputFieldsId = append(t.SearchRequest.OutputFieldsId, t.highlighter.FieldIDs()...)
|
||||
}
|
||||
|
||||
if t.partitionKeyMode {
|
||||
// isolation has tighter constraint, check first
|
||||
mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan)
|
||||
@ -696,16 +715,6 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
|
||||
t.SearchRequest.GroupByFieldId = queryInfo.GroupByFieldId
|
||||
t.SearchRequest.GroupSize = queryInfo.GroupSize
|
||||
|
||||
if t.SearchRequest.MetricType == metric.BM25 {
|
||||
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, t.request.GetSearchParams())
|
||||
if err == nil {
|
||||
t.SearchRequest.AnalyzerName = analyzer
|
||||
}
|
||||
}
|
||||
if err := t.addHighlightTask(t.request.GetHighlighter(), t.SearchRequest.MetricType, t.SearchRequest.FieldId, t.request.GetPlaceholderGroup(), t.SearchRequest.GetAnalyzerName()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if embedding.HasNonBM25Functions(t.schema.CollectionSchema.Functions, []int64{queryInfo.GetQueryFieldId()}) {
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search-call-function-udf")
|
||||
defer sp.End()
|
||||
|
||||
@ -1078,18 +1078,13 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
|
||||
log.Warn("fail to get primary field schema", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
deduplicatedFieldsData, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, it.req.GetFieldsData(), schema)
|
||||
duplicate, err := CheckDuplicatePkExist(primaryFieldSchema, it.req.GetFieldsData())
|
||||
if err != nil {
|
||||
log.Warn("fail to deduplicate upsert data", zap.Error(err))
|
||||
log.Warn("fail to check duplicate primary keys", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// dedup won't decrease numOfRows to 0
|
||||
if newNumRows > 0 && newNumRows != it.req.NumRows {
|
||||
log.Info("upsert data deduplicated",
|
||||
zap.Uint32("original_num_rows", it.req.NumRows),
|
||||
zap.Uint32("deduplicated_num_rows", newNumRows))
|
||||
it.req.FieldsData = deduplicatedFieldsData
|
||||
it.req.NumRows = newNumRows
|
||||
if duplicate {
|
||||
return merr.WrapErrParameterInvalidMsg("duplicate primary keys are not allowed in the same batch")
|
||||
}
|
||||
|
||||
it.upsertMsg = &msgstream.UpsertMsg{
|
||||
|
||||
@ -35,7 +35,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proxy/shardclient"
|
||||
"github.com/milvus-io/milvus/internal/util/function/embedding"
|
||||
"github.com/milvus-io/milvus/internal/util/segcore"
|
||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
|
||||
@ -1467,402 +1466,137 @@ func TestGenNullableFieldData_GeometryAndTimestamptz(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestUpsertTask_PlanNamespace_AfterPreExecute(t *testing.T) {
|
||||
mockey.PatchConvey("TestUpsertTask_PlanNamespace_AfterPreExecute", t, func() {
|
||||
// Setup global meta cache and common mocks
|
||||
globalMetaCache = &MetaCache{}
|
||||
mockey.Mock(GetReplicateID).Return("", nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionID).Return(int64(1001), nil).Build()
|
||||
mockey.Mock((*MetaCache).GetCollectionInfo).Return(&collectionInfo{updateTimestamp: 12345}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetPartitionInfo).Return(&partitionInfo{name: "_default"}, nil).Build()
|
||||
mockey.Mock((*MetaCache).GetPartitionID).Return(int64(1002), nil).Build()
|
||||
mockey.Mock(isPartitionKeyMode).Return(false, nil).Build()
|
||||
mockey.Mock(validatePartitionTag).Return(nil).Build()
|
||||
|
||||
// Schema with namespace enabled
|
||||
mockey.Mock((*MetaCache).GetCollectionSchema).To(func(_ *MetaCache, _ context.Context, _ string, _ string) (*schemaInfo, error) {
|
||||
info := createTestSchema()
|
||||
info.CollectionSchema.Properties = append(info.CollectionSchema.Properties, &commonpb.KeyValuePair{Key: common.NamespaceEnabledKey, Value: "true"})
|
||||
return info, nil
|
||||
}).Build()
|
||||
|
||||
// Capture plan to verify namespace
|
||||
var capturedPlan *planpb.PlanNode
|
||||
mockey.Mock(planparserv2.CreateRequeryPlan).To(func(_ *schemapb.FieldSchema, _ *schemapb.IDs) *planpb.PlanNode {
|
||||
capturedPlan = &planpb.PlanNode{}
|
||||
return capturedPlan
|
||||
}).Build()
|
||||
|
||||
// Mock query to return a valid result for queryPreExecute merge path
|
||||
mockey.Mock((*Proxy).query).Return(&milvuspb.QueryResults{
|
||||
Status: merr.Success(),
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
FieldId: 100,
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: []int64{1, 2}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "name",
|
||||
FieldId: 102,
|
||||
Type: schemapb.DataType_VarChar,
|
||||
Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_StringData{StringData: &schemapb.StringArray{Data: []string{"old1", "old2"}}}}},
|
||||
},
|
||||
{
|
||||
FieldName: "vector",
|
||||
FieldId: 101,
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Dim: 128, Data: &schemapb.VectorField_FloatVector{FloatVector: &schemapb.FloatArray{Data: make([]float32, 256)}}}},
|
||||
},
|
||||
},
|
||||
}, segcore.StorageCost{}, nil).Build()
|
||||
|
||||
// Build task
|
||||
task := createTestUpdateTask()
|
||||
ns := "ns-1"
|
||||
task.req.PartialUpdate = true
|
||||
task.req.Namespace = &ns
|
||||
|
||||
// Skip insert/delete heavy logic
|
||||
mockey.Mock((*upsertTask).insertPreExecute).Return(nil).Build()
|
||||
mockey.Mock((*upsertTask).deletePreExecute).Return(nil).Build()
|
||||
|
||||
err := task.PreExecute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, capturedPlan)
|
||||
assert.NotNil(t, capturedPlan.Namespace)
|
||||
assert.Equal(t, *task.req.Namespace, *capturedPlan.Namespace)
|
||||
})
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_Int64PK(t *testing.T) {
|
||||
// Test deduplication with Int64 primary key
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
func TestUpsertTask_DuplicatePK_Int64(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_duplicate_pk",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
{
|
||||
Name: "float_field",
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
|
||||
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
// Create field data with duplicate IDs: [1, 2, 3, 2, 1]
|
||||
// Expected to keep last occurrence of each: [3, 2, 1] (indices 2, 3, 4)
|
||||
// Data with duplicate primary keys: 1, 2, 1 (duplicate)
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
FieldId: 100,
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 3, 2, 1},
|
||||
},
|
||||
LongData: &schemapb.LongArray{Data: []int64{1, 2, 1}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldName: "float_field",
|
||||
Type: schemapb.DataType_Float,
|
||||
FieldName: "value",
|
||||
FieldId: 101,
|
||||
Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_FloatData{
|
||||
FloatData: &schemapb.FloatArray{
|
||||
Data: []float32{1.1, 2.2, 3.3, 2.4, 1.5},
|
||||
},
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
// Test CheckDuplicatePkExist directly
|
||||
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint32(3), newNumRows)
|
||||
assert.Equal(t, 2, len(deduplicatedFields))
|
||||
|
||||
// Check deduplicated primary keys
|
||||
pkField := deduplicatedFields[0]
|
||||
pkData := pkField.GetScalars().GetLongData().GetData()
|
||||
assert.Equal(t, 3, len(pkData))
|
||||
assert.Equal(t, []int64{3, 2, 1}, pkData)
|
||||
|
||||
// Check corresponding float values (should be 3.3, 2.4, 1.5)
|
||||
floatField := deduplicatedFields[1]
|
||||
floatData := floatField.GetScalars().GetFloatData().GetData()
|
||||
assert.Equal(t, 3, len(floatData))
|
||||
assert.Equal(t, []float32{3.3, 2.4, 1.5}, floatData)
|
||||
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasDuplicate, "should detect duplicate primary keys")
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_VarCharPK(t *testing.T) {
|
||||
// Test deduplication with VarChar primary key
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_VarChar,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
func TestUpsertTask_DuplicatePK_VarChar(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_duplicate_pk_varchar",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
{
|
||||
Name: "int_field",
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "100"}}},
|
||||
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
// Create field data with duplicate IDs: ["a", "b", "c", "b", "a"]
|
||||
// Expected to keep last occurrence of each: ["c", "b", "a"] (indices 2, 3, 4)
|
||||
// Data with duplicate primary keys: "a", "b", "a" (duplicate)
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
FieldId: 100,
|
||||
Type: schemapb.DataType_VarChar,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{
|
||||
Data: []string{"a", "b", "c", "b", "a"},
|
||||
},
|
||||
StringData: &schemapb.StringArray{Data: []string{"a", "b", "a"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldName: "int_field",
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "value",
|
||||
FieldId: 101,
|
||||
Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{100, 200, 300, 201, 101},
|
||||
},
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
// Test CheckDuplicatePkExist directly
|
||||
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint32(3), newNumRows)
|
||||
assert.Equal(t, 2, len(deduplicatedFields))
|
||||
|
||||
// Check deduplicated primary keys
|
||||
pkField := deduplicatedFields[0]
|
||||
pkData := pkField.GetScalars().GetStringData().GetData()
|
||||
assert.Equal(t, 3, len(pkData))
|
||||
assert.Equal(t, []string{"c", "b", "a"}, pkData)
|
||||
|
||||
// Check corresponding int64 values (should be 300, 201, 101)
|
||||
int64Field := deduplicatedFields[1]
|
||||
int64Data := int64Field.GetScalars().GetLongData().GetData()
|
||||
assert.Equal(t, 3, len(int64Data))
|
||||
assert.Equal(t, []int64{300, 201, 101}, int64Data)
|
||||
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasDuplicate, "should detect duplicate primary keys")
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_NoDuplicates(t *testing.T) {
|
||||
// Test with no duplicates - should return original data
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
func TestUpsertTask_NoDuplicatePK(t *testing.T) {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: "test_no_duplicate_pk",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
{FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
|
||||
{FieldID: 101, Name: "value", DataType: schemapb.DataType_Int32},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
// Data with unique primary keys: 1, 2, 3
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
FieldId: 100,
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
LongData: &schemapb.LongArray{Data: []int64{1, 2, 3}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint32(5), newNumRows)
|
||||
assert.Equal(t, 1, len(deduplicatedFields))
|
||||
|
||||
// Should be unchanged
|
||||
pkField := deduplicatedFields[0]
|
||||
pkData := pkField.GetScalars().GetLongData().GetData()
|
||||
assert.Equal(t, []int64{1, 2, 3, 4, 5}, pkData)
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_WithVector(t *testing.T) {
|
||||
// Test deduplication with vector field
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
{
|
||||
Name: "vector",
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
dim := 4
|
||||
// Create field data with duplicate IDs: [1, 2, 1]
|
||||
// Expected to keep indices [1, 2] (last occurrence of 2, last occurrence of 1)
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "value",
|
||||
FieldId: 101,
|
||||
Type: schemapb.DataType_Int32,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldName: "vector",
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{
|
||||
Data: []float32{
|
||||
1.0, 1.1, 1.2, 1.3, // vector for ID 1 (first occurrence)
|
||||
2.0, 2.1, 2.2, 2.3, // vector for ID 2
|
||||
1.4, 1.5, 1.6, 1.7, // vector for ID 1 (second occurrence - keep this)
|
||||
},
|
||||
},
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{100, 200, 300}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
// Call CheckDuplicatePkExist directly to verify no duplicate error
|
||||
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint32(2), newNumRows)
|
||||
assert.Equal(t, 2, len(deduplicatedFields))
|
||||
|
||||
// Check deduplicated primary keys
|
||||
pkField := deduplicatedFields[0]
|
||||
pkData := pkField.GetScalars().GetLongData().GetData()
|
||||
assert.Equal(t, 2, len(pkData))
|
||||
assert.Equal(t, []int64{2, 1}, pkData)
|
||||
|
||||
// Check corresponding vector (should keep vectors for ID 2 and ID 1's last occurrence)
|
||||
vectorField := deduplicatedFields[1]
|
||||
vectorData := vectorField.GetVectors().GetFloatVector().GetData()
|
||||
assert.Equal(t, 8, len(vectorData)) // 2 vectors * 4 dimensions
|
||||
expectedVector := []float32{
|
||||
2.0, 2.1, 2.2, 2.3, // vector for ID 2
|
||||
1.4, 1.5, 1.6, 1.7, // vector for ID 1 (last occurrence)
|
||||
}
|
||||
assert.Equal(t, expectedVector, vectorData)
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_EmptyData(t *testing.T) {
|
||||
// Test with empty data
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
fieldsData := []*schemapb.FieldData{}
|
||||
|
||||
deduplicatedFields, newNumRows, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
hasDuplicate, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint32(0), newNumRows)
|
||||
assert.Equal(t, 0, len(deduplicatedFields))
|
||||
}
|
||||
|
||||
func TestUpsertTask_Deduplicate_MissingPrimaryKey(t *testing.T) {
|
||||
// Test with missing primary key field
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
}
|
||||
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
primaryFieldSchema,
|
||||
{
|
||||
Name: "other_field",
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "other_field",
|
||||
Type: schemapb.DataType_Float,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_FloatData{
|
||||
FloatData: &schemapb.FloatArray{
|
||||
Data: []float32{1.1, 2.2},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, _, err := DeduplicateFieldData(primaryFieldSchema, fieldsData, schema)
|
||||
assert.Error(t, err)
|
||||
// validateFieldDataColumns will fail first due to column count mismatch
|
||||
// or the function will fail when trying to find primary key
|
||||
assert.True(t, err != nil)
|
||||
assert.False(t, hasDuplicate, "should not have duplicate primary keys")
|
||||
}
|
||||
|
||||
@@ -1049,31 +1049,25 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er
return primaryData, nil
}

// findLastOccurrenceIndices finds indices of last occurrences for each unique ID
func findLastOccurrenceIndices[T comparable](ids []T) []int {
lastOccurrence := make(map[T]int, len(ids))
for idx, id := range ids {
lastOccurrence[id] = idx
}

keepIndices := make([]int, 0, len(lastOccurrence))
for idx, id := range ids {
if lastOccurrence[id] == idx {
keepIndices = append(keepIndices, idx)
// hasDuplicates checks if there are any duplicate values in the slice.
// Returns true immediately when the first duplicate is found (early exit).
func hasDuplicates[T comparable](ids []T) bool {
seen := make(map[T]struct{}, len(ids))
for _, id := range ids {
if _, exists := seen[id]; exists {
return true
}
seen[id] = struct{}{}
}
return keepIndices
return false
}

// DeduplicateFieldData removes duplicate primary keys from field data,
// keeping the last occurrence of each ID
func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData, schema *schemaInfo) ([]*schemapb.FieldData, uint32, error) {
// CheckDuplicatePkExist checks if there are duplicate primary keys in the field data.
// Returns (true, nil) if duplicates exist, (false, nil) if no duplicates.
// Returns (false, error) if there's an error during checking.
func CheckDuplicatePkExist(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData) (bool, error) {
if len(fieldsData) == 0 {
return fieldsData, 0, nil
}

if err := fillFieldPropertiesOnly(fieldsData, schema); err != nil {
return nil, 0, err
return false, nil
}

// find primary field data
@@ -1086,64 +1080,26 @@ func DeduplicateFieldData(primaryFieldSchema *schemapb.FieldSchema, fieldsData [
}

if primaryFieldData == nil {
return nil, 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName()))
return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldSchema.GetName()))
}

// get row count
var numRows int
// check for duplicates based on primary key type
switch primaryFieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
scalarField := primaryFieldData.GetScalars()
switch scalarField.Data.(type) {
case *schemapb.ScalarField_LongData:
numRows = len(scalarField.GetLongData().GetData())
intIDs := scalarField.GetLongData().GetData()
return hasDuplicates(intIDs), nil
case *schemapb.ScalarField_StringData:
numRows = len(scalarField.GetStringData().GetData())
strIDs := scalarField.GetStringData().GetData()
return hasDuplicates(strIDs), nil
default:
return nil, 0, merr.WrapErrParameterInvalidMsg("unsupported primary key type")
return false, merr.WrapErrParameterInvalidMsg("unsupported primary key type")
}
default:
return nil, 0, merr.WrapErrParameterInvalidMsg("primary field must be scalar type")
return false, merr.WrapErrParameterInvalidMsg("primary field must be scalar type")
}

if numRows == 0 {
return fieldsData, 0, nil
}

// build map to track last occurrence of each primary key
var keepIndices []int
switch primaryFieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
scalarField := primaryFieldData.GetScalars()
switch scalarField.Data.(type) {
case *schemapb.ScalarField_LongData:
// for Int64 primary keys
intIDs := scalarField.GetLongData().GetData()
keepIndices = findLastOccurrenceIndices(intIDs)

case *schemapb.ScalarField_StringData:
// for VarChar primary keys
strIDs := scalarField.GetStringData().GetData()
keepIndices = findLastOccurrenceIndices(strIDs)
}
}

// if no duplicates found, return original data
if len(keepIndices) == numRows {
return fieldsData, uint32(numRows), nil
}

log.Info("duplicate primary keys detected in upsert request, deduplicating",
zap.Int("original_rows", numRows),
zap.Int("deduplicated_rows", len(keepIndices)))

// use typeutil.AppendFieldData to rebuild field data with deduplicated rows
result := typeutil.PrepareResultFieldData(fieldsData, int64(len(keepIndices)))
for _, idx := range keepIndices {
typeutil.AppendFieldData(result, fieldsData, int64(idx))
}

return result, uint32(len(keepIndices)), nil
}

// autoGenPrimaryFieldData generate primary data when autoID == true
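The hunk above replaces the old in-proxy deduplication (keep the last occurrence of each primary key) with a plain duplicate check that the caller can use to reject the batch. A minimal sketch of such a pre-check is shown below; the wrapper function name and the import paths are assumptions for illustration and are not part of this diff.

package proxy

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
)

// rejectDuplicatePks is a hypothetical pre-check: it fails the whole upsert
// batch when the primary-key column contains repeated values, using the same
// error message the client tests later in this diff assert on.
func rejectDuplicatePks(primaryFieldSchema *schemapb.FieldSchema, fieldsData []*schemapb.FieldData) error {
	hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
	if err != nil {
		return err
	}
	if hasDup {
		return merr.WrapErrParameterInvalidMsg("duplicate primary keys are not allowed in the same batch")
	}
	return nil
}

The design trade-off visible in the diff: failing fast keeps upsert semantics explicit for the client, whereas the removed DeduplicateFieldData silently dropped all but the last row per key.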
@@ -1214,12 +1170,12 @@ func validateFieldDataColumns(columns []*schemapb.FieldData, schema *schemaInfo)
expectColumnNum := 0

// Count expected columns
for _, field := range schema.GetFields() {
for _, field := range schema.CollectionSchema.GetFields() {
if !typeutil.IsBM25FunctionOutputField(field, schema.CollectionSchema) {
expectColumnNum++
}
}
for _, structField := range schema.GetStructArrayFields() {
for _, structField := range schema.CollectionSchema.GetStructArrayFields() {
expectColumnNum += len(structField.GetFields())
}

@ -4866,3 +4866,147 @@ func TestGetStorageCost(t *testing.T) {
|
||||
assert.True(t, ok)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckDuplicatePkExist_Int64PK(t *testing.T) {
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
}
|
||||
|
||||
t.Run("with duplicates", func(t *testing.T) {
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 3, 1, 4, 2}, // duplicates: 1, 2
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasDup)
|
||||
})
|
||||
|
||||
t.Run("without duplicates", func(t *testing.T) {
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasDup)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckDuplicatePkExist_VarCharPK(t *testing.T) {
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_VarChar,
|
||||
}
|
||||
|
||||
t.Run("with duplicates", func(t *testing.T) {
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
Type: schemapb.DataType_VarChar,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{
|
||||
Data: []string{"a", "b", "c", "a", "d"}, // duplicate: "a"
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasDup)
|
||||
})
|
||||
|
||||
t.Run("without duplicates", func(t *testing.T) {
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "id",
|
||||
Type: schemapb.DataType_VarChar,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{
|
||||
Data: []string{"a", "b", "c", "d", "e"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasDup)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckDuplicatePkExist_EmptyData(t *testing.T) {
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, []*schemapb.FieldData{})
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasDup)
|
||||
}
|
||||
|
||||
func TestCheckDuplicatePkExist_MissingPrimaryKey(t *testing.T) {
|
||||
primaryFieldSchema := &schemapb.FieldSchema{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
}
|
||||
|
||||
fieldsData := []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "other_field",
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{1, 2, 3},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
hasDup, err := CheckDuplicatePkExist(primaryFieldSchema, fieldsData)
|
||||
assert.Error(t, err)
|
||||
assert.False(t, hasDup)
|
||||
}
|
||||
|
||||
@ -245,7 +245,7 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
|
||||
for i := range c.collectionIDs {
|
||||
plans := balancer.AssignSegment(ctx, c.collectionIDs[i], c.assignments[i], c.nodes, false)
|
||||
if c.unstableAssignment {
|
||||
suite.Equal(len(plans), len(c.expectPlans[i]))
|
||||
assertSegmentPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans[i], plans)
|
||||
} else {
|
||||
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
|
||||
}
|
||||
|
||||
@ -559,7 +559,11 @@ func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.
|
||||
globalNodeSegments[node] = b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(node))
|
||||
}
|
||||
|
||||
return b.genPlanByDistributions(nodeSegments, globalNodeSegments)
|
||||
plans := b.genPlanByDistributions(nodeSegments, globalNodeSegments)
|
||||
for i := range plans {
|
||||
plans[i].Replica = replica
|
||||
}
|
||||
return plans
|
||||
}
|
||||
|
||||
func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSegments map[int64][]*meta.Segment) []SegmentAssignPlan {
|
||||
|
||||
@@ -1258,6 +1258,31 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig
}
}

// assertSegmentPlanNumAndTargetNodeMatch checks that:
// 1. The number of plans matches the expected count
// 2. Each plan's target node is in the expected target nodes set (extracted from expectPlans)
// 3. The number of unique target nodes matches between expected and actual plans
func assertSegmentPlanNumAndTargetNodeMatch(suite *suite.Suite, expectPlans []SegmentAssignPlan, plans []SegmentAssignPlan) {
suite.Len(plans, len(expectPlans))

// Extract expected target nodes from expectPlans
expectedSet := make(map[int64]struct{})
for _, p := range expectPlans {
expectedSet[p.To] = struct{}{}
}

// Extract actual target nodes from plans
actualSet := make(map[int64]struct{})
for _, plan := range plans {
_, ok := expectedSet[plan.To]
suite.True(ok, "target node %d not in expected set", plan.To)
actualSet[plan.To] = struct{}{}
}

// Check that the number of unique target nodes matches
suite.Len(actualSet, len(expectedSet), "number of unique target nodes mismatch: expected %d, got %d", len(expectedSet), len(actualSet))
}

// remove it after resource group enhancement.
func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssignPlan, right []ChannelAssignPlan, subset ...bool) {
type comparablePlan struct {
@@ -1300,3 +1325,28 @@ func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssig
suite.ElementsMatch(leftPlan, rightPlan)
}
}

// assertChannelPlanNumAndTargetNodeMatch checks that:
// 1. The number of plans matches the expected count
// 2. Each plan's target node is in the expected target nodes set (extracted from expectPlans)
// 3. The number of unique target nodes matches between expected and actual plans
func assertChannelPlanNumAndTargetNodeMatch(suite *suite.Suite, expectPlans []ChannelAssignPlan, plans []ChannelAssignPlan) {
suite.Len(plans, len(expectPlans))

// Extract expected target nodes from expectPlans
expectedSet := make(map[int64]struct{})
for _, p := range expectPlans {
expectedSet[p.To] = struct{}{}
}

// Extract actual target nodes from plans
actualSet := make(map[int64]struct{})
for _, plan := range plans {
_, ok := expectedSet[plan.To]
suite.True(ok, "target node %d not in expected set", plan.To)
actualSet[plan.To] = struct{}{}
}

// Check that the number of unique target nodes matches
suite.Len(actualSet, len(expectedSet), "number of unique target nodes mismatch: expected %d, got %d", len(expectedSet), len(actualSet))
}

@ -282,7 +282,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
|
||||
for i := range c.collectionIDs {
|
||||
plans := balancer.AssignSegment(ctx, c.collectionIDs[i], c.assignments[i], c.nodes, false)
|
||||
if c.unstableAssignment {
|
||||
suite.Len(plans, len(c.expectPlans[i]))
|
||||
assertSegmentPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans[i], plans)
|
||||
} else {
|
||||
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
|
||||
}
|
||||
@ -1699,8 +1699,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignChannel() {
|
||||
{CollectionID: 1, ChannelName: "channel1"},
|
||||
{CollectionID: 1, ChannelName: "channel2"},
|
||||
},
|
||||
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
|
||||
distributions: map[int64][]*meta.DmChannel{},
|
||||
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
|
||||
distributions: map[int64][]*meta.DmChannel{},
|
||||
unstableAssignment: true,
|
||||
expectPlans: []ChannelAssignPlan{
|
||||
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel1"}}, From: -1, To: 2},
|
||||
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel2"}}, From: -1, To: 3},
|
||||
@ -1771,7 +1772,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignChannel() {
|
||||
// Test channel assignment
|
||||
plans := balancer.AssignChannel(ctx, c.collectionID, dmChannels, c.nodes, true)
|
||||
if c.unstableAssignment {
|
||||
suite.Len(plans, len(c.expectPlans))
|
||||
assertChannelPlanNumAndTargetNodeMatch(&suite.Suite, c.expectPlans, plans)
|
||||
} else {
|
||||
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectPlans, plans)
|
||||
}
|
||||
|
||||
@@ -244,13 +244,7 @@ func NewManifestReader(manifest string,
}

func (mr *ManifestReader) init() error {
// TODO add needed column option
manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
if err != nil {
return err
}

reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
if err != nil {
return err
}

@@ -31,6 +31,7 @@ import (

"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/cdata"
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/v2/log"
@@ -38,9 +39,11 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)

func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cManifest := C.CString(manifest)
defer C.free(unsafe.Pointer(cManifest))
func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to get manifest")
}

var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
@@ -103,7 +106,7 @@ func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []s
cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
cNumColumns := C.int64_t(len(neededColumns))

status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
} else {
return nil, fmt.Errorf("storageConfig is required")
}
@@ -184,30 +187,29 @@ func (r *FFIPackedReader) Release() {
r.Close()
}

func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) {
var cColumnGroups C.ColumnGroupsHandle
basePath, version, err := UnmarshalManfestPath(manifestPath)
if err != nil {
return "", err
return cColumnGroups, err
}
log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))

cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil)
if err != nil {
return "", err
return cColumnGroups, err
}
cBasePath := C.CString(basePath)
defer C.free(unsafe.Pointer(cBasePath))

var cManifest *C.char
var cVersion C.int64_t
result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion)
result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion)
err = HandleFFIResult(result)
if err != nil {
return "", err
return cColumnGroups, err
}

manifest = C.GoString(cManifest)
return manifest, nil
return cColumnGroups, nil
}

// Ensure FFIPackedReader implements array.RecordReader interface

@@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
}

func (pw *FFIPackedWriter) Close() (string, error) {
var manifest *C.char
var cColumnGroups C.ColumnGroupsHandle

result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest)
result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups)
if err := HandleFFIResult(result); err != nil {
return "", err
}
@@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) {
cBasePath := C.CString(pw.basePath)
defer C.free(unsafe.Pointer(cBasePath))
var transationHandle C.TransactionHandle
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle)

// TODO pass version
// use -1 as latest
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1))
if err := HandleFFIResult(result); err != nil {
return "", err
}
@@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) {
// #define LOON_TRANSACTION_RESOLVE_MERGE 1
// #define LOON_TRANSACTION_RESOLVE_MAX 2

var commitResult C.bool
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult)
var commitResult C.TransactionCommitResult
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult)
if err := HandleFFIResult(result); err != nil {
return "", err
}

var readVersion C.int64_t

// TODO: not atomic, need to get version from transaction
var cOutManifest *C.char
result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion)
if err := HandleFFIResult(result); err != nil {
return "", err
}
outManifest := C.GoString(cOutManifest)
log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion)))
log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version)))

defer C.properties_free(pw.cProperties)
return MarshalManifestPath(pw.basePath, int64(readVersion)), nil
return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil
}

|
||||
MixCompactionSlotUsage ParamItem `refreshable:"true"`
|
||||
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`
|
||||
IndexTaskSlotUsage ParamItem `refreshable:"true"`
|
||||
ScalarIndexTaskSlotUsage ParamItem `refreshable:"true"`
|
||||
StatsTaskSlotUsage ParamItem `refreshable:"true"`
|
||||
AnalyzeTaskSlotUsage ParamItem `refreshable:"true"`
|
||||
|
||||
@ -5634,6 +5635,16 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
|
||||
}
|
||||
p.IndexTaskSlotUsage.Init(base.mgr)
|
||||
|
||||
p.ScalarIndexTaskSlotUsage = ParamItem{
|
||||
Key: "dataCoord.slot.scalarIndexTaskSlotUsage",
|
||||
Version: "2.6.8",
|
||||
Doc: "slot usage of scalar index task per 512mb",
|
||||
DefaultValue: "16",
|
||||
PanicIfEmpty: false,
|
||||
Export: true,
|
||||
}
|
||||
p.ScalarIndexTaskSlotUsage.Init(base.mgr)
|
||||
|
||||
p.StatsTaskSlotUsage = ParamItem{
|
||||
Key: "dataCoord.slot.statsTaskSlotUsage",
|
||||
Version: "2.5.8",
|
||||
|
||||
@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1
|
||||
|
||||
CPU_ARCH=$(get_cpu_arch $CPU_TARGET)
|
||||
|
||||
# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5
|
||||
export CMAKE_POLICY_VERSION_MINIMUM=3.5
|
||||
|
||||
arch=$(uname -m)
|
||||
CMAKE_CMD="cmake \
|
||||
${CMAKE_EXTRA_ARGS} \
|
||||
|
||||
@ -80,7 +80,7 @@ docker run -d ^
|
||||
--health-start-period=90s ^
|
||||
--health-timeout=20s ^
|
||||
--health-retries=3 ^
|
||||
milvusdb/milvus:v2.6.6 ^
|
||||
milvusdb/milvus:v2.6.7 ^
|
||||
milvus run standalone >nul
|
||||
if %errorlevel% neq 0 (
|
||||
echo Failed to start Milvus container.
|
||||
|
||||
@ -59,7 +59,7 @@ EOF
|
||||
--health-start-period=90s \
|
||||
--health-timeout=20s \
|
||||
--health-retries=3 \
|
||||
milvusdb/milvus:v2.6.6 \
|
||||
milvusdb/milvus:v2.6.7 \
|
||||
milvus run standalone 1> /dev/null
|
||||
}
|
||||
|
||||
|
||||
@ -436,7 +436,7 @@ func TestUpsertAutoID(t *testing.T) {
|
||||
// upsert without pks -> error
|
||||
vecColumn = hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption())
|
||||
_, err = mc.Upsert(ctx, client.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(vecColumn))
|
||||
common.CheckErr(t, err, false, "has no corresponding fieldData pass in: invalid parameter")
|
||||
common.CheckErr(t, err, false, "must assign pk when upsert")
|
||||
}
|
||||
|
||||
// test upsert autoId collection
|
||||
|
||||
@ -170,6 +170,10 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha
|
||||
}
|
||||
|
||||
func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
|
||||
testBalanceOnSingleReplica(s)
|
||||
}
|
||||
|
||||
func testBalanceOnSingleReplica(s *BalanceTestSuit) {
|
||||
name := "test_balance_" + funcutil.GenRandomStr()
|
||||
s.initCollection(name, 1, 2, 2, 2000, 500)
|
||||
|
||||
@ -414,6 +418,16 @@ func (s *BalanceTestSuit) TestConcurrentBalanceChannelAndSegment() {
|
||||
s.Equal(int64(0), failCounter.Load())
|
||||
}
|
||||
|
||||
func (s *BalanceTestSuit) TestMultiTargetBalancePolicy() {
|
||||
// Set balance policy to MultipleTargetBalancer
|
||||
revertGuard := s.Cluster.MustModifyMilvusConfig(map[string]string{
|
||||
paramtable.Get().QueryCoordCfg.Balancer.Key: "MultipleTargetBalancer",
|
||||
})
|
||||
defer revertGuard()
|
||||
|
||||
testBalanceOnSingleReplica(s)
|
||||
}
|
||||
|
||||
func TestBalance(t *testing.T) {
|
||||
suite.Run(t, new(BalanceTestSuit))
|
||||
}
|
||||
|
||||
@ -595,7 +595,7 @@ class ResponseChecker:
|
||||
log.error(f"Query result {query_res} is not list")
|
||||
return False
|
||||
|
||||
log.warning(f'Expected query result is {exp_res}')
|
||||
# log.warning(f'Expected query result is {exp_res}')
|
||||
|
||||
@staticmethod
|
||||
def check_query_iterator(query_res, func_name, check_items):
|
||||
|
||||
@ -1221,14 +1221,13 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_partial_update_same_pk_same_field(self):
|
||||
"""
|
||||
target: Test PU will success and query will success
|
||||
target: Test partial update on an existing pk with the same field will success
|
||||
method:
|
||||
1. Create a collection
|
||||
2. Insert rows
|
||||
3. Upsert the rows with same pk and same field
|
||||
4. Query the rows
|
||||
5. Upsert the rows with same pk and different field
|
||||
expected: Step 2 -> 4 should success 5 should fail
|
||||
3. Upsert a single row with existing pk and same field (partial update)
|
||||
4. Query the row to verify the update
|
||||
expected: All steps should success, and the field value should be updated
|
||||
"""
|
||||
# step 1: create collection
|
||||
client = self._client()
|
||||
@ -1248,16 +1247,19 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
|
||||
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
|
||||
self.upsert(client, collection_name, rows, partial_update=True)
|
||||
|
||||
# step 3: Upsert the rows with same pk and same field
|
||||
new_rows = [{default_primary_key_field_name: 0,
|
||||
default_int32_field_name: i} for i in range(default_nb)]
|
||||
self.upsert(client, collection_name, new_rows, partial_update=True)
|
||||
# step 3: Upsert a single row with existing pk=0 and update the same field
|
||||
updated_value = 99999
|
||||
new_row = {default_primary_key_field_name: 0,
|
||||
default_int32_field_name: updated_value}
|
||||
self.upsert(client, collection_name, [new_row], partial_update=True)
|
||||
|
||||
# step 4: Query the rows
|
||||
# step 4: Query the row to verify the update
|
||||
expected_row = {default_primary_key_field_name: 0,
|
||||
default_int32_field_name: updated_value}
|
||||
result = self.query(client, collection_name, filter=f"{default_primary_key_field_name} == 0",
|
||||
check_task=CheckTasks.check_query_results,
|
||||
output_fields=[default_int32_field_name],
|
||||
check_items={exp_res: [new_rows[-1]],
|
||||
check_items={exp_res: [expected_row],
|
||||
"pk_name": default_primary_key_field_name})[0]
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
@ -165,9 +165,9 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
|
||||
# step 4: query the rows
|
||||
new_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
|
||||
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
|
||||
check_task=CheckTasks.check_query_results,
|
||||
check_items={exp_res: new_rows,
|
||||
"pk_name": default_primary_key_field_name})
|
||||
check_task=CheckTasks.check_query_results,
|
||||
check_items={exp_res: new_rows,
|
||||
"pk_name": default_primary_key_field_name})
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
|
||||
@ -371,6 +371,72 @@ class TestMilvusClientUpsertInvalid(TestMilvusClientV2Base):
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_duplicate_pk_int64(self):
|
||||
"""
|
||||
target: test upsert with duplicate primary keys (Int64)
|
||||
method:
|
||||
1. create collection with Int64 primary key
|
||||
2. upsert data with duplicate primary keys in the same batch
|
||||
expected: raise error - duplicate primary keys are not allowed
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
# 2. upsert with duplicate PKs: 1, 2, 1 (duplicate)
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.0, default_string_field_name: "first"},
|
||||
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 2.0, default_string_field_name: "second"},
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.1, default_string_field_name: "duplicate"},
|
||||
]
|
||||
error = {ct.err_code: 1100,
|
||||
ct.err_msg: "duplicate primary keys are not allowed in the same batch"}
|
||||
self.upsert(client, collection_name, rows,
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_duplicate_pk_varchar(self):
|
||||
"""
|
||||
target: test upsert with duplicate primary keys (VarChar)
|
||||
method:
|
||||
1. create collection with VarChar primary key
|
||||
2. upsert data with duplicate primary keys in the same batch
|
||||
expected: raise error - duplicate primary keys are not allowed
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
dim = default_dim
|
||||
# 1. create collection with VarChar primary key
|
||||
schema = self.create_schema(client, enable_dynamic_field=False)[0]
|
||||
schema.add_field(default_primary_key_field_name, DataType.VARCHAR, max_length=64, is_primary=True,
|
||||
auto_id=False)
|
||||
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=dim)
|
||||
schema.add_field(default_float_field_name, DataType.FLOAT)
|
||||
index_params = self.prepare_index_params(client)[0]
|
||||
index_params.add_index(default_vector_field_name, metric_type="COSINE")
|
||||
self.create_collection(client, collection_name, dimension=dim, schema=schema, index_params=index_params)
|
||||
# 2. upsert with duplicate PKs: "a", "b", "a" (duplicate)
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]),
|
||||
default_float_field_name: 1.0},
|
||||
{default_primary_key_field_name: "b", default_vector_field_name: list(rng.random((1, dim))[0]),
|
||||
default_float_field_name: 2.0},
|
||||
{default_primary_key_field_name: "a", default_vector_field_name: list(rng.random((1, dim))[0]),
|
||||
default_float_field_name: 1.1},
|
||||
]
|
||||
error = {ct.err_code: 1100,
|
||||
ct.err_msg: "duplicate primary keys are not allowed in the same batch"}
|
||||
self.upsert(client, collection_name, rows,
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
|
||||
class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
|
||||
""" Test case of search interface """
|
||||
@ -551,342 +617,3 @@ class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
|
||||
self.drop_partition(client, collection_name, partition_name)
|
||||
if self.has_collection(client, collection_name)[0]:
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
|
||||
class TestMilvusClientUpsertDedup(TestMilvusClientV2Base):
|
||||
"""Test case for upsert deduplication functionality"""
|
||||
|
||||
@pytest.fixture(scope="function", params=["COSINE", "L2"])
|
||||
def metric_type(self, request):
|
||||
yield request.param
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_int64_pk(self):
|
||||
"""
|
||||
target: test upsert with duplicate int64 primary keys in same batch
|
||||
method:
|
||||
1. create collection with int64 primary key
|
||||
2. upsert data with duplicate primary keys [1, 2, 3, 2, 1]
|
||||
3. query to verify only last occurrence is kept
|
||||
expected: only 3 unique records exist, with data from last occurrence
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
|
||||
# 2. upsert data with duplicate PKs: [1, 2, 3, 2, 1]
|
||||
# Expected: keep last occurrence -> [3, 2, 1] at indices [2, 3, 4]
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.0, default_string_field_name: "str_1_first"},
|
||||
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 2.0, default_string_field_name: "str_2_first"},
|
||||
{default_primary_key_field_name: 3, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 3.0, default_string_field_name: "str_3"},
|
||||
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 2.1, default_string_field_name: "str_2_last"},
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.1, default_string_field_name: "str_1_last"},
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
# After deduplication, should only have 3 records
|
||||
assert results['upsert_count'] == 3
|
||||
|
||||
# 3. query to verify deduplication - should have only 3 unique records
|
||||
query_results = self.query(client, collection_name, filter="id >= 0")[0]
|
||||
assert len(query_results) == 3
|
||||
|
||||
# Verify that last occurrence data is kept
|
||||
id_to_data = {item['id']: item for item in query_results}
|
||||
assert 1 in id_to_data
|
||||
assert 2 in id_to_data
|
||||
assert 3 in id_to_data
|
||||
|
||||
# Check that data from last occurrence is preserved
|
||||
assert id_to_data[1]['float'] == 1.1
|
||||
assert id_to_data[1]['varchar'] == "str_1_last"
|
||||
assert id_to_data[2]['float'] == 2.1
|
||||
assert id_to_data[2]['varchar'] == "str_2_last"
|
||||
assert id_to_data[3]['float'] == 3.0
|
||||
assert id_to_data[3]['varchar'] == "str_3"
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_varchar_pk(self):
|
||||
"""
|
||||
target: test upsert with duplicate varchar primary keys in same batch
|
||||
method:
|
||||
1. create collection with varchar primary key
|
||||
2. upsert data with duplicate primary keys ["a", "b", "c", "b", "a"]
|
||||
3. query to verify only last occurrence is kept
|
||||
expected: only 3 unique records exist, with data from last occurrence
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection with varchar primary key
|
||||
schema = self.create_schema(client, enable_dynamic_field=True)[0]
|
||||
schema.add_field("id", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False)
|
||||
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
|
||||
schema.add_field("age", DataType.INT64)
|
||||
index_params = self.prepare_index_params(client)[0]
|
||||
index_params.add_index(default_vector_field_name, metric_type="COSINE")
|
||||
self.create_collection(client, collection_name, default_dim, schema=schema,
|
||||
index_params=index_params, consistency_level="Strong")
|
||||
|
||||
# 2. upsert data with duplicate PKs: ["a", "b", "c", "b", "a"]
|
||||
# Expected: keep last occurrence -> ["c", "b", "a"] at indices [2, 3, 4]
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
"age": 10},
|
||||
{"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
"age": 20},
|
||||
{"id": "c", default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
"age": 30},
|
||||
{"id": "b", default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
"age": 21},
|
||||
{"id": "a", default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
"age": 11},
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
# After deduplication, should only have 3 records
|
||||
assert results['upsert_count'] == 3
|
||||
|
||||
# 3. query to verify deduplication
|
||||
query_results = self.query(client, collection_name, filter='id in ["a", "b", "c"]')[0]
|
||||
assert len(query_results) == 3
|
||||
|
||||
# Verify that last occurrence data is kept
|
||||
id_to_data = {item['id']: item for item in query_results}
|
||||
assert "a" in id_to_data
|
||||
assert "b" in id_to_data
|
||||
assert "c" in id_to_data
|
||||
|
||||
# Check that data from last occurrence is preserved
|
||||
assert id_to_data["a"]["age"] == 11
|
||||
assert id_to_data["b"]["age"] == 21
|
||||
assert id_to_data["c"]["age"] == 30
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_all_duplicates(self):
|
||||
"""
|
||||
target: test upsert when all records have same primary key
|
||||
method:
|
||||
1. create collection
|
||||
2. upsert 5 records with same primary key
|
||||
3. query to verify only 1 record exists
|
||||
expected: only 1 record exists with data from last occurrence
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
|
||||
# 2. upsert data where all have same PK (id=1)
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: i * 1.0, default_string_field_name: f"version_{i}"}
|
||||
for i in range(5)
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
# After deduplication, should only have 1 record
|
||||
assert results['upsert_count'] == 1
|
||||
|
||||
# 3. query to verify only 1 record exists
|
||||
query_results = self.query(client, collection_name, filter="id == 1")[0]
|
||||
assert len(query_results) == 1
|
||||
|
||||
# Verify it's the last occurrence (i=4)
|
||||
assert query_results[0]['float'] == 4.0
|
||||
assert query_results[0]['varchar'] == "version_4"
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_no_duplicates(self):
|
||||
"""
|
||||
target: test upsert with no duplicate primary keys
|
||||
method:
|
||||
1. create collection
|
||||
2. upsert data with unique primary keys
|
||||
3. query to verify all records exist
|
||||
expected: all records exist as-is
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
|
||||
# 2. upsert data with unique PKs
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
nb = 10
|
||||
rows = [
|
||||
{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: i * 1.0, default_string_field_name: str(i)}
|
||||
for i in range(nb)
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
# No deduplication should occur
|
||||
assert results['upsert_count'] == nb
|
||||
|
||||
# 3. query to verify all records exist
|
||||
query_results = self.query(client, collection_name, filter=f"id >= 0")[0]
|
||||
assert len(query_results) == nb
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_milvus_client_upsert_dedup_large_batch(self):
|
||||
"""
|
||||
target: test upsert deduplication with large batch
|
||||
method:
|
||||
1. create collection
|
||||
2. upsert large batch with 50% duplicate primary keys
|
||||
3. query to verify correct number of records
|
||||
expected: only unique records exist
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
|
||||
# 2. upsert large batch where each ID appears twice
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
nb = 500
|
||||
unique_ids = nb // 2 # 250 unique IDs
|
||||
|
||||
rows = []
|
||||
for i in range(nb):
|
||||
pk = i % unique_ids # This creates duplicates: 0,1,2...249,0,1,2...249
|
||||
rows.append({
|
||||
default_primary_key_field_name: pk,
|
||||
default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: float(i), # Different value for each row
|
||||
default_string_field_name: f"batch_{i}"
|
||||
})
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
# After deduplication, should only have unique_ids records
|
||||
assert results['upsert_count'] == unique_ids
|
||||
|
||||
# 3. query to verify correct number of records
|
||||
query_results = self.query(client, collection_name, filter=f"id >= 0", limit=1000)[0]
|
||||
assert len(query_results) == unique_ids
|
||||
|
||||
# Verify that last occurrence is kept (should have higher float values)
|
||||
for item in query_results:
|
||||
pk = item['id']
|
||||
# Last occurrence of pk is at index (pk + unique_ids)
|
||||
expected_float = float(pk + unique_ids)
|
||||
assert item['float'] == expected_float
|
||||
assert item['varchar'] == f"batch_{pk + unique_ids}"
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_with_partition(self):
|
||||
"""
|
||||
target: test upsert deduplication works correctly with partitions
|
||||
method:
|
||||
1. create collection with partition
|
||||
2. upsert data with duplicates to specific partition
|
||||
3. query to verify deduplication in partition
|
||||
expected: deduplication works within partition
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
partition_name = cf.gen_unique_str("partition")
|
||||
|
||||
# 1. create collection and partition
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
self.create_partition(client, collection_name, partition_name)
|
||||
|
||||
# 2. upsert data with duplicates to partition
|
||||
rng = np.random.default_rng(seed=19530)
|
||||
rows = [
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.0, default_string_field_name: "first"},
|
||||
{default_primary_key_field_name: 2, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 2.0, default_string_field_name: "unique"},
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: list(rng.random((1, default_dim))[0]),
|
||||
default_float_field_name: 1.1, default_string_field_name: "last"},
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows, partition_name=partition_name)[0]
|
||||
assert results['upsert_count'] == 2
|
||||
|
||||
# 3. query partition to verify deduplication
|
||||
query_results = self.query(client, collection_name, filter="id >= 0",
|
||||
partition_names=[partition_name])[0]
|
||||
assert len(query_results) == 2
|
||||
|
||||
# Verify correct data
|
||||
id_to_data = {item['id']: item for item in query_results}
|
||||
assert id_to_data[1]['float'] == 1.1
|
||||
assert id_to_data[1]['varchar'] == "last"
|
||||
assert id_to_data[2]['float'] == 2.0
|
||||
assert id_to_data[2]['varchar'] == "unique"
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_milvus_client_upsert_dedup_with_vectors(self):
|
||||
"""
|
||||
target: test upsert deduplication preserves correct vector data
|
||||
method:
|
||||
1. create collection
|
||||
2. upsert data with duplicate PKs but different vectors
|
||||
3. search to verify correct vector is preserved
|
||||
expected: vector from last occurrence is preserved
|
||||
"""
|
||||
client = self._client()
|
||||
collection_name = cf.gen_collection_name_by_testcase_name()
|
||||
|
||||
# 1. create collection
|
||||
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
|
||||
|
||||
# 2. upsert data with duplicate PK=1 but different vectors
|
||||
# Create distinctly different vectors for easy verification
|
||||
first_vector = [1.0] * default_dim # All 1.0
|
||||
last_vector = [2.0] * default_dim # All 2.0
|
||||
|
||||
rows = [
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: first_vector,
|
||||
default_float_field_name: 1.0, default_string_field_name: "first"},
|
||||
{default_primary_key_field_name: 2, default_vector_field_name: [0.5] * default_dim,
|
||||
default_float_field_name: 2.0, default_string_field_name: "unique"},
|
||||
{default_primary_key_field_name: 1, default_vector_field_name: last_vector,
|
||||
default_float_field_name: 1.1, default_string_field_name: "last"},
|
||||
]
|
||||
|
||||
results = self.upsert(client, collection_name, rows)[0]
|
||||
assert results['upsert_count'] == 2
|
||||
|
||||
# 3. query to get vector data
|
||||
query_results = self.query(client, collection_name, filter="id == 1",
|
||||
output_fields=["id", "vector", "float", "varchar"])[0]
|
||||
assert len(query_results) == 1
|
||||
|
||||
# Verify it's the last occurrence with last_vector
|
||||
result = query_results[0]
|
||||
assert result['float'] == 1.1
|
||||
assert result['varchar'] == "last"
|
||||
# Vector should be last_vector (all 2.0)
|
||||
assert all(abs(v - 2.0) < 0.001 for v in result['vector'])
|
||||
|
||||
self.drop_collection(client, collection_name)
|
||||
@@ -28,8 +28,8 @@ pytest-parallel
pytest-random-order

# pymilvus
pymilvus==2.7.0rc75
pymilvus[bulk_writer]==2.7.0rc75
pymilvus==2.7.0rc82
pymilvus[bulk_writer]==2.7.0rc82
# for protobuf
protobuf>=5.29.5


@@ -272,7 +272,7 @@ class TestAsyncMilvusClient(TestMilvusClientV2Base):
assert _index["indexed_rows"] == async_default_nb
assert _index["state"] == "Finished"
_load, _ = await self.async_milvus_client_wrap.get_load_state(c_name)
assert _load == LoadState.Loaded
assert _load['state'] == LoadState.Loaded

# dql tasks
tasks = []
