feat: support reopen segment for data/schema changes (#46359)

issue: #46358

This PR implements segment reopening functionality on query nodes,
enabling the application of data or schema changes to already-loaded
segments without requiring a full reload.

### Core (C++)

**New SegmentLoadInfo class**
(`internal/core/src/segcore/SegmentLoadInfo.h/cpp`):
- Encapsulates segment load configuration with structured access
- Implements `ComputeDiff()` to calculate differences between old and
new load states (see the sketch after this list)
- Tracks indexes, binlogs, and column groups that need to be loaded or
dropped
- Provides `ConvertFieldIndexInfoToLoadIndexInfo()` for index loading
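
A minimal sketch of how the diff computation composes, using the types this PR introduces (the wrapper function itself is illustrative only):

```cpp
#include "segcore/SegmentLoadInfo.h"

using milvus::segcore::LoadDiff;
using milvus::segcore::SegmentLoadInfo;

// Compare an old and a new load state and report what Reopen() would do.
LoadDiff
DiffLoadStates(const milvus::proto::segcore::SegmentLoadInfo& before,
               const milvus::proto::segcore::SegmentLoadInfo& after) {
    SegmentLoadInfo current(before);   // builds field -> index/binlog caches
    SegmentLoadInfo incoming(after);
    // Index diffs are always computed; field-data diffs are binlog-based
    // or column-group-based depending on whether a manifest path is set.
    return current.ComputeDiff(incoming);
}
```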

**ChunkedSegmentSealedImpl modifications**:
- Added `Reopen(const SegmentLoadInfo&)` method to apply incremental
changes based on the computed diff (usage sketched after this list)
- Refactored `LoadColumnGroups()` and `LoadColumnGroup()` to support
selective loading via field ID map
- Extracted `LoadBatchIndexes()` and `LoadBatchFieldData()` for reusable
batch loading logic
- Added `LoadManifest()` for manifest-based loading path
- Updated all methods to use `SegmentLoadInfo` wrapper instead of direct
proto access

**SegmentGrowingImpl modifications**:
- Added `Reopen()` stub method for interface compliance

**C API additions** (`segment_c.h/cpp`):
- Added `ReopenSegment()` function exposing reopen to Go layer
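
The exact C declaration is not shown in this excerpt; a plausible shape, modeled on sibling segcore C APIs that accept serialized protobuf bytes, would be (an assumption, not the actual header):

```cpp
// Hypothetical shape only; the real declaration lives in segment_c.h.
extern "C" CStatus
ReopenSegment(CSegmentInterface c_segment,
              const uint8_t* serialized_load_info,  // marshaled SegmentLoadInfo
              const uint64_t len);                  // byte length of the proto
```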

### Go Side

**QueryNode handlers** (`internal/querynodev2/`):
- Added `HandleReopen()` in handlers.go
- Added `ReopenSegments()` RPC in services.go

**Segment interface** (`internal/querynodev2/segments/`):
- Extended `Segment` interface with `Reopen()` method
- Implemented `Reopen()` in LocalSegment
- Added `Reopen()` to segment loader

**Segcore wrapper** (`internal/util/segcore/`):
- Added `Reopen()` method in segment.go
- Added `ReopenSegmentRequest` in requests.go

### Proto

- Added new fields to support reopen in `query_coord.proto`
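
Putting the pieces together, the intended call path (inferred from the changes above) is: `ReopenSegments()` RPC → `HandleReopen()` → segment loader `Reopen()` → `LocalSegment.Reopen()` → segcore wrapper `Reopen()` → C API `ReopenSegment()` → `ChunkedSegmentSealedImpl::Reopen()`, which computes a `LoadDiff` against the previous load state and applies it incrementally.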

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>


@@ -250,82 +250,6 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
request.has_raw_data);
}
LoadIndexInfo
ChunkedSegmentSealedImpl::ConvertFieldIndexInfoToLoadIndexInfo(
const milvus::proto::segcore::FieldIndexInfo* field_index_info) const {
LoadIndexInfo load_index_info;
load_index_info.segment_id = id_;
// Extract field ID
auto field_id = FieldId(field_index_info->fieldid());
load_index_info.field_id = field_id.get();
load_index_info.partition_id = segment_load_info_.partitionid();
// Get field type from schema
const auto& field_meta = get_schema()[field_id];
load_index_info.field_type = field_meta.get_data_type();
load_index_info.element_type = field_meta.get_element_type();
// Set index metadata
load_index_info.index_id = field_index_info->indexid();
load_index_info.index_build_id = field_index_info->buildid();
load_index_info.index_version = field_index_info->index_version();
load_index_info.index_store_version =
field_index_info->index_store_version();
load_index_info.index_engine_version =
static_cast<IndexVersion>(field_index_info->current_index_version());
load_index_info.index_size = field_index_info->index_size();
load_index_info.num_rows = field_index_info->num_rows();
load_index_info.schema = field_meta.ToProto();
// Copy index file paths, excluding indexParams file
for (const auto& file_path : field_index_info->index_file_paths()) {
size_t last_slash = file_path.find_last_of('/');
std::string filename = (last_slash != std::string::npos)
? file_path.substr(last_slash + 1)
: file_path;
if (filename != "indexParams") {
load_index_info.index_files.push_back(file_path);
}
}
auto& mmap_config = storage::MmapManager::GetInstance().GetMmapConfig();
auto use_mmap = IsVectorDataType(field_meta.get_data_type())
? mmap_config.GetVectorIndexEnableMmap()
: mmap_config.GetScalarIndexEnableMmap();
// Set index params
for (const auto& kv_pair : field_index_info->index_params()) {
if (kv_pair.key() == "mmap.enabled") {
std::string lower;
std::transform(kv_pair.value().begin(),
kv_pair.value().end(),
std::back_inserter(lower),
::tolower);
use_mmap = (lower == "true");
}
load_index_info.index_params[kv_pair.key()] = kv_pair.value();
}
size_t dim =
IsVectorDataType(field_meta.get_data_type()) &&
!IsSparseFloatVectorDataType(field_meta.get_data_type())
? field_meta.get_dim()
: 1;
load_index_info.dim = dim;
auto remote_chunk_manager =
milvus::storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
load_index_info.mmap_dir_path =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager()
->GetRootPath();
load_index_info.enable_mmap = use_mmap;
return load_index_info;
}
void
ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
switch (load_info.storage_version) {
@@ -339,176 +263,6 @@ ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
}
}
void
ChunkedSegmentSealedImpl::LoadColumnGroups(const std::string& manifest_path) {
LOG_INFO(
"Loading segment {} field data with manifest {}", id_, manifest_path);
auto properties = milvus::storage::LoonFFIPropertiesSingleton::GetInstance()
.GetProperties();
auto column_groups = GetColumnGroups(manifest_path, properties);
auto arrow_schema = schema_->ConvertToArrowSchema();
reader_ = milvus_storage::api::Reader::create(
column_groups, arrow_schema, nullptr, *properties);
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
std::vector<std::future<void>> load_group_futures;
for (int64_t i = 0; i < column_groups->size(); ++i) {
auto future = pool.Submit([this, column_groups, properties, i] {
LoadColumnGroup(column_groups, properties, i);
});
load_group_futures.emplace_back(std::move(future));
}
std::vector<std::exception_ptr> load_exceptions;
for (auto& future : load_group_futures) {
try {
future.get();
} catch (...) {
load_exceptions.push_back(std::current_exception());
}
}
// If any exceptions occurred during index loading, handle them
if (!load_exceptions.empty()) {
LOG_ERROR("Failed to load {} out of {} indexes for segment {}",
load_exceptions.size(),
load_group_futures.size(),
id_);
// Rethrow the first exception
std::rethrow_exception(load_exceptions[0]);
}
}
void
ChunkedSegmentSealedImpl::LoadColumnGroup(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
int64_t index) {
AssertInfo(index < column_groups->size(),
"load column group index out of range");
auto column_group = column_groups->get_column_group(index);
std::vector<FieldId> milvus_field_ids;
for (auto& column : column_group->columns) {
auto field_id = std::stoll(column);
milvus_field_ids.emplace_back(field_id);
}
auto field_metas = schema_->get_field_metas(milvus_field_ids);
// assumption: vector field occupies whole column group
bool is_vector = false;
bool index_has_rawdata = true;
bool has_mmap_setting = false;
bool mmap_enabled = false;
for (auto& [field_id, field_meta] : field_metas) {
if (IsVectorDataType(field_meta.get_data_type())) {
is_vector = true;
}
std::shared_lock lck(mutex_);
auto iter = index_has_raw_data_.find(field_id);
if (iter != index_has_raw_data_.end()) {
index_has_rawdata = index_has_rawdata && iter->second;
} else {
index_has_rawdata = false;
}
// if a field has an explicit mmap setting, use it:
// - collection-level setting: all fields share the same value
// - field-level setting: if any field in the group enables mmap, the whole group is mmapped
auto [field_has_setting, field_mmap_enabled] =
schema_->MmapEnabled(field_id);
has_mmap_setting = has_mmap_setting || field_has_setting;
mmap_enabled = mmap_enabled || field_mmap_enabled;
}
if (index_has_rawdata) {
LOG_INFO(
"[StorageV2] segment {} index(es) provide all raw data for column "
"group index {}, skip loading binlog",
this->get_segment_id(),
index);
return;
}
auto& mmap_config = storage::MmapManager::GetInstance().GetMmapConfig();
bool global_use_mmap = is_vector ? mmap_config.GetVectorFieldEnableMmap()
: mmap_config.GetScalarFieldEnableMmap();
auto use_mmap = has_mmap_setting ? mmap_enabled : global_use_mmap;
auto chunk_reader_result = reader_->get_chunk_reader(index);
AssertInfo(chunk_reader_result.ok(),
"get chunk reader failed, segment {}, column group index {}",
get_segment_id(),
index);
auto chunk_reader = std::move(chunk_reader_result).ValueOrDie();
LOG_INFO("[StorageV2] segment {} loads manifest cg index {}",
this->get_segment_id(),
index);
auto translator =
std::make_unique<storagev2translator::ManifestGroupTranslator>(
get_segment_id(),
index,
std::move(chunk_reader),
field_metas,
use_mmap,
column_group->columns.size(),
segment_load_info_.priority());
auto chunked_column_group =
std::make_shared<ChunkedColumnGroup>(std::move(translator));
// Create ProxyChunkColumn for each field
for (const auto& field_id : milvus_field_ids) {
auto field_meta = field_metas.at(field_id);
auto column = std::make_shared<ProxyChunkColumn>(
chunked_column_group, field_id, field_meta);
auto data_type = field_meta.get_data_type();
std::optional<ParquetStatistics> statistics_opt;
load_field_data_common(
field_id,
column,
segment_load_info_.num_of_rows(),
data_type,
use_mmap,
true,
std::
nullopt); // manifest cannot provide parquet skip index directly
if (field_id == TimestampFieldID) {
auto timestamp_proxy_column = get_column(TimestampFieldID);
AssertInfo(timestamp_proxy_column != nullptr,
"timestamp proxy column is nullptr");
// TODO: check timestamp_index readiness instead of checking system_ready_count_
int64_t num_rows = segment_load_info_.num_of_rows();
auto all_ts_chunks = timestamp_proxy_column->GetAllChunks(nullptr);
std::vector<Timestamp> timestamps(num_rows);
int64_t offset = 0;
for (auto& all_ts_chunk : all_ts_chunks) {
auto chunk_data = all_ts_chunk.get();
auto fixed_chunk = dynamic_cast<FixedWidthChunk*>(chunk_data);
auto span = fixed_chunk->Span();
for (size_t j = 0; j < span.row_count(); j++) {
auto ts = *(int64_t*)((char*)span.data() +
j * span.element_sizeof());
timestamps[offset++] = ts;
}
}
init_timestamp_index(timestamps, num_rows);
system_ready_count_++;
AssertInfo(offset == num_rows,
"[StorageV2] timestamp total row count {} not equal "
"to expected {}",
offset,
num_rows);
}
}
}
std::optional<ChunkedSegmentSealedImpl::ParquetStatistics>
parse_parquet_statistics(
const std::vector<std::shared_ptr<parquet::FileMetaData>>& file_metas,
@@ -2774,6 +2528,58 @@ ChunkedSegmentSealedImpl::Reopen(SchemaPtr sch) {
schema_ = sch;
}
void
ChunkedSegmentSealedImpl::Reopen(
const milvus::proto::segcore::SegmentLoadInfo& new_load_info) {
SegmentLoadInfo new_seg_load_info(new_load_info);
SegmentLoadInfo current;
{
std::unique_lock lck(mutex_);
current = segment_load_info_;
segment_load_info_ = new_seg_load_info;
}
// compute load diff
auto diff = current.ComputeDiff(new_seg_load_info);
milvus::tracer::TraceContext trace_ctx;
if (!diff.indexes_to_load.empty()) {
LoadBatchIndexes(trace_ctx, diff.indexes_to_load);
}
// drop index
if (!diff.indexes_to_drop.empty()) {
for (auto field_id : diff.indexes_to_drop) {
DropIndex(field_id);
}
}
// load column groups
if (!diff.column_groups_to_load.empty()) {
auto properties =
milvus::storage::LoonFFIPropertiesSingleton::GetInstance()
.GetProperties();
LoadColumnGroups(new_seg_load_info.GetColumnGroups(),
properties,
diff.column_groups_to_load);
}
// load field binlog
if (!diff.binlogs_to_load.empty()) {
LoadBatchFieldData(trace_ctx, diff.binlogs_to_load);
}
// drop field
if (!diff.field_data_to_drop.empty()) {
for (auto field_id : diff.field_data_to_drop) {
DropFieldData(field_id);
}
}
LOG_INFO("Reopen segment {} done", id_);
}
void
ChunkedSegmentSealedImpl::FinishLoad() {
std::unique_lock lck(mutex_);
@@ -2910,38 +2716,210 @@ void
ChunkedSegmentSealedImpl::SetLoadInfo(
const proto::segcore::SegmentLoadInfo& load_info) {
std::unique_lock lck(mutex_);
segment_load_info_ = SegmentLoadInfo(load_info);
LOG_INFO(
"SetLoadInfo for segment {}, num_rows: {}, index count: {}, "
"storage_version: {}",
id_,
segment_load_info_.GetNumOfRows(),
segment_load_info_.GetIndexInfoCount(),
segment_load_info_.GetStorageVersion());
}
void
ChunkedSegmentSealedImpl::LoadManifest(const std::string& manifest_path) {
LOG_INFO(
"Loading segment {} field data with manifest {}", id_, manifest_path);
auto properties = milvus::storage::LoonFFIPropertiesSingleton::GetInstance()
.GetProperties();
auto column_groups = segment_load_info_.GetColumnGroups();
auto arrow_schema = schema_->ConvertToArrowSchema();
reader_ = milvus_storage::api::Reader::create(
column_groups, arrow_schema, nullptr, *properties);
std::map<int, std::vector<FieldId>> cg_field_ids_map;
for (int i = 0; i < column_groups->size(); ++i) {
auto column_group = column_groups->get_column_group(i);
std::vector<FieldId> milvus_field_ids;
for (auto& column : column_group->columns) {
auto field_id = std::stoll(column);
milvus_field_ids.emplace_back(field_id);
}
cg_field_ids_map[i] = milvus_field_ids;
}
LoadColumnGroups(column_groups, properties, cg_field_ids_map);
}
void
ChunkedSegmentSealedImpl::LoadColumnGroups(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
std::map<int, std::vector<FieldId>>& cg_field_ids_map) {
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
std::vector<std::future<void>> load_group_futures;
for (const auto& kv : cg_field_ids_map) {
auto cg_index = kv.first;
const auto& field_ids = kv.second;
auto future =
pool.Submit([this, column_groups, properties, cg_index, field_ids] {
LoadColumnGroup(column_groups, properties, cg_index, field_ids);
});
load_group_futures.emplace_back(std::move(future));
}
std::vector<std::exception_ptr> load_exceptions;
for (auto& future : load_group_futures) {
try {
future.get();
} catch (...) {
load_exceptions.push_back(std::current_exception());
}
}
// If any exceptions occurred during index loading, handle them
if (!load_exceptions.empty()) {
LOG_ERROR("Failed to load {} out of {} indexes for segment {}",
load_exceptions.size(),
load_group_futures.size(),
id_);
// Rethrow the first exception
std::rethrow_exception(load_exceptions[0]);
}
}
void
ChunkedSegmentSealedImpl::LoadColumnGroup(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
int64_t index,
const std::vector<FieldId>& milvus_field_ids) {
AssertInfo(index < column_groups->size(),
"load column group index out of range");
auto column_group = column_groups->get_column_group(index);
auto field_metas = schema_->get_field_metas(milvus_field_ids);
// assumption: vector field occupies whole column group
bool is_vector = false;
bool index_has_rawdata = true;
bool has_mmap_setting = false;
bool mmap_enabled = false;
for (auto& [field_id, field_meta] : field_metas) {
if (IsVectorDataType(field_meta.get_data_type())) {
is_vector = true;
}
std::shared_lock lck(mutex_);
auto iter = index_has_raw_data_.find(field_id);
if (iter != index_has_raw_data_.end()) {
index_has_rawdata = index_has_rawdata && iter->second;
} else {
index_has_rawdata = false;
}
// if a field has an explicit mmap setting, use it:
// - collection-level setting: all fields share the same value
// - field-level setting: if any field in the group enables mmap, the whole group is mmapped
auto [field_has_setting, field_mmap_enabled] =
schema_->MmapEnabled(field_id);
has_mmap_setting = has_mmap_setting || field_has_setting;
mmap_enabled = mmap_enabled || field_mmap_enabled;
}
if (index_has_rawdata) {
LOG_INFO(
"[StorageV2] segment {} index(es) provide all raw data for column "
"group index {}, skip loading binlog",
this->get_segment_id(),
index);
return;
}
auto& mmap_config = storage::MmapManager::GetInstance().GetMmapConfig();
bool global_use_mmap = is_vector ? mmap_config.GetVectorFieldEnableMmap()
: mmap_config.GetScalarFieldEnableMmap();
auto use_mmap = has_mmap_setting ? mmap_enabled : global_use_mmap;
auto chunk_reader_result = reader_->get_chunk_reader(index);
AssertInfo(chunk_reader_result.ok(),
"get chunk reader failed, segment {}, column group index {}",
get_segment_id(),
index);
auto chunk_reader = std::move(chunk_reader_result).ValueOrDie();
LOG_INFO("[StorageV2] segment {} loads manifest cg index {}",
this->get_segment_id(),
index);
auto translator =
std::make_unique<storagev2translator::ManifestGroupTranslator>(
get_segment_id(),
index,
std::move(chunk_reader),
field_metas,
use_mmap,
column_group->columns.size(),
segment_load_info_.GetPriority());
auto chunked_column_group =
std::make_shared<ChunkedColumnGroup>(std::move(translator));
// Create ProxyChunkColumn for each field
for (const auto& field_id : milvus_field_ids) {
auto field_meta = field_metas.at(field_id);
auto column = std::make_shared<ProxyChunkColumn>(
chunked_column_group, field_id, field_meta);
auto data_type = field_meta.get_data_type();
std::optional<ParquetStatistics> statistics_opt;
load_field_data_common(
field_id,
column,
segment_load_info_.GetNumOfRows(),
data_type,
use_mmap,
true,
std::
nullopt); // manifest cannot provide parquet skip index directly
if (field_id == TimestampFieldID) {
auto timestamp_proxy_column = get_column(TimestampFieldID);
AssertInfo(timestamp_proxy_column != nullptr,
"timestamp proxy column is nullptr");
// TODO: check timestamp_index readiness instead of checking system_ready_count_
int64_t num_rows = segment_load_info_.GetNumOfRows();
auto all_ts_chunks = timestamp_proxy_column->GetAllChunks(nullptr);
std::vector<Timestamp> timestamps(num_rows);
int64_t offset = 0;
for (auto& all_ts_chunk : all_ts_chunks) {
auto chunk_data = all_ts_chunk.get();
auto fixed_chunk = dynamic_cast<FixedWidthChunk*>(chunk_data);
auto span = fixed_chunk->Span();
for (size_t j = 0; j < span.row_count(); j++) {
auto ts = *(int64_t*)((char*)span.data() +
j * span.element_sizeof());
timestamps[offset++] = ts;
}
}
init_timestamp_index(timestamps, num_rows);
system_ready_count_++;
AssertInfo(offset == num_rows,
"[StorageV2] timestamp total row count {} not equal "
"to expected {}",
offset,
num_rows);
}
}
}
void
ChunkedSegmentSealedImpl::LoadBatchIndexes(
milvus::tracer::TraceContext& trace_ctx,
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>&
field_id_to_index_info) {
auto num_rows = segment_load_info_.GetNumOfRows();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
std::vector<std::future<void>> load_index_futures;
load_index_futures.reserve(field_id_to_index_info.size());
@@ -2955,10 +2933,9 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
field_id,
index_info_ptr,
num_rows]() mutable -> void {
// Convert proto FieldIndexInfo to LoadIndexInfo
auto load_index_info =
segment_load_info_.ConvertFieldIndexInfoToLoadIndexInfo(
index_info_ptr, *schema_, id_);
LOG_INFO("Loading index for segment {} field {} with {} files",
id_,
field_id.get(),
@@ -2996,26 +2973,24 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
// Rethrow the first exception
std::rethrow_exception(index_exceptions[0]);
}
}
LOG_INFO("Finished loading {} indexes for segment {}",
field_id_to_index_info.size(),
void
ChunkedSegmentSealedImpl::LoadBatchFieldData(
milvus::tracer::TraceContext& trace_ctx,
std::vector<std::pair<std::vector<FieldId>, proto::segcore::FieldBinlog>>&
field_binlog_to_load) {
LOG_INFO("Loading field binlog for {} fields in segment {}",
field_binlog_to_load.size(),
id_);
auto manifest_path = segment_load_info_.manifest_path();
if (manifest_path != "") {
LoadColumnGroups(manifest_path);
return;
}
std::map<FieldId, LoadFieldDataInfo> field_data_to_load;
for (int i = 0; i < segment_load_info_.binlog_paths_size(); i++) {
for (auto& [field_ids, field_binlog] : field_binlog_to_load) {
LoadFieldDataInfo load_field_data_info;
load_field_data_info.storage_version =
segment_load_info_.storageversion();
const auto& field_binlog = segment_load_info_.binlog_paths(i);
std::vector<FieldId> field_ids;
segment_load_info_.GetStorageVersion();
// const auto& field_binlog = segment_load_info_.GetBinlogPath(i);
// std::vector<FieldId> field_ids;
// when child fields specified, field id is group id, child field ids are actual id values here
if (field_binlog.child_fields_size() > 0) {
field_ids.reserve(field_binlog.child_fields_size());
@@ -3094,45 +3069,103 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
field_data_to_load[FieldId(group_id)] = load_field_data_info;
}
// Load field data in parallel using the LOW-priority thread pool
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
std::vector<std::future<void>> load_field_futures;
load_field_futures.reserve(field_data_to_load.size());
for (const auto& [field_id, load_field_data_info] : field_data_to_load) {
// Create a local copy to capture in lambda (C++17 compatible)
const auto field_data = load_field_data_info;
auto future = pool.Submit(
[this, field_data]() -> void { LoadFieldData(field_data); });
load_field_futures.push_back(std::move(future));
}
// Wait for all field data loading to complete and collect exceptions
std::vector<std::exception_ptr> field_exceptions;
field_exceptions.reserve(load_field_futures.size());
for (auto& future : load_field_futures) {
try {
future.get();
} catch (...) {
field_exceptions.push_back(std::current_exception());
}
}
// If any exceptions occurred during field data loading, handle them
if (!field_exceptions.empty()) {
LOG_ERROR("Failed to load {} out of {} field data for segment {}",
field_exceptions.size(),
load_field_futures.size(),
id_);
// Rethrow the first exception
std::rethrow_exception(field_exceptions[0]);
}
}
void
ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
// Get load info from segment_load_info_
auto num_rows = segment_load_info_.GetNumOfRows();
LOG_INFO("Loading segment {} with {} rows", id_, num_rows);
// Step 1: Separate indexed and non-indexed fields
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>
field_id_to_index_info;
std::set<FieldId> indexed_fields;
for (int i = 0; i < segment_load_info_.GetIndexInfoCount(); i++) {
const auto& index_info = segment_load_info_.GetIndexInfo(i);
if (index_info.index_file_paths_size() == 0) {
continue;
}
auto field_id = FieldId(index_info.fieldid());
field_id_to_index_info[field_id].push_back(&index_info);
indexed_fields.insert(field_id);
}
// Step 2: Load indexes in parallel using thread pool
LoadBatchIndexes(trace_ctx, field_id_to_index_info);
LOG_INFO("Finished loading {} indexes for segment {}",
field_id_to_index_info.size(),
id_);
// Step 3.a: Load with manifest
auto manifest_path = segment_load_info_.GetManifestPath();
if (manifest_path != "") {
LoadManifest(manifest_path);
return;
}
// Step 3.b: Load with field binlog
std::vector<std::pair<std::vector<FieldId>, proto::segcore::FieldBinlog>>
field_binlog_to_load;
for (int i = 0; i < segment_load_info_.GetBinlogPathCount(); i++) {
LoadFieldDataInfo load_field_data_info;
load_field_data_info.storage_version =
segment_load_info_.GetStorageVersion();
const auto& field_binlog = segment_load_info_.GetBinlogPath(i);
std::vector<FieldId> field_ids;
// when child fields are specified, the field id is the group id and the child field ids are the actual field ids
if (field_binlog.child_fields_size() > 0) {
field_ids.reserve(field_binlog.child_fields_size());
for (auto field_id : field_binlog.child_fields()) {
field_ids.emplace_back(field_id);
}
} else {
field_ids.emplace_back(field_binlog.fieldid());
}
field_binlog_to_load.emplace_back(field_ids, field_binlog);
}
if (!field_binlog_to_load.empty()) {
LoadBatchFieldData(trace_ctx, field_binlog_to_load);
}
LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows);


@@ -42,6 +42,7 @@
#include "pb/index_cgo_msg.pb.h"
#include "pb/common.pb.h"
#include "milvus-storage/reader.h"
#include "segcore/SegmentLoadInfo.h"
namespace milvus::segcore {
@@ -191,6 +192,10 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
void
Reopen(SchemaPtr sch) override;
void
Reopen(
const milvus::proto::segcore::SegmentLoadInfo& new_load_info) override;
void
LazyCheckSchema(SchemaPtr sch) override;
@@ -940,6 +945,18 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
void
load_column_group_data_internal(const LoadFieldDataInfo& load_info);
void
LoadBatchIndexes(
milvus::tracer::TraceContext& trace_ctx,
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>&
field_id_to_index_info);
void
LoadBatchFieldData(milvus::tracer::TraceContext& trace_ctx,
std::vector<std::pair<std::vector<FieldId>,
proto::segcore::FieldBinlog>>&
field_binlog_to_load);
/**
* @brief Load all column groups from a manifest file path
*
@@ -949,7 +966,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
* @param manifest_path JSON string containing base_path and version fields
*/
void
LoadManifest(const std::string& manifest_path);
void
LoadColumnGroups(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
std::map<int, std::vector<FieldId>>& cg_field_ids_map);
/**
* @brief Load a single column group at the specified index
@@ -965,7 +988,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
LoadColumnGroup(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
int64_t index,
const std::vector<FieldId>& milvus_field_ids);
void
load_field_data_common(
@@ -977,11 +1001,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
bool is_proxy_column,
std::optional<ParquetStatistics> statistics = {});
std::shared_ptr<ChunkedColumnInterface>
get_column(FieldId field_id) const {
std::shared_ptr<ChunkedColumnInterface> res;
@@ -1035,7 +1054,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
mutable DeletedRecord<true> deleted_record_;
LoadFieldDataInfo field_data_info_;
SegmentLoadInfo segment_load_info_;
SchemaPtr schema_;
int64_t id_;


@@ -1510,6 +1510,14 @@ SegmentGrowingImpl::Reopen(SchemaPtr sch) {
}
}
void
SegmentGrowingImpl::Reopen(
const milvus::proto::segcore::SegmentLoadInfo& new_load_info) {
ThrowInfo(milvus::UnexpectedError,
"Unexpected reopening growing segment {} with load info",
id_);
}
void
SegmentGrowingImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
// Convert load_info_ (SegmentLoadInfo) to LoadFieldDataInfo


@@ -101,6 +101,10 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
Reopen(SchemaPtr sch) override;
void
Reopen(
const milvus::proto::segcore::SegmentLoadInfo& new_load_info) override;
void
LazyCheckSchema(SchemaPtr sch) override;


@@ -234,6 +234,9 @@ class SegmentInterface {
virtual void
Reopen(SchemaPtr sch) = 0;
virtual void
Reopen(const milvus::proto::segcore::SegmentLoadInfo& new_load_info) = 0;
// FinishLoad notifies the segment that all load operations are done
// currently it's used to sync field data list with updated schema.
virtual void


@@ -0,0 +1,283 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/SegmentLoadInfo.h"
#include <algorithm>
#include <cctype>
#include "common/FieldMeta.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/loon_ffi/property_singleton.h"
#include "storage/MmapManager.h"
namespace milvus::segcore {
std::shared_ptr<milvus_storage::api::ColumnGroups>
SegmentLoadInfo::GetColumnGroups() {
auto manifest_path = GetManifestPath();
if (manifest_path.empty()) {
return nullptr;
}
// return cached result if exists
if (column_groups_ != nullptr) {
return column_groups_;
}
auto properties = milvus::storage::LoonFFIPropertiesSingleton::GetInstance()
.GetProperties();
column_groups_ = ::GetColumnGroups(manifest_path, properties);
return column_groups_;
}
LoadIndexInfo
SegmentLoadInfo::ConvertFieldIndexInfoToLoadIndexInfo(
const proto::segcore::FieldIndexInfo* field_index_info,
const Schema& schema,
int64_t segment_id) const {
LoadIndexInfo load_index_info;
load_index_info.segment_id = segment_id;
// Extract field ID
auto field_id = FieldId(field_index_info->fieldid());
load_index_info.field_id = field_id.get();
load_index_info.partition_id = GetPartitionID();
// Get field type from schema
const auto& field_meta = schema[field_id];
load_index_info.field_type = field_meta.get_data_type();
load_index_info.element_type = field_meta.get_element_type();
// Set index metadata
load_index_info.index_id = field_index_info->indexid();
load_index_info.index_build_id = field_index_info->buildid();
load_index_info.index_version = field_index_info->index_version();
load_index_info.index_store_version =
field_index_info->index_store_version();
load_index_info.index_engine_version =
static_cast<IndexVersion>(field_index_info->current_index_version());
load_index_info.index_size = field_index_info->index_size();
load_index_info.num_rows = field_index_info->num_rows();
load_index_info.schema = field_meta.ToProto();
// Copy index file paths, excluding indexParams file
for (const auto& file_path : field_index_info->index_file_paths()) {
size_t last_slash = file_path.find_last_of('/');
std::string filename = (last_slash != std::string::npos)
? file_path.substr(last_slash + 1)
: file_path;
if (filename != "indexParams") {
load_index_info.index_files.push_back(file_path);
}
}
auto& mmap_config = storage::MmapManager::GetInstance().GetMmapConfig();
auto use_mmap = IsVectorDataType(field_meta.get_data_type())
? mmap_config.GetVectorIndexEnableMmap()
: mmap_config.GetScalarIndexEnableMmap();
// Set index params
for (const auto& kv_pair : field_index_info->index_params()) {
if (kv_pair.key() == "mmap.enabled") {
std::string lower;
std::transform(kv_pair.value().begin(),
kv_pair.value().end(),
std::back_inserter(lower),
::tolower);
use_mmap = (lower == "true");
}
load_index_info.index_params[kv_pair.key()] = kv_pair.value();
}
size_t dim =
IsVectorDataType(field_meta.get_data_type()) &&
!IsSparseFloatVectorDataType(field_meta.get_data_type())
? field_meta.get_dim()
: 1;
load_index_info.dim = dim;
load_index_info.mmap_dir_path =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager()
->GetRootPath();
load_index_info.enable_mmap = use_mmap;
return load_index_info;
}
std::vector<LoadIndexInfo>
SegmentLoadInfo::GetAllLoadIndexInfos(FieldId field_id,
const Schema& schema,
int64_t segment_id) const {
auto field_index_infos = GetFieldIndexInfos(field_id);
std::vector<LoadIndexInfo> result;
result.reserve(field_index_infos.size());
for (const auto* index_info : field_index_infos) {
result.push_back(ConvertFieldIndexInfoToLoadIndexInfo(
index_info, schema, segment_id));
}
return result;
}
std::map<FieldId, std::vector<LoadIndexInfo>>
SegmentLoadInfo::GetAllLoadIndexInfos(const Schema& schema,
int64_t segment_id) const {
std::map<FieldId, std::vector<LoadIndexInfo>> result;
for (const auto& [field_id, index_infos] : field_index_cache_) {
std::vector<LoadIndexInfo> converted;
converted.reserve(index_infos.size());
for (const auto* index_info : index_infos) {
converted.push_back(ConvertFieldIndexInfoToLoadIndexInfo(
index_info, schema, segment_id));
}
result.emplace(field_id, std::move(converted));
}
return result;
}
void
SegmentLoadInfo::ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info) {
// Get current indexed field IDs
std::set<int64_t> current_index_ids;
for (auto const& index_info : GetIndexInfos()) {
current_index_ids.insert(index_info.indexid());
}
std::set<int64_t> new_index_ids;
// Find indexes to load: indexes in new_info but not in current
for (const auto& new_index_info : new_info.GetIndexInfos()) {
new_index_ids.insert(new_index_info.indexid());
if (current_index_ids.find(new_index_info.indexid()) ==
current_index_ids.end()) {
diff.indexes_to_load[FieldId(new_index_info.fieldid())]
.emplace_back(&new_index_info);
}
}
// Find indexes to drop: index ids present in current but absent from new_info
for (const auto& index_info : GetIndexInfos()) {
if (new_index_ids.find(index_info.indexid()) == new_index_ids.end()) {
diff.indexes_to_drop.insert(FieldId(index_info.fieldid()));
}
}
}
void
SegmentLoadInfo::ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info) {
// field id -> binlog group id
std::map<int64_t, int64_t> current_fields;
for (int i = 0; i < GetBinlogPathCount(); i++) {
auto& field_binlog = GetBinlogPath(i);
for (auto child_id : field_binlog.child_fields()) {
current_fields[child_id] = field_binlog.fieldid();
}
}
std::map<int64_t, int64_t> new_binlog_fields;
for (int i = 0; i < new_info.GetBinlogPathCount(); i++) {
auto& new_field_binlog = new_info.GetBinlogPath(i);
std::vector<FieldId> ids_to_load;
for (auto child_id : new_field_binlog.child_fields()) {
new_binlog_fields[child_id] = new_field_binlog.fieldid();
// Load this child field if it is missing from the current state
// or has moved to a different binlog group
auto iter = current_fields.find(child_id);
if (iter == current_fields.end() ||
iter->second != new_field_binlog.fieldid()) {
ids_to_load.emplace_back(child_id);
}
}
if (!ids_to_load.empty()) {
diff.binlogs_to_load.emplace_back(ids_to_load, new_field_binlog);
}
}
// Find field data to drop: fields in current but not in new_info
for (const auto& [field_id, group_id] : current_fields) {
if (new_binlog_fields.find(field_id) == new_binlog_fields.end()) {
diff.field_data_to_drop.emplace(field_id);
}
}
}
void
SegmentLoadInfo::ComputeDiffColumnGroups(LoadDiff& diff,
SegmentLoadInfo& new_info) {
auto cur_column_group = GetColumnGroups();
auto new_column_group = new_info.GetColumnGroups();
AssertInfo(cur_column_group, "current column groups shall not be null");
AssertInfo(new_column_group, "new column groups shall not be null");
// Build a set of current FieldIds from current column groups
std::map<int64_t, int> cur_field_ids;
for (int i = 0; i < cur_column_group->size(); i++) {
auto cg = cur_column_group->get_column_group(i);
for (const auto& column : cg->columns) {
auto field_id = std::stoll(column);
cur_field_ids.emplace(field_id, i);
}
}
// Build a set of new FieldIds and find column groups to load
std::map<int64_t, int> new_field_ids;
for (int i = 0; i < new_column_group->size(); i++) {
auto cg = new_column_group->get_column_group(i);
for (const auto& column : cg->columns) {
auto field_id = std::stoll(column);
new_field_ids.emplace(field_id, i);
auto iter = cur_field_ids.find(field_id);
// If this field doesn't exist in current, mark the column group for loading
if (iter == cur_field_ids.end() || iter->second != i) {
diff.column_groups_to_load[i].emplace_back(field_id);
}
}
}
// Find field data to drop: fields in current but not in new
for (const auto& [field_id, cg_index] : cur_field_ids) {
if (new_field_ids.find(field_id) == new_field_ids.end()) {
diff.field_data_to_drop.emplace(field_id);
}
}
}
LoadDiff
SegmentLoadInfo::ComputeDiff(SegmentLoadInfo& new_info) {
LoadDiff diff;
// Handle index changes
ComputeDiffIndexes(diff, new_info);
// Handle field data changes
// Note: Updates can only happen within the same category:
// - binlog -> binlog
// - manifest -> manifest
// Cross-category changes are not supported.
if (HasManifestPath()) {
AssertInfo(new_info.HasManifestPath(),
"manifest could only be updated with other manifest");
ComputeDiffColumnGroups(diff, new_info);
} else {
AssertInfo(
!new_info.HasManifestPath(),
"field binlogs could only be updated with non-manfest load info");
ComputeDiffBinlogs(diff, new_info);
}
return diff;
}
} // namespace milvus::segcore


@@ -0,0 +1,708 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <map>
#include <optional>
#include <set>
#include <string>
#include <vector>
#include "common/Schema.h"
#include "common/Types.h"
#include "milvus-storage/column_groups.h"
#include "pb/segcore.pb.h"
#include "segcore/Types.h"
namespace milvus::segcore {
/**
* @brief Structure representing the difference between two SegmentLoadInfos,
* used for reopening segments.
*
* Note: SegmentLoadInfo can only have updates within the same category:
* - binlog -> binlog updates
* - manifest -> manifest updates
* Cross-category changes (binlog <-> manifest) are not supported.
*/
struct LoadDiff {
// Indexes that need to be loaded (field_id -> list of FieldIndexInfo pointers)
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>
indexes_to_load;
// Field binlog paths that need to be loaded [field_ids, FieldBinlog]
// Only populated when both current and new use binlog mode
std::vector<std::pair<std::vector<FieldId>, proto::segcore::FieldBinlog>>
binlogs_to_load;
// Field id to column group index to be loaded
std::map<int, std::vector<FieldId>> column_groups_to_load;
// Indexes that need to be dropped (field_id set)
std::set<FieldId> indexes_to_drop;
// Field data that need to be dropped (field_id set)
// Only populated when both current and new use binlog mode
std::set<FieldId> field_data_to_drop;
// Whether manifest path has changed (only when both use manifest mode)
bool manifest_updated = false;
// New manifest path (valid when manifest_updated is true)
std::string new_manifest_path;
[[nodiscard]] bool
HasChanges() const {
return !indexes_to_load.empty() || !binlogs_to_load.empty() ||
!column_groups_to_load.empty() || !indexes_to_drop.empty() ||
!field_data_to_drop.empty() || manifest_updated;
}
[[nodiscard]] bool
HasManifestChange() const {
return manifest_updated;
}
};
/**
* @brief Utility class that wraps milvus::proto::segcore::SegmentLoadInfo
* and provides convenient accessor methods.
*
* This class simplifies access to SegmentLoadInfo fields and provides
* utility methods for common operations like field lookups, index queries,
* and binlog path management.
*/
class SegmentLoadInfo {
public:
using ProtoType = milvus::proto::segcore::SegmentLoadInfo;
/**
* @brief Default constructor creates an empty SegmentLoadInfo
*/
SegmentLoadInfo() = default;
/**
* @brief Construct from a protobuf SegmentLoadInfo (copy)
* @param info The protobuf SegmentLoadInfo to wrap
*/
explicit SegmentLoadInfo(const ProtoType& info) : info_(info) {
BuildIndexCache();
}
/**
* @brief Construct from a protobuf SegmentLoadInfo (move)
* @param info The protobuf SegmentLoadInfo to wrap
*/
explicit SegmentLoadInfo(ProtoType&& info) : info_(std::move(info)) {
BuildIndexCache();
}
/**
* @brief Copy constructor
*/
SegmentLoadInfo(const SegmentLoadInfo& other)
: info_(other.info_), column_groups_(other.column_groups_) {
// Rebuild caches so cached pointers reference this object's proto,
// not the source object's
BuildIndexCache();
}
/**
* @brief Move constructor
*/
SegmentLoadInfo(SegmentLoadInfo&& other) noexcept
: info_(std::move(other.info_)),
field_index_cache_(std::move(other.field_index_cache_)),
field_binlog_cache_(std::move(other.field_binlog_cache_)),
column_groups_(std::move(other.column_groups_)) {
}
/**
* @brief Copy assignment operator
*/
SegmentLoadInfo&
operator=(const SegmentLoadInfo& other) {
if (this != &other) {
info_ = other.info_;
// Rebuild caches so cached pointers reference this object's proto
BuildIndexCache();
column_groups_ = other.column_groups_;
}
return *this;
}
/**
* @brief Move assignment operator
*/
SegmentLoadInfo&
operator=(SegmentLoadInfo&& other) noexcept {
if (this != &other) {
info_ = std::move(other.info_);
field_index_cache_ = std::move(other.field_index_cache_);
field_binlog_cache_ = std::move(other.field_binlog_cache_);
column_groups_ = std::move(other.column_groups_);
}
return *this;
}
/**
* @brief Set from protobuf (copy)
*/
void
Set(const ProtoType& info) {
info_ = info;
BuildIndexCache();
}
/**
* @brief Set from protobuf (move)
*/
void
Set(ProtoType&& info) {
info_ = std::move(info);
BuildIndexCache();
}
// ==================== Basic Accessors ====================
[[nodiscard]] int64_t
GetSegmentID() const {
return info_.segmentid();
}
[[nodiscard]] int64_t
GetPartitionID() const {
return info_.partitionid();
}
[[nodiscard]] int64_t
GetCollectionID() const {
return info_.collectionid();
}
[[nodiscard]] int64_t
GetDbID() const {
return info_.dbid();
}
[[nodiscard]] int64_t
GetNumOfRows() const {
return info_.num_of_rows();
}
[[nodiscard]] int64_t
GetFlushTime() const {
return info_.flush_time();
}
[[nodiscard]] int64_t
GetReadableVersion() const {
return info_.readableversion();
}
[[nodiscard]] int64_t
GetStorageVersion() const {
return info_.storageversion();
}
[[nodiscard]] bool
IsSorted() const {
return info_.is_sorted();
}
[[nodiscard]] const std::string&
GetInsertChannel() const {
return info_.insert_channel();
}
[[nodiscard]] const std::string&
GetManifestPath() const {
return info_.manifest_path();
}
[[nodiscard]] bool
HasManifestPath() const {
return !info_.manifest_path().empty();
}
[[nodiscard]] proto::common::LoadPriority
GetPriority() const {
return info_.priority();
}
// ==================== Compaction Info ====================
[[nodiscard]] const google::protobuf::RepeatedField<int64_t>&
GetCompactionFrom() const {
return info_.compactionfrom();
}
[[nodiscard]] bool
IsCompacted() const {
return info_.compactionfrom_size() > 0;
}
[[nodiscard]] int
GetCompactionFromCount() const {
return info_.compactionfrom_size();
}
// ==================== Index Info ====================
[[nodiscard]] int
GetIndexInfoCount() const {
return info_.index_infos_size();
}
[[nodiscard]] const proto::segcore::FieldIndexInfo&
GetIndexInfo(int index) const {
return info_.index_infos(index);
}
[[nodiscard]] const google::protobuf::RepeatedPtrField<
proto::segcore::FieldIndexInfo>&
GetIndexInfos() const {
return info_.index_infos();
}
/**
* @brief Check if a field has index info
* @param field_id The field ID to check
* @return true if the field has at least one index info
*/
[[nodiscard]] bool
HasIndexInfo(FieldId field_id) const {
return field_index_cache_.find(field_id) != field_index_cache_.end();
}
/**
* @brief Get all index infos for a specific field
* @param field_id The field ID
* @return Vector of pointers to FieldIndexInfo, empty if field has no index
*/
[[nodiscard]] std::vector<const proto::segcore::FieldIndexInfo*>
GetFieldIndexInfos(FieldId field_id) const {
auto it = field_index_cache_.find(field_id);
if (it != field_index_cache_.end()) {
return it->second;
}
return {};
}
/**
* @brief Get the first index info for a specific field
* @param field_id The field ID
* @return Pointer to FieldIndexInfo, nullptr if field has no index
*/
[[nodiscard]] const proto::segcore::FieldIndexInfo*
GetFirstFieldIndexInfo(FieldId field_id) const {
auto it = field_index_cache_.find(field_id);
if (it != field_index_cache_.end() && !it->second.empty()) {
return it->second[0];
}
return nullptr;
}
/**
* @brief Get all field IDs that have index info
* @return Set of field IDs with indexes
*/
[[nodiscard]] std::set<FieldId>
GetIndexedFieldIds() const {
std::set<FieldId> result;
for (const auto& pair : field_index_cache_) {
result.insert(pair.first);
}
return result;
}
// ==================== Binlog Info ====================
[[nodiscard]] int
GetBinlogPathCount() const {
return info_.binlog_paths_size();
}
[[nodiscard]] const proto::segcore::FieldBinlog&
GetBinlogPath(int index) const {
return info_.binlog_paths(index);
}
[[nodiscard]] const google::protobuf::RepeatedPtrField<
proto::segcore::FieldBinlog>&
GetBinlogPaths() const {
return info_.binlog_paths();
}
/**
* @brief Check if a field/group has binlog paths
* @param field_id The field ID or group ID
* @return true if the field has binlog paths
*/
[[nodiscard]] bool
HasBinlogPath(FieldId field_id) const {
return field_binlog_cache_.find(field_id) != field_binlog_cache_.end();
}
/**
* @brief Get binlog info for a specific field/group
* @param field_id The field ID or group ID
* @return Pointer to FieldBinlog, nullptr if not found
*/
[[nodiscard]] const proto::segcore::FieldBinlog*
GetFieldBinlog(FieldId field_id) const {
auto it = field_binlog_cache_.find(field_id);
if (it != field_binlog_cache_.end()) {
return it->second;
}
return nullptr;
}
/**
* @brief Get all binlog file paths for a specific field/group
* @param field_id The field ID or group ID
* @return Vector of binlog file paths, empty if field has no binlogs
*/
[[nodiscard]] std::vector<std::string>
GetFieldBinlogPaths(FieldId field_id) const {
auto binlog = GetFieldBinlog(field_id);
if (binlog == nullptr) {
return {};
}
std::vector<std::string> paths;
paths.reserve(binlog->binlogs_size());
for (const auto& log : binlog->binlogs()) {
paths.push_back(log.log_path());
}
return paths;
}
/**
* @brief Calculate total row count from binlogs for a field
* @param field_id The field ID or group ID
* @return Total number of entries across all binlogs
*/
[[nodiscard]] int64_t
GetFieldBinlogRowCount(FieldId field_id) const {
auto binlog = GetFieldBinlog(field_id);
if (binlog == nullptr) {
return 0;
}
int64_t total = 0;
for (const auto& log : binlog->binlogs()) {
total += log.entries_num();
}
return total;
}
/**
* @brief Get child field IDs for a column group
* @param group_id The group/field ID
* @return Vector of child field IDs, empty if not a group or no children
*/
[[nodiscard]] std::vector<int64_t>
GetChildFieldIds(FieldId group_id) const {
auto binlog = GetFieldBinlog(group_id);
if (binlog == nullptr || binlog->child_fields_size() == 0) {
return {};
}
std::vector<int64_t> result;
result.reserve(binlog->child_fields_size());
for (auto child_id : binlog->child_fields()) {
result.emplace_back(child_id);
}
return result;
}
/**
* @brief Check if a binlog entry represents a column group
* @param field_id The field ID to check
* @return true if this is a column group with child fields
*/
[[nodiscard]] bool
IsColumnGroup(FieldId field_id) const {
auto binlog = GetFieldBinlog(field_id);
return binlog != nullptr && binlog->child_fields_size() > 0;
}
// ==================== Column Groups Cache ====================
/**
* @brief Get column groups from manifest
* @return Shared pointer to ColumnGroups, nullptr if manifest is empty
*/
[[nodiscard]] std::shared_ptr<milvus_storage::api::ColumnGroups>
GetColumnGroups();
// ==================== Stats & Delta Logs ====================
[[nodiscard]] int
GetStatslogCount() const {
return info_.statslogs_size();
}
[[nodiscard]] const proto::segcore::FieldBinlog&
GetStatslog(int index) const {
return info_.statslogs(index);
}
[[nodiscard]] const google::protobuf::RepeatedPtrField<
proto::segcore::FieldBinlog>&
GetStatslogs() const {
return info_.statslogs();
}
[[nodiscard]] int
GetDeltalogCount() const {
return info_.deltalogs_size();
}
[[nodiscard]] const proto::segcore::FieldBinlog&
GetDeltalog(int index) const {
return info_.deltalogs(index);
}
[[nodiscard]] const google::protobuf::RepeatedPtrField<
proto::segcore::FieldBinlog>&
GetDeltalogs() const {
return info_.deltalogs();
}
// ==================== Text Index Stats ====================
[[nodiscard]] bool
HasTextStatsLog(int64_t field_id) const {
return info_.textstatslogs().find(field_id) !=
info_.textstatslogs().end();
}
[[nodiscard]] const proto::segcore::TextIndexStats*
GetTextStatsLog(int64_t field_id) const {
auto it = info_.textstatslogs().find(field_id);
if (it != info_.textstatslogs().end()) {
return &it->second;
}
return nullptr;
}
[[nodiscard]] const google::protobuf::Map<int64_t,
proto::segcore::TextIndexStats>&
GetTextStatsLogs() const {
return info_.textstatslogs();
}
// ==================== BM25 Logs ====================
[[nodiscard]] int
GetBm25logCount() const {
return info_.bm25logs_size();
}
[[nodiscard]] const proto::segcore::FieldBinlog&
GetBm25log(int index) const {
return info_.bm25logs(index);
}
[[nodiscard]] const google::protobuf::RepeatedPtrField<
proto::segcore::FieldBinlog>&
GetBm25logs() const {
return info_.bm25logs();
}
// ==================== JSON Key Stats ====================
[[nodiscard]] bool
HasJsonKeyStatsLog(int64_t field_id) const {
return info_.jsonkeystatslogs().find(field_id) !=
info_.jsonkeystatslogs().end();
}
[[nodiscard]] const proto::segcore::JsonKeyStats*
GetJsonKeyStatsLog(int64_t field_id) const {
auto it = info_.jsonkeystatslogs().find(field_id);
if (it != info_.jsonkeystatslogs().end()) {
return &it->second;
}
return nullptr;
}
[[nodiscard]] const google::protobuf::Map<int64_t,
proto::segcore::JsonKeyStats>&
GetJsonKeyStatsLogs() const {
return info_.jsonkeystatslogs();
}
// ==================== Diff Computation ====================
/**
* @brief Compute the difference between this SegmentLoadInfo and a new one
*
* This method compares the current SegmentLoadInfo with a new version and
* produces a LoadDiff that describes what needs to be loaded or dropped
* when reopening a segment.
*
* Note: Updates can only happen within the same category:
* - binlog -> binlog updates
* - manifest -> manifest updates
* Cross-category changes (binlog <-> manifest) are not supported.
*
* The diff logic for indexes (always computed):
* - indexes_to_load: Indexes present in new_info but not in this
* (compared by index id, grouped by field)
* - indexes_to_drop: Indexes present in this but not in new_info
* (compared by index id)
*
* The diff logic for field data:
*
* Binlog mode (when current has no manifest):
* - binlogs_to_load: Binlogs present in new_info but not in this
* - field_data_to_drop: Fields present in this but not in new_info
*
* Manifest mode (when current has manifest):
* - column_groups_to_load: column groups whose fields are absent from
* the current manifest or have moved to a different group
* - field_data_to_drop: Fields present in this but not in new_info
*
* @param new_info The new SegmentLoadInfo to compare against
* @return LoadDiff containing the differences
*/
[[nodiscard]] LoadDiff
ComputeDiff(SegmentLoadInfo& new_info);
// ==================== Underlying Proto Access ====================
/**
* @brief Get const reference to the underlying protobuf message
*/
[[nodiscard]] const ProtoType&
GetProto() const {
return info_;
}
/**
* @brief Get mutable pointer to the underlying protobuf message
* @note After modifying the proto, call RebuildCache() to update caches
*/
ProtoType*
MutableProto() {
return &info_;
}
/**
* @brief Rebuild internal caches after direct proto modification
*/
void
RebuildCache() {
BuildIndexCache();
}
/**
* @brief Check if the SegmentLoadInfo is empty/unset
*/
[[nodiscard]] bool
IsEmpty() const {
return info_.segmentid() == 0 && info_.num_of_rows() == 0;
}
// ==================== LoadIndexInfo Conversion ====================
/**
* @brief Convert a FieldIndexInfo to LoadIndexInfo
*
* This method converts the protobuf FieldIndexInfo to the internal
* LoadIndexInfo structure used for loading indexes.
*
* @param field_index_info Pointer to the FieldIndexInfo to convert
* @param schema The schema to get field metadata from
* @param segment_id The segment ID for the LoadIndexInfo
* @return LoadIndexInfo structure populated with the converted data
*/
[[nodiscard]] LoadIndexInfo
ConvertFieldIndexInfoToLoadIndexInfo(
const proto::segcore::FieldIndexInfo* field_index_info,
const Schema& schema,
int64_t segment_id) const;
/**
* @brief Get all LoadIndexInfos for a specific field
*
* Converts all index infos for the given field to LoadIndexInfo structures.
* A field may have multiple indexes (e.g., for JSON fields with multiple paths).
*
* @param field_id The field ID to get LoadIndexInfos for
* @param schema The schema to get field metadata from
* @param segment_id The segment ID for the LoadIndexInfo
* @return Vector of LoadIndexInfo structures, empty if field has no indexes
*/
[[nodiscard]] std::vector<LoadIndexInfo>
GetAllLoadIndexInfos(FieldId field_id,
const Schema& schema,
int64_t segment_id) const;
/**
* @brief Get LoadIndexInfos for all indexed fields
*
* Converts all index infos in this SegmentLoadInfo to LoadIndexInfo structures.
*
* @param schema The schema to get field metadata from
* @param segment_id The segment ID for the LoadIndexInfo
* @return Map from field ID to vector of LoadIndexInfo structures
*/
[[nodiscard]] std::map<FieldId, std::vector<LoadIndexInfo>>
GetAllLoadIndexInfos(const Schema& schema, int64_t segment_id) const;
private:
void
BuildIndexCache() {
field_index_cache_.clear();
field_binlog_cache_.clear();
// Build index cache
for (int i = 0; i < info_.index_infos_size(); i++) {
const auto& index_info = info_.index_infos(i);
if (index_info.index_file_paths_size() == 0) {
continue;
}
auto field_id = FieldId(index_info.fieldid());
field_index_cache_[field_id].push_back(&index_info);
}
// Build binlog cache
for (int i = 0; i < info_.binlog_paths_size(); i++) {
const auto& binlog = info_.binlog_paths(i);
auto field_id = FieldId(binlog.fieldid());
field_binlog_cache_[field_id] = &binlog;
}
}
void
ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info);
void
ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info);
void
ComputeDiffColumnGroups(LoadDiff& diff, SegmentLoadInfo& new_info);
ProtoType info_;
// Cache for quick field -> index info lookup
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>
field_index_cache_;
// Cache for quick field -> binlog lookup
std::map<FieldId, const proto::segcore::FieldBinlog*> field_binlog_cache_;
// Cache for column groups metadata (used with manifest mode)
std::shared_ptr<milvus_storage::api::ColumnGroups> column_groups_;
};
} // namespace milvus::segcore

View File

@ -0,0 +1,328 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "segcore/SegmentLoadInfo.h"
using namespace milvus;
using namespace milvus::segcore;
class SegmentLoadInfoTest : public ::testing::Test {
protected:
void
SetUp() override {
// Setup a basic SegmentLoadInfo proto
proto_.set_segmentid(12345);
proto_.set_partitionid(100);
proto_.set_collectionid(200);
proto_.set_dbid(1);
proto_.set_num_of_rows(10000);
proto_.set_flush_time(1234567890);
proto_.set_readableversion(5);
proto_.set_storageversion(1);
proto_.set_is_sorted(true);
proto_.set_insert_channel("test_channel");
proto_.set_manifest_path("/path/to/manifest");
proto_.set_priority(proto::common::LoadPriority::LOW);
// Add compaction from
proto_.add_compactionfrom(111);
proto_.add_compactionfrom(222);
// Add index info
auto* index_info = proto_.add_index_infos();
index_info->set_fieldid(101);
index_info->set_indexid(1001);
index_info->set_buildid(2001);
index_info->set_index_version(1);
index_info->add_index_file_paths("/path/to/index1");
index_info->add_index_file_paths("/path/to/index2");
auto* index_info2 = proto_.add_index_infos();
index_info2->set_fieldid(102);
index_info2->set_indexid(1002);
index_info2->add_index_file_paths("/path/to/index3");
// Add binlog paths
auto* binlog = proto_.add_binlog_paths();
binlog->set_fieldid(101);
auto* log1 = binlog->add_binlogs();
log1->set_log_path("/path/to/binlog1");
log1->set_entries_num(500);
auto* log2 = binlog->add_binlogs();
log2->set_log_path("/path/to/binlog2");
log2->set_entries_num(500);
// Add column group binlog
auto* group_binlog = proto_.add_binlog_paths();
group_binlog->set_fieldid(200);
group_binlog->add_child_fields(201);
group_binlog->add_child_fields(202);
auto* group_log = group_binlog->add_binlogs();
group_log->set_log_path("/path/to/group_binlog");
group_log->set_entries_num(1000);
// Add statslogs
auto* statslog = proto_.add_statslogs();
statslog->set_fieldid(101);
auto* stat_log = statslog->add_binlogs();
stat_log->set_log_path("/path/to/statslog");
// Add deltalogs
auto* deltalog = proto_.add_deltalogs();
deltalog->set_fieldid(0);
auto* delta_log = deltalog->add_binlogs();
delta_log->set_log_path("/path/to/deltalog");
// Add text stats
auto& text_stats = (*proto_.mutable_textstatslogs())[101];
text_stats.set_fieldid(101);
text_stats.set_version(1);
text_stats.add_files("/path/to/text_stats");
// Add json key stats
auto& json_stats = (*proto_.mutable_jsonkeystatslogs())[102];
json_stats.set_fieldid(102);
json_stats.set_version(1);
// Add bm25 logs
auto* bm25log = proto_.add_bm25logs();
bm25log->set_fieldid(103);
auto* bm25_binlog = bm25log->add_binlogs();
bm25_binlog->set_log_path("/path/to/bm25log");
}
proto::segcore::SegmentLoadInfo proto_;
};
TEST_F(SegmentLoadInfoTest, DefaultConstructor) {
SegmentLoadInfo info;
EXPECT_TRUE(info.IsEmpty());
EXPECT_EQ(info.GetSegmentID(), 0);
EXPECT_EQ(info.GetNumOfRows(), 0);
}
TEST_F(SegmentLoadInfoTest, ConstructFromProto) {
SegmentLoadInfo info(proto_);
EXPECT_FALSE(info.IsEmpty());
EXPECT_EQ(info.GetSegmentID(), 12345);
EXPECT_EQ(info.GetPartitionID(), 100);
EXPECT_EQ(info.GetCollectionID(), 200);
EXPECT_EQ(info.GetDbID(), 1);
EXPECT_EQ(info.GetNumOfRows(), 10000);
EXPECT_EQ(info.GetFlushTime(), 1234567890);
EXPECT_EQ(info.GetReadableVersion(), 5);
EXPECT_EQ(info.GetStorageVersion(), 1);
EXPECT_TRUE(info.IsSorted());
EXPECT_EQ(info.GetInsertChannel(), "test_channel");
EXPECT_EQ(info.GetManifestPath(), "/path/to/manifest");
EXPECT_TRUE(info.HasManifestPath());
EXPECT_EQ(info.GetPriority(), proto::common::LoadPriority::LOW);
}
TEST_F(SegmentLoadInfoTest, MoveConstructor) {
SegmentLoadInfo info1(proto_);
SegmentLoadInfo info2(std::move(info1));
EXPECT_EQ(info2.GetSegmentID(), 12345);
EXPECT_EQ(info2.GetNumOfRows(), 10000);
}
TEST_F(SegmentLoadInfoTest, CopyAssignment) {
SegmentLoadInfo info1(proto_);
SegmentLoadInfo info2;
info2 = info1;
EXPECT_EQ(info2.GetSegmentID(), 12345);
EXPECT_EQ(info2.GetNumOfRows(), 10000);
}
TEST_F(SegmentLoadInfoTest, SetMethod) {
SegmentLoadInfo info;
info.Set(proto_);
EXPECT_EQ(info.GetSegmentID(), 12345);
EXPECT_EQ(info.GetNumOfRows(), 10000);
}
TEST_F(SegmentLoadInfoTest, CompactionInfo) {
SegmentLoadInfo info(proto_);
EXPECT_TRUE(info.IsCompacted());
EXPECT_EQ(info.GetCompactionFromCount(), 2);
EXPECT_EQ(info.GetCompactionFrom()[0], 111);
EXPECT_EQ(info.GetCompactionFrom()[1], 222);
}
TEST_F(SegmentLoadInfoTest, IndexInfo) {
SegmentLoadInfo info(proto_);
EXPECT_EQ(info.GetIndexInfoCount(), 2);
EXPECT_TRUE(info.HasIndexInfo(FieldId(101)));
EXPECT_TRUE(info.HasIndexInfo(FieldId(102)));
EXPECT_FALSE(info.HasIndexInfo(FieldId(999)));
auto index_infos = info.GetFieldIndexInfos(FieldId(101));
EXPECT_EQ(index_infos.size(), 1);
EXPECT_EQ(index_infos[0]->indexid(), 1001);
auto first_index = info.GetFirstFieldIndexInfo(FieldId(101));
EXPECT_NE(first_index, nullptr);
EXPECT_EQ(first_index->buildid(), 2001);
auto indexed_fields = info.GetIndexedFieldIds();
EXPECT_EQ(indexed_fields.size(), 2);
EXPECT_TRUE(indexed_fields.count(FieldId(101)) > 0);
EXPECT_TRUE(indexed_fields.count(FieldId(102)) > 0);
// Test non-existent field
auto empty_infos = info.GetFieldIndexInfos(FieldId(999));
EXPECT_TRUE(empty_infos.empty());
auto null_index = info.GetFirstFieldIndexInfo(FieldId(999));
EXPECT_EQ(null_index, nullptr);
}
TEST_F(SegmentLoadInfoTest, BinlogInfo) {
SegmentLoadInfo info(proto_);
EXPECT_EQ(info.GetBinlogPathCount(), 2);
EXPECT_TRUE(info.HasBinlogPath(FieldId(101)));
EXPECT_TRUE(info.HasBinlogPath(FieldId(200)));
EXPECT_FALSE(info.HasBinlogPath(FieldId(999)));
auto binlog = info.GetFieldBinlog(FieldId(101));
EXPECT_NE(binlog, nullptr);
EXPECT_EQ(binlog->binlogs_size(), 2);
auto paths = info.GetFieldBinlogPaths(FieldId(101));
EXPECT_EQ(paths.size(), 2);
EXPECT_EQ(paths[0], "/path/to/binlog1");
EXPECT_EQ(paths[1], "/path/to/binlog2");
auto row_count = info.GetFieldBinlogRowCount(FieldId(101));
EXPECT_EQ(row_count, 1000);
// Test non-existent field
auto empty_paths = info.GetFieldBinlogPaths(FieldId(999));
EXPECT_TRUE(empty_paths.empty());
auto zero_count = info.GetFieldBinlogRowCount(FieldId(999));
EXPECT_EQ(zero_count, 0);
}
TEST_F(SegmentLoadInfoTest, ColumnGroup) {
SegmentLoadInfo info(proto_);
EXPECT_TRUE(info.IsColumnGroup(FieldId(200)));
EXPECT_FALSE(info.IsColumnGroup(FieldId(101)));
EXPECT_FALSE(info.IsColumnGroup(FieldId(999)));
auto child_fields = info.GetChildFieldIds(FieldId(200));
EXPECT_EQ(child_fields.size(), 2);
EXPECT_EQ(child_fields[0], 201);
EXPECT_EQ(child_fields[1], 202);
auto empty_children = info.GetChildFieldIds(FieldId(101));
EXPECT_TRUE(empty_children.empty());
}
TEST_F(SegmentLoadInfoTest, StatsAndDeltaLogs) {
SegmentLoadInfo info(proto_);
EXPECT_EQ(info.GetStatslogCount(), 1);
EXPECT_EQ(info.GetDeltalogCount(), 1);
const auto& statslog = info.GetStatslog(0);
EXPECT_EQ(statslog.fieldid(), 101);
const auto& deltalog = info.GetDeltalog(0);
EXPECT_EQ(deltalog.fieldid(), 0);
}
TEST_F(SegmentLoadInfoTest, TextStats) {
SegmentLoadInfo info(proto_);
EXPECT_TRUE(info.HasTextStatsLog(101));
EXPECT_FALSE(info.HasTextStatsLog(999));
auto text_stats = info.GetTextStatsLog(101);
EXPECT_NE(text_stats, nullptr);
EXPECT_EQ(text_stats->fieldid(), 101);
EXPECT_EQ(text_stats->version(), 1);
auto null_stats = info.GetTextStatsLog(999);
EXPECT_EQ(null_stats, nullptr);
}
TEST_F(SegmentLoadInfoTest, JsonKeyStats) {
SegmentLoadInfo info(proto_);
EXPECT_TRUE(info.HasJsonKeyStatsLog(102));
EXPECT_FALSE(info.HasJsonKeyStatsLog(999));
auto json_stats = info.GetJsonKeyStatsLog(102);
EXPECT_NE(json_stats, nullptr);
EXPECT_EQ(json_stats->fieldid(), 102);
auto null_stats = info.GetJsonKeyStatsLog(999);
EXPECT_EQ(null_stats, nullptr);
}
TEST_F(SegmentLoadInfoTest, Bm25Logs) {
SegmentLoadInfo info(proto_);
EXPECT_EQ(info.GetBm25logCount(), 1);
const auto& bm25log = info.GetBm25log(0);
EXPECT_EQ(bm25log.fieldid(), 103);
}
TEST_F(SegmentLoadInfoTest, UnderlyingProtoAccess) {
SegmentLoadInfo info(proto_);
const auto& proto = info.GetProto();
EXPECT_EQ(proto.segmentid(), 12345);
auto* mutable_proto = info.MutableProto();
mutable_proto->set_segmentid(99999);
info.RebuildCache();
EXPECT_EQ(info.GetSegmentID(), 99999);
}
TEST_F(SegmentLoadInfoTest, EmptyManifestPath) {
proto::segcore::SegmentLoadInfo empty_proto;
empty_proto.set_segmentid(1);
empty_proto.set_num_of_rows(100);
SegmentLoadInfo info(empty_proto);
EXPECT_FALSE(info.HasManifestPath());
EXPECT_TRUE(info.GetManifestPath().empty());
}
TEST_F(SegmentLoadInfoTest, IndexWithoutFiles) {
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(1);
// Add index info without files - should be ignored in cache
auto* index_info = test_proto.add_index_infos();
index_info->set_fieldid(101);
index_info->set_indexid(1001);
// No index_file_paths added
SegmentLoadInfo info(test_proto);
// Index without files should not be in cache
EXPECT_FALSE(info.HasIndexInfo(FieldId(101)));
EXPECT_EQ(info.GetIndexInfoCount(), 1); // Proto still has it
}

View File

@ -139,6 +139,29 @@ NewSegmentWithLoadInfo(CCollection collection,
}
}
CStatus
ReopenSegment(CTraceContext c_trace,
CSegmentInterface c_segment,
const uint8_t* load_info_blob,
const int64_t load_info_length) {
SCOPE_CGO_CALL_METRIC();
try {
AssertInfo(load_info_blob, "load info is null");
milvus::proto::segcore::SegmentLoadInfo load_info;
auto suc = load_info.ParseFromArray(load_info_blob, load_info_length);
AssertInfo(suc, "unmarshal load info failed");
auto segment =
static_cast<milvus::segcore::SegmentInterface*>(c_segment);
segment->Reopen(load_info);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}
}
CStatus
SegmentLoad(CTraceContext c_trace, CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();

View File

@ -69,6 +69,27 @@ NewSegmentWithLoadInfo(CCollection collection,
CStatus
SegmentLoad(CTraceContext c_trace, CSegmentInterface c_segment);
/**
* @brief Reopen an existing segment with updated load information
*
* This function reopens a segment with a new load configuration, typically
* when data or schema changes must be applied to an already-loaded segment.
* The segment applies the difference between its current state and the
* provided load info while preserving its identity (segment_id).
*
* @param c_trace Tracing context for distributed tracing and debugging
* @param c_segment The segment handle to be reopened
* @param load_info_blob Serialized SegmentLoadInfo protobuf message containing
* the new load configuration (field data info, index info, etc.)
* @param load_info_length Length of the load_info_blob in bytes
* @return CStatus indicating success or failure with error details
*/
CStatus
ReopenSegment(CTraceContext c_trace,
CSegmentInterface c_segment,
const uint8_t* load_info_blob,
const int64_t load_info_length);
void
DeleteSegment(CSegmentInterface c_segment);

View File

@ -216,6 +216,21 @@ func (node *QueryNode) loadStats(ctx context.Context, req *querypb.LoadSegmentsR
return status
}
func (node *QueryNode) reopenSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
)
log.Info("start to reopen segments")
err := node.loader.ReopenSegments(ctx, req.GetInfos())
if err != nil {
log.Warn("failed to reopen segments", zap.Error(err))
return merr.Status(err)
}
return merr.Success()
}
func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryRequest, channel string) (*internalpb.RetrieveResults, error) {
msgID := req.Req.Base.GetMsgID()
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()

View File

@ -450,6 +450,53 @@ func (_c *MockLoader_LoadLazySegment_Call) RunAndReturn(run func(context.Context
return _c
}
// ReopenSegments provides a mock function with given fields: ctx, loadInfos
func (_m *MockLoader) ReopenSegments(ctx context.Context, loadInfos []*querypb.SegmentLoadInfo) error {
ret := _m.Called(ctx, loadInfos)
if len(ret) == 0 {
panic("no return value specified for ReopenSegments")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []*querypb.SegmentLoadInfo) error); ok {
r0 = rf(ctx, loadInfos)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLoader_ReopenSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReopenSegments'
type MockLoader_ReopenSegments_Call struct {
*mock.Call
}
// ReopenSegments is a helper method to define mock.On call
// - ctx context.Context
// - loadInfos []*querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) ReopenSegments(ctx interface{}, loadInfos interface{}) *MockLoader_ReopenSegments_Call {
return &MockLoader_ReopenSegments_Call{Call: _e.mock.On("ReopenSegments", ctx, loadInfos)}
}
func (_c *MockLoader_ReopenSegments_Call) Run(run func(ctx context.Context, loadInfos []*querypb.SegmentLoadInfo)) *MockLoader_ReopenSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]*querypb.SegmentLoadInfo))
})
return _c
}
func (_c *MockLoader_ReopenSegments_Call) Return(_a0 error) *MockLoader_ReopenSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLoader_ReopenSegments_Call) RunAndReturn(run func(context.Context, []*querypb.SegmentLoadInfo) error) *MockLoader_ReopenSegments_Call {
_c.Call.Return(run)
return _c
}
// NewMockLoader creates a new instance of MockLoader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockLoader(t interface {

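For reference, a test can stub the new method through the generated expecter. A minimal sketch (assumes the generated EXPECT() accessor and the usual testify imports):

func TestReopenSegmentsStubbed(t *testing.T) {
	loader := NewMockLoader(t)
	// Accept any context and load infos; report success.
	loader.EXPECT().ReopenSegments(mock.Anything, mock.Anything).Return(nil)

	err := loader.ReopenSegments(context.Background(), []*querypb.SegmentLoadInfo{{SegmentID: 1}})
	assert.NoError(t, err)
}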
View File

@ -1610,6 +1610,53 @@ func (_c *MockSegment_RemoveUnusedFieldFiles_Call) RunAndReturn(run func() error
return _c
}
// Reopen provides a mock function with given fields: ctx, newLoadInfo
func (_m *MockSegment) Reopen(ctx context.Context, newLoadInfo *querypb.SegmentLoadInfo) error {
ret := _m.Called(ctx, newLoadInfo)
if len(ret) == 0 {
panic("no return value specified for Reopen")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.SegmentLoadInfo) error); ok {
r0 = rf(ctx, newLoadInfo)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSegment_Reopen_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reopen'
type MockSegment_Reopen_Call struct {
*mock.Call
}
// Reopen is a helper method to define mock.On call
// - ctx context.Context
// - newLoadInfo *querypb.SegmentLoadInfo
func (_e *MockSegment_Expecter) Reopen(ctx interface{}, newLoadInfo interface{}) *MockSegment_Reopen_Call {
return &MockSegment_Reopen_Call{Call: _e.mock.On("Reopen", ctx, newLoadInfo)}
}
func (_c *MockSegment_Reopen_Call) Run(run func(ctx context.Context, newLoadInfo *querypb.SegmentLoadInfo)) *MockSegment_Reopen_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.SegmentLoadInfo))
})
return _c
}
func (_c *MockSegment_Reopen_Call) Return(_a0 error) *MockSegment_Reopen_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_Reopen_Call) RunAndReturn(run func(context.Context, *querypb.SegmentLoadInfo) error) *MockSegment_Reopen_Call {
_c.Call.Return(run)
return _c
}
// ResetIndexesLazyLoad provides a mock function with given fields: lazyState
func (_m *MockSegment) ResetIndexesLazyLoad(lazyState bool) {
_m.Called(lazyState)

View File

@ -1372,6 +1372,17 @@ func (s *LocalSegment) Load(ctx context.Context) error {
return s.csegment.Load(ctx)
}
func (s *LocalSegment) Reopen(ctx context.Context, newLoadInfo *querypb.SegmentLoadInfo) error {
err := s.csegment.Reopen(ctx, &segcore.ReopenRequest{
LoadInfo: newLoadInfo,
})
if err != nil {
return err
}
s.loadInfo.Store(newLoadInfo)
return nil
}
type ReleaseScope int
const (

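Note the ordering above: the cached load info is published only after the core Reopen succeeds, so concurrent readers of LoadInfo() see either the old or the new state, never a partial mix. A reader-side sketch (method names as in this diff; the logging is illustrative):

// Sketch: readers always get a consistent snapshot of the load info.
info := segment.LoadInfo()
log.Info("segment state",
	zap.Int64("segmentID", info.GetSegmentID()),
	zap.String("manifestPath", info.GetManifestPath()))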
View File

@ -88,6 +88,7 @@ type Segment interface {
Load(ctx context.Context) error
FinishLoad() error
Release(ctx context.Context, opts ...releaseOption)
Reopen(ctx context.Context, newLoadInfo *querypb.SegmentLoadInfo) error
// Bloom filter related
SetBloomFilter(bf *pkoracle.BloomFilterSet)

View File

@ -186,6 +186,10 @@ func (s *L0Segment) Load(ctx context.Context) error {
return nil
}
func (s *L0Segment) Reopen(ctx context.Context, newLoadInfo *querypb.SegmentLoadInfo) error {
return merr.WrapErrServiceInternal("unexpected reopen on l0 segment")
}
func (s *L0Segment) Release(ctx context.Context, opts ...releaseOption) {
s.dataGuard.Lock()
defer s.dataGuard.Unlock()

View File

@ -103,6 +103,11 @@ type Loader interface {
LoadJSONIndex(ctx context.Context,
segment Segment,
info *querypb.SegmentLoadInfo) error
// ReopenSegments updates loaded segments according to the new load info.
ReopenSegments(ctx context.Context,
loadInfos []*querypb.SegmentLoadInfo,
) error
}
type ResourceEstimate struct {
@ -2246,6 +2251,40 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context,
return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version)
}
func (loader *segmentLoader) ReopenSegments(ctx context.Context,
loadInfos []*querypb.SegmentLoadInfo,
) error {
// Register the segments with the loader; passing SegmentStateNone skips
// the already-loaded check, since reopen targets resident segments.
infos := loader.prepare(ctx, commonpb.SegmentState_SegmentStateNone, loadInfos...)
defer loader.unregister(infos...)
// Conservatively request resources for a full segment load, since the
// reopen may touch the whole segment.
// TODO: use the resource delta computed by segcore once it is supported.
requestResourceResult, err := loader.requestResource(ctx, infos...)
if err != nil {
log.Warn("reopen segment request resource failed", zap.Error(err))
return err
}
defer loader.freeRequestResource(requestResourceResult)
for _, info := range infos {
segment := loader.manager.Segment.GetSealed(info.GetSegmentID())
if segment == nil {
log.Warn("failed to reopen segment, segment not loaded", zap.Int64("segmentID", info.GetSegmentID()))
continue
}
err := segment.Reopen(ctx, info)
if err != nil {
log.Warn("failed to reopen segment", zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))
return err
}
}
return nil
}
func (loader *segmentLoader) LoadJSONIndex(ctx context.Context,
seg Segment,
loadInfo *querypb.SegmentLoadInfo,

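Taken together, the loader path lets a data or schema change land on a resident segment without a release/load cycle. A minimal caller sketch (loader and updatedInfos are assumed names, not part of this diff):

// Sketch: apply refreshed load infos (e.g., a newly built index or an
// added field) to segments already loaded on this node.
if err := loader.ReopenSegments(ctx, updatedInfos); err != nil {
	log.Warn("reopen failed, caller may fall back to full reload", zap.Error(err))
}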
View File

@ -529,14 +529,15 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
}
defer node.manager.Collection.Unref(req.GetCollectionID(), 1)
switch req.GetLoadScope() {
case querypb.LoadScope_Delta:
return node.loadDeltaLogs(ctx, req), nil
case querypb.LoadScope_Index:
return node.loadIndex(ctx, req), nil
case querypb.LoadScope_Stats:
return node.loadStats(ctx, req), nil
case querypb.LoadScope_Reopen:
return node.reopenSegments(ctx, req), nil
}
// Actual load segment
@ -1272,6 +1273,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
return info.IndexInfo.IndexID, info.IndexInfo
}),
JsonStatsInfo: s.GetFieldJSONIndexStats(),
ManifestPath: s.LoadInfo().GetManifestPath(),
})
}

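With this dispatch, reopen rides the existing LoadSegments RPC rather than a new endpoint. A hedged coordinator-side sketch (the client handle is an assumption; routine fields such as Base and DstNodeID are elided):

// Sketch: ask a query node to apply an updated SegmentLoadInfo in place.
func reopenOnNode(ctx context.Context, client querypb.QueryNodeClient,
	info *querypb.SegmentLoadInfo,
) error {
	status, err := client.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
		CollectionID: info.GetCollectionID(),
		Infos:        []*querypb.SegmentLoadInfo{info},
		LoadScope:    querypb.LoadScope_Reopen,
	})
	return merr.CheckRPCCall(status, err)
}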
View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/segcorepb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -116,3 +117,7 @@ func (req *cLoadFieldDataRequest) Release() {
type AddFieldDataInfoRequest = LoadFieldDataRequest
type AddFieldDataInfoResult struct{}
type ReopenRequest struct {
LoadInfo *querypb.SegmentLoadInfo
}

View File

@ -319,6 +319,21 @@ func (s *cSegmentImpl) Load(ctx context.Context) error {
return ConsumeCStatusIntoError(&status)
}
func (s *cSegmentImpl) Reopen(ctx context.Context, req *ReopenRequest) error {
traceCtx := ParseCTraceContext(ctx)
defer runtime.KeepAlive(traceCtx)
defer runtime.KeepAlive(req)
segLoadInfo := ConvertToSegcoreSegmentLoadInfo(req.LoadInfo)
loadInfoBlob, err := proto.Marshal(segLoadInfo)
if err != nil {
return err
}
status := C.ReopenSegment(traceCtx.ctx, s.ptr, (*C.uint8_t)(unsafe.Pointer(&loadInfoBlob[0])), C.int64_t(len(loadInfoBlob)))
return ConsumeCStatusIntoError(&status)
}
func (s *cSegmentImpl) DropIndex(ctx context.Context, fieldID int64) error {
status := C.DropSealedSegmentIndex(s.ptr, C.int64_t(fieldID))
if err := ConsumeCStatusIntoError(&status); err != nil {

View File

@ -42,6 +42,8 @@ type SealedSegment interface {
DropIndex(ctx context.Context, fieldID int64) error
DropJSONIndex(ctx context.Context, fieldID int64, nestedPath string) error
Reopen(ctx context.Context, request *ReopenRequest) error
}
// basicSegmentMethodSet is the basic method set of a segment.

View File

@ -419,6 +419,7 @@ enum LoadScope {
Delta = 1;
Index = 2;
Stats = 3;
Reopen = 4;
}
message LoadSegmentsRequest {
@ -677,6 +678,7 @@ message SegmentVersionInfo {
bool is_sorted = 9;
repeated int64 field_json_index_stats = 10;
map<int64, JsonStatsInfo> json_stats_info = 11;
string manifest_path = 12;
}
message ChannelVersionInfo {

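The new manifest_path field lets a coordinator compare the manifest a loaded segment currently serves against the expected one when deciding whether a reopen is needed. A sketch under that assumption (getter names follow the usual generated accessors; the target map is hypothetical):

// Sketch: collect segments whose served manifest lags the expected one.
func staleSegments(ctx context.Context, client querypb.QueryNodeClient,
	target map[int64]string, // segment ID -> expected manifest path
) ([]int64, error) {
	resp, err := client.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
	if err != nil {
		return nil, err
	}
	var stale []int64
	for _, seg := range resp.GetSegments() {
		if seg.GetManifestPath() != target[seg.GetID()] {
			stale = append(stale, seg.GetID())
		}
	}
	return stale, nil
}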
File diff suppressed because it is too large